/*
 * Untitled
 *
 * mail@pastecode.io avatar
 * unknown
 * plain_text
 * 20 days ago
 * 8.9 kB
 * 4
 * Indexable
 * Never
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <infiniband/verbs.h>
#include <arpa/inet.h>

#define MAX_POLL_CQ_TIMEOUT 2000
#define MSG_SIZE 64

/* Run-time settings for the program; main() fills this in with defaults. */
struct config_t {
    const char *dev_name;    /* set to NULL in main(); not read by the code in this file */
    const char *server_name; /* taken from argv[1]; non-NULL selects client mode in main() */
    u_int32_t tcp_port;      /* set to 18515 in main(); not read by the code in this file */
    int ib_port;             /* port number passed to the QP INIT and RTR transitions */
    int gid_idx;             /* set to -1 in main(); not read by the code in this file */
};

/* All verbs objects and the message buffer that setup_connection() creates. */
struct connection {
    struct ibv_context *context; /* device handle returned by ibv_open_device() */
    struct ibv_pd *pd;           /* protection domain allocated on context */
    struct ibv_cq *cq;           /* completion queue used for both send and receive */
    struct ibv_qp *qp;           /* RC queue pair created on pd */
    struct ibv_mr *mr;           /* memory region registered over buf */
    char *buf;                   /* malloc'ed message buffer of `size` bytes */
    int size;                    /* length of buf in bytes (set to MSG_SIZE) */
    int sock;                    /* not used by the code in this file */
};

static int modify_qp_to_init(struct ibv_qp *qp, int ib_port)
{
    struct ibv_qp_attr attr;
    int flags;
    int rc;

    memset(&attr, 0, sizeof(attr));

    attr.qp_state = IBV_QPS_INIT;
    attr.port_num = ib_port;
    attr.pkey_index = 0;
    attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;

    flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;

    rc = ibv_modify_qp(qp, &attr, flags);
    if (rc) {
        fprintf(stderr, "Failed to modify QP to INIT: %s\n", strerror(rc));
        return rc;
    }

    return 0;
}

static int modify_qp_to_rtr(struct ibv_qp *qp, int ib_port, int dl_psn, uint32_t remote_qpn, uint16_t dlid, union ibv_gid *dgid)
{
    struct ibv_qp_attr attr;
    int flags;
    int rc;

    memset(&attr, 0, sizeof(attr));

    attr.qp_state = IBV_QPS_RTR;
    attr.path_mtu = IBV_MTU_1024;
    attr.dest_qp_num = remote_qpn;
    attr.rq_psn = dl_psn;
    attr.max_dest_rd_atomic = 1;
    attr.min_rnr_timer = 0x12;
    attr.ah_attr.is_global = 0;
    attr.ah_attr.dlid = dlid;
    attr.ah_attr.sl = 0;
    attr.ah_attr.src_path_bits = 0;
    attr.ah_attr.port_num = ib_port;

    if (dgid) {
        attr.ah_attr.is_global = 1;
        attr.ah_attr.grh.hop_limit = 1;
        attr.ah_attr.grh.dgid = *dgid;
        attr.ah_attr.grh.sgid_index = 0;
    }

    flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
            IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;

    rc = ibv_modify_qp(qp, &attr, flags);
    if (rc) {
        fprintf(stderr, "Failed to modify QP to RTR: %s\n", strerror(rc));
        return rc;
    }

    return 0;
}

static int modify_qp_to_rts(struct ibv_qp *qp)
{
    struct ibv_qp_attr attr;
    int flags;
    int rc;

    memset(&attr, 0, sizeof(attr));

    attr.qp_state = IBV_QPS_RTS;
    attr.timeout = 0x12;
    attr.retry_cnt = 7;  // 增加重试次数
    attr.rnr_retry = 7;  // 增加 RNR 重试次数
    attr.sq_psn = 0;
    attr.max_rd_atomic = 1;

    flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
            IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;

    rc = ibv_modify_qp(qp, &attr, flags);
    if (rc) {
        fprintf(stderr, "Failed to modify QP to RTS: %s\n", strerror(rc));
        return rc;
    }

    return 0;
}

static int post_send(struct connection *conn)
{
    struct ibv_send_wr wr, *bad_wr = NULL;
    struct ibv_sge sge;

    memset(&wr, 0, sizeof(wr));

    wr.wr_id = (uintptr_t)conn;
    wr.opcode = IBV_WR_SEND;
    wr.sg_list = &sge;
    wr.num_sge = 1;
    wr.send_flags = IBV_SEND_SIGNALED;

    sge.addr = (uintptr_t)conn->buf;
    sge.length = MSG_SIZE;
    sge.lkey = conn->mr->lkey;

    int ret = ibv_post_send(conn->qp, &wr, &bad_wr);
    if (ret) {
        fprintf(stderr, "Failed to post send: %s\n", strerror(ret));
        return ret;
    }

    printf("Send request posted successfully\n");
    return 0;
}

static int post_recv(struct connection *conn)
{
    struct ibv_recv_wr wr, *bad_wr = NULL;
    struct ibv_sge sge;

    memset(&wr, 0, sizeof(wr));

    wr.wr_id = (uintptr_t)conn;
    wr.sg_list = &sge;
    wr.num_sge = 1;

    sge.addr = (uintptr_t)conn->buf;
    sge.length = MSG_SIZE;
    sge.lkey = conn->mr->lkey;

    int ret = ibv_post_recv(conn->qp, &wr, &bad_wr);
    if (ret) {
        fprintf(stderr, "Failed to post recv: %s\n", strerror(ret));
        return ret;
    }

    printf("Receive request posted successfully\n");
    return 0;
}

