static thread_ret_t ggml_graph_compute_thread(void * data) {
    struct ggml_compute_state * state = (struct ggml_compute_state *) data;

    const int n_threads = state->shared->n_threads;

    while (true) {
        // Wait for work
        pthread_mutex_lock(&state->shared->mutex);
        while (!atomic_load(&state->shared->has_work) && !atomic_load(&state->shared->stop)) {
            // if (atomic_load(&state->shared->stop)) {
            //     pthread_mutex_unlock(&state->shared->mutex);
            //     return 0;
            // }
            pthread_cond_wait(&state->shared->cv, &state->shared->mutex);
            // ggml_lock_lock  (&state->shared->spin);
            // ggml_lock_unlock(&state->shared->spin);
        }
        pthread_mutex_unlock(&state->shared->mutex);

        // check if we should stop
        if (atomic_load(&state->shared->stop)) {
            break;
        }

        // Perform the work
        if (state->node && state->params.ith < state->params.nth) {
            ggml_compute_forward(&state->params, state->node);

            // Signal the main thread when the worker thread is done with its task
            pthread_mutex_lock(&state->shared->mutex);
            atomic_fetch_sub(&state->shared->n_ready, 1);
            pthread_cond_signal(&state->shared->cv);
            pthread_mutex_unlock(&state->shared->mutex);

            // state->node = NULL;
        } else {
            break;
        }
    }

    return 0;
}
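// ggml_graph_compute drives the graph node by node. For nodes with n_tasks > 1 it
// hands per-thread slices to the worker pool created below, coordinating through
// the shared state: has_work flags that worker params/node have been set up,
// n_ready is the rendezvous counter for the current phase, and stop tells the
// workers to exit once the whole graph has been processed. Blocking is done on the
// shared mutex/condition variable rather than on the spin lock.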
void ggml_graph_compute(struct ggml_context * ctx, struct ggml_cgraph * cgraph) {
    const int n_threads = cgraph->n_threads;

    struct ggml_compute_state_shared state_shared = {
        /*.spin      =*/ GGML_LOCK_INITIALIZER,
        /*.n_threads =*/ n_threads,
        /*.n_ready   =*/ 0,
        /*.has_work  =*/ false,
        /*.stop      =*/ false,
        /*.mutex     =*/ PTHREAD_MUTEX_INITIALIZER,
        /*.cv        =*/ PTHREAD_COND_INITIALIZER,
    };

    // init mutex and cv
    pthread_mutex_init(&state_shared.mutex, NULL);
    pthread_cond_init (&state_shared.cv,    NULL);

    struct ggml_compute_state * workers = n_threads > 1 ? alloca(sizeof(struct ggml_compute_state)*(n_threads - 1)) : NULL;

    // create thread pool
    if (n_threads > 1) {
        ggml_lock_init(&state_shared.spin);

        atomic_store(&state_shared.has_work, true);

        for (int j = 0; j < n_threads - 1; j++) {
            workers[j] = (struct ggml_compute_state) {
                .thrd   = 0,
                .params = {
                    .type  = GGML_TASK_COMPUTE,
                    .ith   = j + 1,
                    .nth   = n_threads,
                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
                    .wdata = cgraph->work ? cgraph->work->data : NULL,
                },
                .node   = NULL,
                .shared = &state_shared,
            };

            int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
            GGML_ASSERT(rc == 0);
            UNUSED(rc);
        }
    }

    // initialize tasks + work buffer
    {
        size_t work_size = 0;

        // thread scheduling for the different operations
        for (int i = 0; i < cgraph->n_nodes; i++) {
            struct ggml_tensor * node = cgraph->nodes[i];

            switch (node->op) {
                case GGML_OP_CPY:
                case GGML_OP_DUP:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;
                        if (ggml_is_quantized(node->type)) {
                            cur = GGML_TYPE_SIZE[GGML_TYPE_F32] * node->ne[0] * n_threads;
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_ADD:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;

                        if (ggml_is_quantized(node->src0->type)) {
                            cur = GGML_TYPE_SIZE[GGML_TYPE_F32] * node->src0->ne[0] * n_threads;
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_SUB:
                case GGML_OP_MUL:
                case GGML_OP_DIV:
                case GGML_OP_SQR:
                case GGML_OP_SQRT:
                case GGML_OP_SUM:
                case GGML_OP_MEAN:
                case GGML_OP_REPEAT:
                case GGML_OP_ABS:
                case GGML_OP_SGN:
                case GGML_OP_NEG:
                case GGML_OP_STEP:
                case GGML_OP_RELU:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_GELU:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_SILU:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_NORM:
                case GGML_OP_RMS_NORM:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_MUL_MAT:
                    {
                        node->n_tasks = n_threads;

                        // TODO: use different scheduling for different matrix sizes
                        //const int nr0 = ggml_nrows(node->src0);
                        //const int nr1 = ggml_nrows(node->src1);

                        //node->n_tasks = MIN(n_threads, MAX(1, nr0/128));
                        //printf("nr0 = %8d, nr1 = %8d, nr0*nr1 = %8d, n_tasks = %d\n", nr0, nr1, nr0*nr1, node->n_tasks);

                        size_t cur = 0;

                        if (node->src0->type == GGML_TYPE_F16 && node->src1->type == GGML_TYPE_F32) {
#if defined(GGML_USE_ACCELERATE) || defined(GGML_USE_OPENBLAS) || defined(GGML_USE_CUBLAS)
                            if (ggml_compute_forward_mul_mat_use_blas(node->src0, node->src1, node)) {
                                node->n_tasks = 1; // TODO: this actually is doing nothing
                                                   //       the threads are still spinning
                                cur = GGML_TYPE_SIZE[GGML_TYPE_F32]*(node->src0->ne[0]*node->src0->ne[1]);
                                //printf("src0: ne0 = %d, ne1 = %d, ne = %d\n", node->src0->ne[0], node->src0->ne[1], node->src0->ne[0]*node->src0->ne[1]);
                                //printf("src1: ne0 = %d, ne1 = %d, ne = %d\n", node->src1->ne[0], node->src1->ne[1], node->src1->ne[0]*node->src1->ne[1]);
                                //printf("cur = %zu\n", cur);
                            } else {
                                cur = GGML_TYPE_SIZE[GGML_TYPE_F16]*ggml_nelements(node->src1);
                            }
#else
                            cur = GGML_TYPE_SIZE[GGML_TYPE_F16]*ggml_nelements(node->src1);
#endif
                        } else if (node->src0->type == GGML_TYPE_F32 && node->src1->type == GGML_TYPE_F32) {
                            cur = 0;
                        } else if (ggml_is_quantized(node->src0->type) && node->src1->type == GGML_TYPE_F32) {
#if defined(GGML_USE_ACCELERATE) || defined(GGML_USE_OPENBLAS) || defined(GGML_USE_CUBLAS)
                            if (ggml_compute_forward_mul_mat_use_blas(node->src0, node->src1, node)) {
                                node->n_tasks = 1;
                                cur = GGML_TYPE_SIZE[GGML_TYPE_F32]*(node->src0->ne[0]*node->src0->ne[1]);
                            } else
#endif
                            {
                                const enum ggml_type type_q = quantize_fns[node->src0->type].vec_dot_type;
                                cur = GGML_TYPE_SIZE[type_q]*ggml_nelements(node->src1)/GGML_BLCK_SIZE[type_q];
                            }
                        } else {
                            GGML_ASSERT(false);
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_SCALE:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_CONT:
                case GGML_OP_RESHAPE:
                case GGML_OP_VIEW:
                case GGML_OP_PERMUTE:
                case GGML_OP_TRANSPOSE:
                case GGML_OP_GET_ROWS:
                case GGML_OP_DIAG_MASK_INF:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_SOFT_MAX:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_ROPE:
                    {
                        node->n_tasks = n_threads;
                    } break;
                case GGML_OP_CONV_1D_1S:
                case GGML_OP_CONV_1D_2S:
                    {
                        node->n_tasks = n_threads;

                        GGML_ASSERT(node->src0->ne[3] == 1);
                        GGML_ASSERT(node->src1->ne[2] == 1);
                        GGML_ASSERT(node->src1->ne[3] == 1);

                        size_t cur = 0;
                        const int nk = node->src0->ne[0];

                        if (node->src0->type == GGML_TYPE_F16 &&
                            node->src1->type == GGML_TYPE_F32) {
                            cur = sizeof(ggml_fp16_t)*(
                                    nk*ggml_up32(node->src0->ne[1])*node->src0->ne[2] +
                                    ( 2*(nk/2) + node->src1->ne[0])*node->src1->ne[1]
                                    );
                        } else if (node->src0->type == GGML_TYPE_F32 &&
                                   node->src1->type == GGML_TYPE_F32) {
                            cur = sizeof(float)*(
                                    nk*ggml_up32(node->src0->ne[1])*node->src0->ne[2] +
                                    ( 2*(nk/2) + node->src1->ne[0])*node->src1->ne[1]
                                    );
                        } else {
                            GGML_ASSERT(false);
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_FLASH_ATTN:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;

                        const int64_t ne11 = ggml_up(node->src1->ne[1], GGML_SOFT_MAX_UNROLL);

                        if (node->src1->type == GGML_TYPE_F32) {
                            cur  = sizeof(float)*ne11*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*ne11*node->n_tasks; // this is overestimated by x2
                        }

                        if (node->src1->type == GGML_TYPE_F16) {
                            cur  = sizeof(float)*ne11*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*ne11*node->n_tasks; // this is overestimated by x2
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_FLASH_FF:
                    {
                        node->n_tasks = n_threads;

                        size_t cur = 0;

                        if (node->src1->type == GGML_TYPE_F32) {
                            cur  = sizeof(float)*node->src1->ne[1]*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*node->src1->ne[1]*node->n_tasks; // this is overestimated by x2
                        }

                        if (node->src1->type == GGML_TYPE_F16) {
                            cur  = sizeof(float)*node->src1->ne[1]*node->n_tasks; // TODO: this can become (n_tasks-1)
                            cur += sizeof(float)*node->src1->ne[1]*node->n_tasks; // this is overestimated by x2
                        }

                        work_size = MAX(work_size, cur);
                    } break;
                case GGML_OP_MAP_UNARY:
                case GGML_OP_MAP_BINARY:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_NONE:
                    {
                        node->n_tasks = 1;
                    } break;
                case GGML_OP_COUNT:
                    {
                        GGML_ASSERT(false);
                    } break;
            }
        }

        if (cgraph->work != NULL && work_size > cgraph->work_size) {
            GGML_ASSERT(false); // TODO: better handling
        }

        if (work_size > 0 && cgraph->work == NULL) {
            cgraph->work_size = work_size + CACHE_LINE_SIZE*(n_threads - 1);

            GGML_PRINT_DEBUG("%s: allocating work buffer for graph (%zu bytes)\n", __func__, cgraph->work_size);
            cgraph->work = ggml_new_tensor_1d(ctx, GGML_TYPE_I8, cgraph->work_size);
        }
    }

    const int64_t perf_start_cycles  = ggml_perf_cycles();
    const int64_t perf_start_time_us = ggml_perf_time_us();

    for (int i = 0; i < cgraph->n_nodes; i++) {
        GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, i, cgraph->n_nodes);

        struct ggml_tensor * node = cgraph->nodes[i];

        // TODO: this could be used to avoid unnecessary computations, but it needs to be improved
        //if (node->grad == NULL && node->perf_runs > 0) {
        //    continue;
        //}

        const int64_t perf_node_start_cycles  = ggml_perf_cycles();
        const int64_t perf_node_start_time_us = ggml_perf_time_us();

        // INIT
        struct ggml_compute_params params = {
            /*.type  =*/ GGML_TASK_INIT,
            /*.ith   =*/ 0,
            /*.nth   =*/ node->n_tasks,
            /*.wsize =*/ cgraph->work ? ggml_nbytes(cgraph->work) : 0,
            /*.wdata =*/ cgraph->work ? cgraph->work->data : NULL,
        };
        ggml_compute_forward(&params, node);

        // COMPUTE
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            // launch thread pool
            for (int j = 0; j < n_threads - 1; j++) {
                workers[j].params = (struct ggml_compute_params) {
                    .type  = GGML_TASK_COMPUTE,
                    .ith   = j + 1,
                    .nth   = node->n_tasks,
                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
                    .wdata = cgraph->work ? cgraph->work->data : NULL,
                };
                workers[j].node = node;
            }

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) > 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            atomic_store(&state_shared.has_work, true);
        }

        params.type = GGML_TASK_COMPUTE;
        ggml_compute_forward(&params, node);

        // wait for thread pool
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) != 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);
        }

        // FINALIZE
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            // launch thread pool
            for (int j = 0; j < n_threads - 1; j++) {
                workers[j].params = (struct ggml_compute_params) {
                    .type  = GGML_TASK_FINALIZE,
                    .ith   = j + 1,
                    .nth   = node->n_tasks,
                    .wsize = cgraph->work ? ggml_nbytes(cgraph->work) : 0,
                    .wdata = cgraph->work ? cgraph->work->data : NULL,
                };
                workers[j].node = node;
            }

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) > 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            atomic_store(&state_shared.has_work, true);
        }

        params.type = GGML_TASK_FINALIZE;
        ggml_compute_forward(&params, node);

        // wait for thread pool
        if (node->n_tasks > 1) {
            if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
                atomic_store(&state_shared.has_work, false);
            }

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.has_work)) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);

            atomic_fetch_sub(&state_shared.n_ready, 1);

            pthread_mutex_lock(&state_shared.mutex);
            while (atomic_load(&state_shared.n_ready) != 0) {
                pthread_cond_wait(&state_shared.cv, &state_shared.mutex);
                // ggml_lock_lock  (&state_shared.spin);
                // ggml_lock_unlock(&state_shared.spin);
            }
            pthread_mutex_unlock(&state_shared.mutex);
        }

        // performance stats (node)
        {
            int64_t perf_cycles_cur  = ggml_perf_cycles()  - perf_node_start_cycles;
            int64_t perf_time_us_cur = ggml_perf_time_us() - perf_node_start_time_us;

            node->perf_runs++;
            node->perf_cycles  += perf_cycles_cur;
            node->perf_time_us += perf_time_us_cur;
        }
    }

    // join thread pool
    if (n_threads > 1) {
        atomic_store(&state_shared.stop, true);
        atomic_store(&state_shared.has_work, true);

        for (int j = 0; j < n_threads - 1; j++) {
            int rc = ggml_thread_join(workers[j].thrd, NULL);
            GGML_ASSERT(rc == 0);
            UNUSED(rc);
        }

        pthread_mutex_destroy(&state_shared.mutex);
        pthread_cond_destroy(&state_shared.cv);

        ggml_lock_destroy(&state_shared.spin);
    }

    // performance stats (graph)
    {
        int64_t perf_cycles_cur  = ggml_perf_cycles()  - perf_start_cycles;
        int64_t perf_time_us_cur = ggml_perf_time_us() - perf_start_time_us;

        cgraph->perf_runs++;
        cgraph->perf_cycles  += perf_cycles_cur;
        cgraph->perf_time_us += perf_time_us_cur;

        GGML_PRINT_DEBUG("%s: perf (%d) - cpu = %.3f / %.3f ms, wall = %.3f / %.3f ms\n",
                __func__, cgraph->perf_runs,
                (double) perf_cycles_cur      / (double) ggml_cycles_per_ms(),
                (double) cgraph->perf_cycles  / (double) ggml_cycles_per_ms() / (double) cgraph->perf_runs,
                (double) perf_time_us_cur     / 1000.0,
                (double) cgraph->perf_time_us / 1000.0 / cgraph->perf_runs);
    }
}
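// --------------------------------------------------------------------------
// Minimal usage sketch (illustrative only, not part of the original source):
// how a caller typically drives ggml_graph_compute. It assumes the public ggml
// API of the same era (ggml_init, ggml_new_tensor_2d, ggml_set_f32,
// ggml_mul_mat, ggml_build_forward, ggml_free); the arena size, tensor shapes
// and thread count below are arbitrary example values.

static void example_graph_compute(void) {
    // arena that backs all tensors and the work buffer allocated above
    struct ggml_init_params init_params = {
        .mem_size   = 16*1024*1024,
        .mem_buffer = NULL,
    };
    struct ggml_context * ctx = ggml_init(init_params);

    // C = A * B, with A of shape [4, 3] and B of shape [4, 2] -> C of shape [3, 2]
    struct ggml_tensor * A = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, 4, 3);
    struct ggml_tensor * B = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, 4, 2);
    ggml_set_f32(A, 1.0f);
    ggml_set_f32(B, 2.0f);

    struct ggml_tensor * C = ggml_mul_mat(ctx, A, B);

    // build the forward graph and pick the thread count read by ggml_graph_compute
    struct ggml_cgraph gf = ggml_build_forward(C);
    gf.n_threads = 4;

    ggml_graph_compute(ctx, &gf);

    // the result is now in C->data as contiguous float values
    const float * c_data = (const float *) C->data;
    (void) c_data;

    ggml_free(ctx);
}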