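// ggml graph computation, modified to synchronize the worker threads with a
// pthread mutex + condition variable (stored in ggml_compute_state_shared)
// instead of ggml's busy-wait/spin-lock handshake - the replaced ggml_lock_*
// calls are left commented out below

// worker loop: sleep on the cv until there is work (has_work) or a stop
// request, run ggml_compute_forward() on the assigned node, then decrement
// n_ready and signal the main thread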
static thread_ret_t ggml_graph_compute_thread(void * data) {
    struct ggml_compute_state * state = (struct ggml_compute_state *) data;

    const int n_threads = state->shared->n_threads;
    UNUSED(n_threads); // not used by the mutex/cv based worker loop, silences -Wunused-variable

    while (true) {
        // Wait for work
        pthread_mutex_lock(&state->shared->mutex);
        while (!atomic_load(&state->shared->has_work) && !atomic_load(&state->shared->stop)) {
//            if (atomic_load(&state->shared->stop)) {
//                pthread_mutex_unlock(&state->shared->mutex);
//                return 0;
//            }
            pthread_cond_wait(&state->shared->cv, &state->shared->mutex);
//            ggml_lock_lock  (&state->shared->spin);
//            ggml_lock_unlock(&state->shared->spin);
        }
        pthread_mutex_unlock(&state->shared->mutex);

        // check if we should stop
        if (atomic_load(&state->shared->stop)) {
            break;
        }

        // Perform the work
        if (state->node && state->params.ith < state->params.nth) {
            ggml_compute_forward(&state->params, state->node);
            
            // Signal the main thread when the worker thread is done with its task
            pthread_mutex_lock(&state->shared->mutex);
            atomic_fetch_sub(&state->shared->n_ready, 1);
            pthread_cond_signal(&state->shared->cv);
            pthread_mutex_unlock(&state->shared->mutex);
            
//            state->node = NULL;
        } else {
            break;
        }
    }

    return 0;
}
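
// ggml_graph_compute: evaluate all nodes of the graph
//
// estimates per-node task counts and the required work buffer size, creates a
// pool of n_threads - 1 workers, then walks the nodes running the INIT phase on
// the main thread and handing ith = 1..n_tasks-1 of the COMPUTE and FINALIZE
// phases to the pool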

void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) {
    const int n_threads = cgraph->n_threads;

    struct ggml_compute_state_shared state_shared = {
        /*.spin      =*/ GGML_LOCK_INITIALIZER,
        /*.n_threads =*/ n_threads,
        /*.n_ready   =*/ 0,
        /*.has_work  =*/ false,
        /*.stop      =*/ false,
        // .mutex and .cv are left zero-initialized here and set up dynamically
        // below; initializing them both with the static initializer macros and
        // with pthread_*_init would be undefined behavior
    };

    // init mutex and cv
    pthread_mutex_init(&state_shared.mutex, NULL);
    pthread_cond_init (&state_shared.cv,    NULL);

    struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;

    // create thread pool
    if (n_threads > 1) {
        ggml_lock_init(&state_shared.spin);

        atomic_store(&state_shared.has_work, true);

        for (int j = 0; j < n_threads - 1; j++) {
            workers[j] = (struct ggml_compute_state) {
                .thrd   = 0,
                .params = {
                    .type  = GGML_TASK_COMPUTE,
                    .ith   = j + 1,
                    .nth   = n_threads,
                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
                    .wdata = cgraph->work ? cgraph->work->data : NULL,
                },
                .node   = NULL,
                .shared = &state_shared,
            };

            int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
            GGML_ASSERT(rc == 0);
            UNUSED(rc);
        }
    }

    // initialize tasks + work buffer
    {
        size_t work_size = 0;

        // thread scheduling for the different operations + work buffer size estimation
        for (int i = 0; i < cgraph->n_nodes; i++) {
            struct ggml_tensor * node = cgraph->nodes[i];

            switch (node->op) {
                case GGML_OP_CPY:
                case GGML_OP_DUP:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;
                        if (ggml_is_quantized(node->type)) {
                            cur = GGML_TYPE_SIZE[GGML_TYPE_F32] * node->ne[0] * n_threads;
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_ADD:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;

                        if (ggml_is_quantized(node->src0->type)) {
                            cur = GGML_TYPE_SIZE[GGML_TYPE_F32] * node->src0->ne[0] * n_threads;
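                            // i.e. one dequantized f32 row (src0->ne[0] floats) of scratch per thread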
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_SUB:
                case GGML_OP_MUL:
                case GGML_OP_DIV:
                case GGML_OP_SQR:
                case GGML_OP_SQRT:
                case GGML_OP_SUM:
                case GGML_OP_MEAN:
                case GGML_OP_REPEAT:
                case GGML_OP_ABS:
                case GGML_OP_SGN:
                case GGML_OP_NEG:
                case GGML_OP_STEP:
                case GGML_OP_RELU:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_GELU:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_SILU:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_NORM:
                case GGML_OP_RMS_NORM:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_MUL_MAT:
                    {
                        node->n_tasks = n_threads;

                        // TODO: use different scheduling for different matrix sizes
                        //const int nr0 = ggml_nrows(node->src0);
                        //const int nr1 = ggml_nrows(node->src1);

                        //node->n_tasks = MIN(n_threads, MAX(1, nr0/128));
                        //printf("nr0 = %8d, nr1 = %8d, nr0*nr1 = %8d, n_tasks = %d\n", nr0, nr1, nr0*nr1, node->n_tasks);

                        size_t cur = 0;

                        if (node->src0->type == GGML_TYPE_F16 && node->src1->type == GGML_TYPE_F32) {
#if defined(GGML_USE_ACCELERATE) || defined(GGML_USE_OPENBLAS) || defined(GGML_USE_CUBLAS)
                            if (ggml_compute_forward_mul_mat_use_blas(node->src0, node->src1, node)) {
                                node->n_tasks = 1; // TODO: this actually is doing nothing
                                                   //       the threads are still spinning
                                cur = GGML_TYPE_SIZE[GGML_TYPE_F32]*(node->src0->ne[0]*node->src0->ne[1]);
                                //printf("src0: ne0 = %d, ne1 = %d, ne = %d\n", node->src0->ne[0], node->src0->ne[1], node->src0->ne[0]*node->src0->ne[1]);
                                //printf("src1: ne0 = %d, ne1 = %d, ne = %d\n", node->src1->ne[0], node->src1->ne[1], node->src1->ne[0]*node->src1->ne[1]);
                                //printf("cur = %zu\n", cur);
                            } else {
                                cur = GGML_TYPE_SIZE[GGML_TYPE_F16]*ggml_nelements(node->src1);
                            }
#else
                            cur = GGML_TYPE_SIZE[GGML_TYPE_F16]*ggml_nelements(node->src1);
#endif
                        } else if (node->src0->type == GGML_TYPE_F32 && node->src1->type == GGML_TYPE_F32) {
                            cur = 0;
                        } else if (ggml_is_quantized(node->src0->type) && node->src1->type == GGML_TYPE_F32) {
#if defined(GGML_USE_ACCELERATE) || defined(GGML_USE_OPENBLAS) || defined(GGML_USE_CUBLAS)
                            if (ggml_compute_forward_mul_mat_use_blas(node->src0, node->src1, node)) {
                                node->n_tasks = 1;
                                cur = GGML_TYPE_SIZE[GGML_TYPE_F32]*(node->src0->ne[0]*node->src0->ne[1]);
                            } else
#endif
                            {
                                const enum ggml_type type_q = quantize_fns[node->src0->type].vec_dot_type;
                                cur = GGML_TYPE_SIZE[type_q]*ggml_nelements(node->src1)/GGML_BLCK_SIZE[type_q];
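                                // i.e. enough scratch to hold src1 re-quantized to type_q:
                                // ggml_nelements(src1)/GGML_BLCK_SIZE[type_q] blocks of
                                // GGML_TYPE_SIZE[type_q] bytes each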
                            }
                        } else {
                            GGML_ASSERT(false);
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_SCALE:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_CONT:
                case GGML_OP_RESHAPE:
                case GGML_OP_VIEW:
                case GGML_OP_PERMUTE:
                case GGML_OP_TRANSPOSE:
                case GGML_OP_GET_ROWS:
                case GGML_OP_DIAG_MASK_INF:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_SOFT_MAX:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_ROPE:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_CONV_1D_1S:
                case GGML_OP_CONV_1D_2S:
                    {
                        node->n_tasks = n_threads;

                        GGML_ASSERT(node->src0->ne[3] == 1);
                        GGML_ASSERT(node->src1->ne[2] == 1);
                        GGML_ASSERT(node->src1->ne[3] == 1);

                        size_t cur = 0;
                        const int nk = node->src0->ne[0];

                        if (node->src0->type == GGML_TYPE_F16 &&
                            node->src1->type == GGML_TYPE_F32) {
                            cur = sizeof(ggml_fp16_t)*(
                                    nk*ggml_up32(node->src0->ne[1])*node->src0->ne[2] +
                                    ( 2*(nk/2) + node->src1->ne[0])*node->src1->ne[1]
                                    );
                        } else if (node->src0->type == GGML_TYPE_F32 &&
                                   node->src1->type == GGML_TYPE_F32) {
                            cur = sizeof(float)*(
                                    nk*ggml_up32(node->src0->ne[1])*node->src0->ne[2] +
                                    ( 2*(nk/2) + node->src1->ne[0])*node->src1->ne[1]
                                    );
                        } else {
                            GGML_ASSERT(false);
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_FLASH_ATTN:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;

                        const int64_t ne11 = ggml_up(node->src1->ne[1], GGML_SOFT_MAX_UNROLL);

                        if (node->src1->type == GGML_TYPE_F32) {
                            cur  = sizeof(float)*ne11*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*ne11*node->n_tasks; // this is overestimated by x2
                        }

                        if (node->src1->type == GGML_TYPE_F16) {
                            cur  = sizeof(float)*ne11*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*ne11*node->n_tasks; // this is overestimated by x2
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_FLASH_FF:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;

                        if (node->src1->type == GGML_TYPE_F32) {
                            cur  = sizeof(float)*node->src1->ne[1]*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*node->src1->ne[1]*node->n_tasks; // this is overestimated by x2
                        }

                        if (node->src1->type == GGML_TYPE_F16) {
                            cur  = sizeof(float)*node->src1->ne[1]*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*node->src1->ne[1]*node->n_tasks; // this is overestimated by x2
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_MAP_UNARY:
                case GGML_OP_MAP_BINARY:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_NONE:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_COUNT:
                    {
                        GGML_ASSERT(false);
                    } break;
            }
        }

        if (cgraph->work != NULL && work_size > cgraph->work_size) {
            GGML_ASSERT(false); // TODO: better handling
        }

        if (work_size > 0 && cgraph->work == NULL) {
            cgraph->work_size = work_size + CACHE_LINE_SIZE*(n_threads - 1);

            GGML_PRINT_DEBUG("%s: allocating work buffer for graph (%zu bytes)\n", __func__, cgraph->work_size);
            cgraph->work = ggml_new_tensor_1d(ctx, GGML_TYPE_I8, cgraph->work_size);
        }
    }

    const int64_t perf_start_cycles  = ggml_perf_cycles();
    const int64_t perf_start_time_us = ggml_perf_time_us();

    for (int i = 0; i < cgraph->n_nodes; i++) {
        GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, i, cgraph->n_nodes);

        struct ggml_tensor * node = cgraph->nodes[i];

        // TODO: this could be used to avoid unnecessary computations, but it needs to be improved
        //if (node->grad == NULL && node->perf_runs > 0) {
        //    continue;
        //}

        const int64_t perf_node_start_cycles  = ggml_perf_cycles();
        const int64_t perf_node_start_time_us = ggml_perf_time_us();
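
        // each node is evaluated in up to three phases: INIT (main thread only),
        // then COMPUTE and FINALIZE, which are split across node->n_tasks threads
        // with ith = 1..n_tasks-1 handed to the worker pool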

        // INIT
        struct ggml_compute_params params = {
            /*.type  =*/ GGML_TASK_INIT,
            /*.ith   =*/ 0,
            /*.nth   =*/ node->n_tasks,
            /*.wsize =*/ cgraph->work ? ggml_nbytes(cgraph->work) : 0,
            /*.wdata =*/ cgraph->work ? cgraph->work->data : NULL,
        };

        ggml_compute_forward(&params, node);

        // COMPUTE
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            // launch thread pool
            for (int j = 0; j < n_threads - 1; j++) {
                workers[j].params = (struct ggml_compute_params) {
                    .type  = GGML_TASK_COMPUTE,
                    .ith   = j + 1,
                    .nth   = node->n_tasks,
                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
                    .wdata = cgraph->work ? cgraph->work->data : NULL,
                };
                workers[j].node = node;
            }

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) > 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            // publish the new work and wake the workers blocked in pthread_cond_wait
            pthread_mutex_lock(&state_shared.mutex);
            atomic_store(&state_shared.has_work, true);
            pthread_cond_broadcast(&state_shared.cv);
            pthread_mutex_unlock(&state_shared.mutex);
        }

        params.type = GGML_TASK_COMPUTE;
        ggml_compute_forward(&params, node);

        // wait for thread pool
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) != 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);
        }

        // FINALIZE
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            // launch thread pool
            for (int j = 0; j < n_threads - 1; j++) {
                workers[j].params = (struct ggml_compute_params) {
                    .type  = GGML_TASK_FINALIZE,
                    .ith   = j + 1,
                    .nth   = node->n_tasks,
                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
                    .wdata = cgraph->work ? cgraph->work->data : NULL,
                };
                workers[j].node = node;
            }

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) > 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            // publish the new work and wake the workers blocked in pthread_cond_wait
            pthread_mutex_lock(&state_shared.mutex);
            atomic_store(&state_shared.has_work, true);
            pthread_cond_broadcast(&state_shared.cv);
            pthread_mutex_unlock(&state_shared.mutex);
        }

        params.type = GGML_TASK_FINALIZE;
        ggml_compute_forward(&params, node);

        // wait for thread pool
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) != 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
//                ggml_lock_lock  (&state_shared.spin);
//                ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);
        }

        // performance stats (node)
        {
            int64_t perf_cycles_cur  = ggml_perf_cycles()  - perf_node_start_cycles;
            int64_t perf_time_us_cur = ggml_perf_time_us() - perf_node_start_time_us;

            node->perf_runs++;
            node->perf_cycles  += perf_cycles_cur;
            node->perf_time_us += perf_time_us_cur;
        }
    }

    // join thread pool
    if (n_threads > 1) {
        // set the stop flag and wake any workers blocked in pthread_cond_wait,
        // otherwise ggml_thread_join() below can hang
        pthread_mutex_lock(&state_shared.mutex);
        atomic_store(&state_shared.stop, true);
        atomic_store(&state_shared.has_work, true);
        pthread_cond_broadcast(&state_shared.cv);
        pthread_mutex_unlock(&state_shared.mutex);

        for (int j = 0; j < n_threads - 1; j++) {
            int rc = ggml_thread_join(workers[j].thrd, NULL);
            GGML_ASSERT(rc == 0);
            UNUSED(rc);
        }

        ggml_lock_destroy(&state_shared.spin);
    }

    // the mutex and cv were initialized unconditionally, so destroy them
    // unconditionally as well (also when n_threads == 1)
    pthread_mutex_destroy(&state_shared.mutex);
    pthread_cond_destroy (&state_shared.cv);

    // performance stats (graph)
    {
        int64_t perf_cycles_cur  = ggml_perf_cycles()  - perf_start_cycles;
        int64_t perf_time_us_cur = ggml_perf_time_us() - perf_start_time_us;

        cgraph->perf_runs++;
        cgraph->perf_cycles  += perf_cycles_cur;
        cgraph->perf_time_us += perf_time_us_cur;

        GGML_PRINT_DEBUG("%s: perf (%d) - cpu = %.3f / %.3f ms, wall = %.3f / %.3f ms\n",
                __func__, cgraph->perf_runs,
                (double) perf_cycles_cur      / (double) ggml_cycles_per_ms(),
                (double) cgraph->perf_cycles  / (double) ggml_cycles_per_ms() / (double) cgraph->perf_runs,
                (double) perf_time_us_cur     / 1000.0,
                (double) cgraph->perf_time_us / 1000.0 / cgraph->perf_runs);
    }
}
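
// Usage sketch (illustrative only, not part of the pasted ggml.c fragment): a
// minimal driver for the ggml_graph_compute() above, assuming the ggml.h API of
// this vintage (ggml_init, ggml_new_tensor_2d, ggml_set_f32, ggml_mul_mat,
// ggml_build_forward, ggml_get_f32_1d); the sizes and thread count are arbitrary.
#if 0
#include "ggml.h"
#include <stdio.h>

int main(void) {
    struct ggml_init_params ip = {
        /*.mem_size   =*/ 16*1024*1024, // arena for tensors, graph and work buffer
        /*.mem_buffer =*/ NULL,
        /*.no_alloc   =*/ false,
    };
    struct ggml_context * ctx = ggml_init(ip);

    // two all-ones 64x64 f32 matrices and a mat-mul node (GGML_OP_MUL_MAT above)
    struct ggml_tensor * a = ggml_set_f32(ggml_new_tensor_2d(ctx, GGML_TYPE_F32, 64, 64), 1.0f);
    struct ggml_tensor * b = ggml_set_f32(ggml_new_tensor_2d(ctx, GGML_TYPE_F32, 64, 64), 1.0f);
    struct ggml_tensor * c = ggml_mul_mat(ctx, a, b);

    struct ggml_cgraph gf = ggml_build_forward(c);
    gf.n_threads = 4; // size of the worker pool created in ggml_graph_compute()

    ggml_graph_compute(ctx, &gf); // runs the INIT/COMPUTE/FINALIZE phases per node

    printf("c[0] = %f\n", ggml_get_f32_1d(c, 0)); // every element should be 64.0

    ggml_free(ctx);
    return 0;
}
#endif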