Untitled
unknown
plain_text
a month ago
12 kB
2
Indexable
Never
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <infiniband/verbs.h> #define MAX_POLL_CQ_TIMEOUT 2000 #define MSG_SIZE 64 struct config_t { const char *dev_name; const char *server_name; u_int32_t tcp_port; int ib_port; int gid_idx; }; struct qp_info { uint32_t qp_num; uint16_t lid; uint8_t gid[16]; }; struct connection { struct ibv_context *context; struct ibv_pd *pd; struct ibv_cq *cq; struct ibv_qp *qp; struct ibv_mr *mr; char *buf; int size; int sock; struct qp_info local_qp_info; struct qp_info remote_qp_info; }; static int modify_qp_to_init(struct ibv_qp *qp, int ib_port) { struct ibv_qp_attr attr; int flags; int rc; memset(&attr, 0, sizeof(attr)); attr.qp_state = IBV_QPS_INIT; attr.port_num = ib_port; attr.pkey_index = 0; attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; rc = ibv_modify_qp(qp, &attr, flags); if (rc) { fprintf(stderr, "Failed to modify QP to INIT: %s\n", strerror(rc)); return rc; } return 0; } static int modify_qp_to_rtr(struct ibv_qp *qp, int ib_port, uint32_remote_qpn, uint16_t dlid, uint8_t *dgid) { struct ibv_qp_attr attr; int flags; int rc; memset(&attr, 0, sizeof(attr)); attr.qp_state = IBV_QPS_RTR; attr.path_mtu = IBV_MTU_1024; attr.dest_qp_num = remote_qpn; attr.rq_psn = 0; attr.max_dest_rd_atomic = 1; attr.min_rnr_timer = 0x12; attr.ah_attr.is_global = 0; attr.ah_attr.dlid = dlid; attr.ah_attr.sl = 0; attr.ah_attr.src_path_bits = 0; attr.ah_attr.port_num = ib_port; if (dgid) { attr.ah_attr.is_global = 1; attr.ah_attr.grh.hop_limit = 1; memcpy(&attr.ah_attr.grh.dgid, dgid, 16); attr.ah_attr.grh.sgid_index = 0; } flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER; rc = ibv_modify_qp(qp, &attr, flags); if (rc) { fprintf(stderr, "Failed to modify QP to RTR: %s\n", strerror(rc)); return rc; } return 0; } static int modify_qp_to_rts(struct ibv_qp *qp) { struct ibv_qp_attr attr; int flags; int rc; memset(&attr, 0, sizeof(attr)); attr.qp_state = IBV_QPS_RTS; attr.timeout = 0x12; attr.retry_cnt = 7; attr.rnr_retry = 7; attr.sq_psn = 0; attr.max_rd_atomic = 1; flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC; rc = ibv_modify_qp(qp, &attr, flags); if (rc) { fprintf(stderr, "Failed to modify QP to RTS: %s\n", strerror(rc)); return rc; } return 0; } static int post_send(struct connection *conn) { struct ibv_send_wr wr, *bad_wr = NULL; struct ibv_sge sge; memset(&wr, 0, sizeof(wr)); wr.wr_id = (uintptr_t)conn; wr.opcode = IBV_WR_SEND; wr.sg_list = &sge; wr.num_sge = 1; wr.send_flags = IBV_SEND_SIGNALED; sge.addr = (uintptr_t)conn->buf; sge.length = MSG_SIZE; sge.lkey = conn->mr->lkey; int ret = ibv_post_send(conn->qp, &wr, &bad_wr); if (ret) { fprintf(stderr, "Failed to post send: %s\n", strerror(ret)); return ret; } printf("Send request posted successfully\n"); return 0; } static int post_recv(struct connection *conn) { struct ibv_recv_wr wr, *bad_wr = NULL; struct ibv_sge sge; memset(&wr, 0, sizeof(wr)); wr.wr_id = (uintptr_t)conn; wr.sg_list = &sge; wr.num_sge = 1; sge.addr = (uintptr_t)conn->buf; sge.length = MSG_SIZE; sge.lkey = conn->mr->lkey; int ret = ibv_post_recv(conn->qp, &wr, &bad_wr); if (ret) { fprintf(stderr, "Failed to post recv: %s\n", strerror(ret)); return ret; } printf("Receive request posted successfully\n"); return 0; } static int setup_connection(struct connection *conn, struct config_t *config) { struct ibv_device **dev_list; struct ibv_device *ib_dev; int num_devices; dev_list = ibv_get_device_list(&num_devices); if (!dev_list) { fprintf(stderr, "Failed to get IB devices list: %s\n", strerror(errno)); return 1; } ib_dev = *dev_list; if (!ib_dev) { fprintf(stderr, "No IB devices found\n"); return 1; } printf("Opening device: %s\n", ibv_get_device_name(ib_dev)); conn->context = ibv_open_device(ib_dev); if (!conn->context) { fprintf(stderr, "Failed to open device: %s\n", strerror(errno)); return 1; } conn->pd = ibv_alloc_pd(conn->context); if (!conn->pd) { fprintf(stderr, "Failed to allocate PD: %s\n", strerror(errno)); return 1; } conn->cq = ibv_create_cq(conn->context, 10, NULL, NULL, 0); if (!conn->cq) { fprintf(stderr, "Failed to create CQ: %s\n", strerror(errno)); return 1; } struct ibv_qp_init_attr qp_init_attr = { .send_cq = conn->cq, .recv_cq = conn->cq, .qp_type = IBV_QPT_RC, .cap = { .max_send_wr = 10, .max_recv_wr = 10, .max_send_sge = 1, .max_recv_sge = 1 } }; conn->qp = ibv_create_qp(conn->pd, &qp_init_attr); if (!conn->qp) { fprintf(stderr, "Failed to create QP: %s\n", strerror(errno)); return 1; } conn->size = MSG_SIZE; conn->buf = malloc(conn->size); if (!conn->buf) { fprintf(stderr, "Failed to allocate memory: %s\n", strerror(errno)); return 1; } conn->mr = ibv_reg_mr(conn->pd, conn->buf, conn->size, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); if (!conn->mr) { fprintf(stderr, "Failed to register MR: %s\n", strerror(errno)); return 1; } if (modify_qp_to_init(conn->qp, config->ib_port)) { fprintf(stderr, "Failed to modify QP to INIT\n"); return 1; } // Get local QP info conn->local_qp_info.qp_num = conn->qp->qp_num; struct ibv_port_attr port_attr; if (ibv_query_port(conn->context, config->ib_port, &port_attr)) { fprintf(stderr, "Failed to query port attributes\n"); return 1; } conn->local_qp_info.lid = port_attr.lid; // For simplicity, we're not using GID in this example memset(conn->local_qp_info.gid, 0, 16); printf("Local QP number: %u\n", conn->local_qp_info.qp_num); printf("Local LID: %u\n", conn->local_qp_info.lid); printf("Connection setup completed successfully\n"); return 0; } static int connect_qp(struct connection *conn) { // Exchange QP information if (write(conn->sock, &conn->local_qp_info, sizeof(conn->local_qp_info)) != sizeof(conn->local_qp_info)) { fprintf(stderr, "Failed to send local QP info\n"); return 1; } if (read(conn->sock, &conn->remote_qp_info, sizeof(conn->remote_qp_info)) != sizeof(conn->remote_qp_info)) { fprintf(stderr, "Failed to receive remote QP info\n"); return 1; } printf("Remote QP number: %u\n", conn->remote_qp_info.qp_num); printf("Remote LID: %u\n", conn->remote_qp_info.lid); // Modify QP to RTR if (modify_qp_to_rtr(conn->qp, conn->config.ib_port, conn->remote_qp_info.qp_num, conn->remote_qp_info.lid, NULL)) { fprintf(stderr, "Failed to modify QP to RTR\n"); return 1; } // Modify QP to RTS if (modify_qp_to_rts(conn->qp)) { fprintf(stderr, "Failed to modify QP to RTS\n"); return 1; } printf("QP connected successfully\n"); return 0; } static int setup_socket(struct connection *conn, struct config_t *config) { struct sockaddr_in addr; struct addrinfo *res; int ret; conn->sock = socket(AF_INET, SOCK_STREAM, 0); if (conn->sock < 0) { fprintf(stderr, "Failed to create socket: %s\n", strerror(errno)); return 1; } memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(config->tcp_port); if (config->server_name) { // Client ret = getaddrinfo(config->server_name, NULL, NULL, &res); if (ret) { fprintf(stderr, "getaddrinfo failed: %s\n", gai_strerror(ret)); return 1; } memcpy(&addr.sin_addr, &((struct sockaddr_in *)res->ai_addr)->sin_addr, sizeof(addr.sin_addr)); freeaddrinfo(res); if (connect(conn->sock, (struct sockaddr *)&addr, sizeof(addr))) { fprintf(stderr, "Failed to connect to server: %s\n", strerror(errno)); return 1; } } else { // Server addr.sin_addr.s_addr = INADDR_ANY; if (bind(conn->sock, (struct sockaddr *)&addr, sizeof(addr))) { fprintf(stderr, "Failed to bind socket: %s\n", strerror(errno)); return 1; } listen(conn->sock, 1); int client_sock = accept(conn->sock, NULL, NULL); if (client_sock < 0) { fprintf(stderr, "Failed to accept client connection: %s\n", strerror(errno)); return 1; } close(conn->sock); conn->sock = client_sock; } printf("Socket connection established\n"); return 0; } int main(int argc, char *argv[]) { struct connection conn; struct config_t config = { .dev_name = NULL, .server_name = NULL, .tcp_port = 18515, .ib_port = 1, .gid_idx = -1 }; if (argc > 1) config.server_name = argv[1]; memset(&conn, 0, sizeof(conn)); conn.config = config; if (setup_socket(&conn, &config)) { fprintf(stderr, "Failed to setup socket connection\n"); return 1; } if (setup_connection(&conn, &config)) { fprintf(stderr, "Failed to setup RDMA connection\n"); return 1; } if (connect_qp(&conn)) { fprintf(stderr, "Failed to connect QPs\n"); return 1; } if (config.server_name) { // Client mode strcpy(conn.buf, "Hello from client!"); printf("Sending message: %s\n", conn.buf); if (post_send(&conn)) { fprintf(stderr, "Failed to post send\n"); return 1; } printf("Message sent: %s\n", conn.buf); } else { // Server mode printf("Server mode: Waiting for incoming message\n"); if (post_recv(&conn)) { fprintf(stderr, "Failed to post receive\n"); return 1; } printf("Receive request posted, waiting for message...\n"); } struct ibv_wc wc; int ne; printf("Polling completion queue...\n"); do { ne = ibv_poll_cq(conn.cq, 1, &wc); } while (ne == 0); if (ne < 0) { fprintf(stderr, "Failed to poll CQ: %s\n", strerror(errno)); return 1; } if (wc.status != IBV_WC_SUCCESS) { fprintf(stderr, "Work completion failed with status %s (%d)\n", ibv_wc_status_str(wc.status), wc.status); fprintf(stderr, "WC details: wr_id=%lu, opcode=%d, vendor_err=%d\n", wc.wr_id, wc.opcode, wc.vendor_err); return 1; } printf("Work completion status: SUCCESS\n"); if (!config.server_name) { printf("Message received: %s\n", conn.buf); } // Clean up ibv_destroy_qp(conn.qp); ibv_destroy_cq(conn.cq); ibv_dereg_mr(conn.mr); ibv_dealloc_pd(conn.pd); ibv_close_device(conn.context); free(conn.buf); close(conn.sock); return 0; }
Leave a Comment