static int setup_connection(struct connection *conn, struct config_t *config)
{
    struct ibv_device **dev_list;
    struct ibv_device *ib_dev;
    int num_devices;

    dev_list = ibv_get_device_list(&num_devices);
    if (!dev_list) {
        fprintf(stderr, "Failed to get IB devices list: %s\n", strerror(errno));
        return 1;
    }

    ib_dev = *dev_list;
    if (!ib_dev) {
        fprintf(stderr, "No IB devices found\n");
        return 1;
    }

    printf("Opening device: %s\n", ibv_get_device_name(ib_dev));
    conn->context = ibv_open_device(ib_dev);
    if (!conn->context) {
        fprintf(stderr, "Failed to open device: %s\n", strerror(errno));
        return 1;
    }

    conn->pd = ibv_alloc_pd(conn->context);
    if (!conn->pd) {
        fprintf(stderr, "Failed to allocate PD: %s\n", strerror(errno));
        return 1;
    }

    conn->cq = ibv_create_cq(conn->context, 10, NULL, NULL, 0);
    if (!conn->cq) {
        fprintf(stderr, "Failed to create CQ: %s\n", strerror(errno));
        return 1;
    }

    struct ibv_qp_init_attr qp_init_attr = {
        .send_cq = conn->cq,
        .recv_cq = conn->cq,
        .qp_type = IBV_QPT_RC,
        .cap = {
            .max_send_wr = 10,
            .max_recv_wr = 10,
            .max_send_sge = 1,
            .max_recv_sge = 1
        }
    };

    conn->qp = ibv_create_qp(conn->pd, &qp_init_attr);
    if (!conn->qp) {
        fprintf(stderr, "Failed to create QP: %s\n", strerror(errno));
        return 1;
    }

    conn->size = MSG_SIZE;
    conn->buf = malloc(conn->size);
    if (!conn->buf) {
        fprintf(stderr, "Failed to allocate memory: %s\n", strerror(errno));
        return 1;
    }

    conn->mr = ibv_reg_mr(conn->pd, conn->buf, conn->size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (!conn->mr) {
        fprintf(stderr, "Failed to register MR: %s\n", strerror(errno));
        return 1;
    }

    if (modify_qp_to_init(conn->qp, config->ib_port)) {
        fprintf(stderr, "Failed to modify QP to INIT\n");
        return 1;
    }

    // For simplicity, we're using dummy values for remote QP number and LID
    // In a real application, you would exchange this information with the remote peer
    if (modify_qp_to_rtr(conn->qp, config->ib_port, 0, conn->qp->qp_num, 1, NULL)) {
        fprintf(stderr, "Failed to modify QP to RTR\n");
        return 1;
    }

    if (modify_qp_to_rts(conn->qp)) {
        fprintf(stderr, "Failed to modify QP to RTS\n");
        return 1;
    }

    printf("Connection setup completed successfully\n");
    return 0;
}

/*
 * Entry point.
 *
 * With an argument (stored as config.server_name) the program runs in
 * client mode: it writes a greeting into the buffer and posts a SEND.
 * Without an argument it runs in server mode and posts a receive.
 * In both modes it then spins on the completion queue until one work
 * completion arrives and reports its status.
 *
 * Returns 0 when the work completion succeeded, 1 on any failure. All
 * resources created by setup_connection() are released on every path
 * taken after setup succeeded.
 */
int main(int argc, char *argv[])
{
    struct connection conn;
    struct config_t config = {
        .dev_name = NULL,
        .server_name = NULL,
        .tcp_port = 18515,
        .ib_port = 1,
        .gid_idx = -1
    };
    struct ibv_wc wc;
    int ne;
    int rc = 1;

    if (argc > 1) {
        config.server_name = argv[1];
    }

    if (setup_connection(&conn, &config)) {
        fprintf(stderr, "Failed to setup connection\n");
        return 1;
    }

    if (config.server_name) {
        // Client mode
        /* Bounded copy: never writes past the conn.size-byte buffer. */
        snprintf(conn.buf, (size_t)conn.size, "%s", "Hello from client!");
        printf("Sending message: %s\n", conn.buf);
        if (post_send(&conn)) {
            fprintf(stderr, "Failed to post send\n");
            goto cleanup;
        }
        printf("Message sent: %s\n", conn.buf);
    } else {
        // Server mode
        printf("Server mode: Waiting for incoming message\n");
        if (post_recv(&conn)) {
            fprintf(stderr, "Failed to post receive\n");
            goto cleanup;
        }
        printf("Receive request posted, waiting for message...\n");
    }

    printf("Polling completion queue...\n");
    /* Busy-wait with no timeout until a completion shows up or polling fails. */
    do {
        ne = ibv_poll_cq(conn.cq, 1, &wc);
    } while (ne == 0);

    if (ne < 0) {
        fprintf(stderr, "Failed to poll CQ: %s\n", strerror(errno));
        goto cleanup;
    }

    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Work completion failed with status %s (%d)\n",
                ibv_wc_status_str(wc.status), wc.status);
        /* Casts make the arguments match the conversion specifiers exactly. */
        fprintf(stderr, "WC details: wr_id=%llu, opcode=%d, vendor_err=%u\n",
                (unsigned long long)wc.wr_id, (int)wc.opcode, (unsigned int)wc.vendor_err);
        goto cleanup;
    }

    printf("Work completion status: SUCCESS\n");

    if (!config.server_name) {
        printf("Message received: %s\n", conn.buf);
    }

    rc = 0;

cleanup:
    // Clean up
    ibv_destroy_qp(conn.qp);
    ibv_destroy_cq(conn.cq);
    ibv_dereg_mr(conn.mr);
    ibv_dealloc_pd(conn.pd);
    ibv_close_device(conn.context);
    free(conn.buf);

    return rc;
}
/* Leave a Comment */