Multi-thread test
unknown
c_cpp
6 months ago
6.3 kB
9
Indexable
#include <algorithm>
#include <array>
#include <atomic>
#include <barrier>
#include <chrono>
#include <cmath>
#include <functional>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

/* This is a multi-threading test meant to emulate something similar to filling a hash grid in parallel.
 * The memory isn't tightly packed, unfortunately, but whatever.
 * This current configuration is set up to have 4 threads, each handling 25000 elements. It initializes each
 * array with random numbers between 0 and 100 and then sorts them into "Containers" based on the remainder
 * of the number and the number of containers there are. In this case it's 1024. So if the remainder of the
 * integer divided by 1024 is 0, it will be sorted into container 0, and if it's 1, it'll sort into
 * container 1, and so on and so forth. That is just done to test out the hash grid filling, which is
 * multi-threaded, and works.
 *
 * So in short, 4 arrays get sorted and merged into 1 array in a multi-threaded fashion... and it does so by
 * emulating how a hash grid would do it.
 *
 * Afterwards I compute a new value for each element in the "merged" array by looking at the average of its
 * neighboring elements.
 * To avoid race conditions, I store the result into a "Write" array. The results of the write array are
 * then mem copied.
 *
 * The algorithm goes like this:
 *
 * Merge -> loop n-iterations { syncpoint -> Average -> syncpoint -> copy Write to Read } -> finish.
 */

// Number of elements to be added from each vector
constexpr int VECTOR_SIZE = 25000;
constexpr int NUM_CONTAINERS = 32*32;
constexpr int CONTAINER_SPACES = 16;
constexpr int AVERAGING_ITERATIONS = 10;
constexpr int NUM_THREADS = 4;

// Shared vector to hold merged elements
std::vector<float> mergedVector;
std::vector<float> writeVector;
std::array<std::atomic<int>, NUM_CONTAINERS> containerCounters;

// Generates random numbers between min and max.
void generateRandomNumbers(std::vector<float>& vec, float minValue, float maxValue) { std::random_device rd; std::mt19937 gen(rd()); std::uniform_real_distribution<> dis(minValue, maxValue); // Fill vector with random numbers for(int i = 0; i < VECTOR_SIZE; ++i) { vec[i] = static_cast<float>(dis(gen)); } } // Function to merge vectors into a shared vector void mergeVectors(const std::vector<float>& vec, std::array<std::atomic<int>, NUM_CONTAINERS>& counters) { for (const float& elem : vec) { // Sort the numbers into specific buckets based on remainder const int containerIndex = static_cast<int>(floor(fmod(elem, NUM_CONTAINERS))); //get the current container count and increment it. if it's less than available spaces, we'll use container count as an index //to store the element. const int containerCount = counters[containerIndex].fetch_add(1, std::memory_order_relaxed); if(containerCount < CONTAINER_SPACES) { const int idx = containerIndex * CONTAINER_SPACES + containerCount; mergedVector[idx] = elem; } } } // Averages elements of the "read" vector and stores them in the "write" vector. void averageVectors(const std::vector<float>& read_vec, std::vector<float>& write_vec, int startIndex, int endIndex){ for(int i = startIndex; i < endIndex; ++i) { const int leftNeighbor = std::max(0, i - 1); const int rightNeighbor = std::min(i + 1, NUM_CONTAINERS * CONTAINER_SPACES - 1); write_vec[i] = (read_vec[leftNeighbor] + read_vec[rightNeighbor]) * 0.5f; } } // copies data from Write back to Read. 
void blitVectors(std::vector<float>& read_vec, std::vector<float>& write_vec, int startIndex, int endIndex) { std::copy(write_vec.begin() + startIndex, write_vec.begin() + endIndex, read_vec.begin() + startIndex); } int main() { // Initialize the original vectors std::vector<float> vec1(VECTOR_SIZE); std::vector<float> vec2(VECTOR_SIZE); std::vector<float> vec3(VECTOR_SIZE); std::vector<float> vec4(VECTOR_SIZE); generateRandomNumbers(vec1, 0.f, 100.f); generateRandomNumbers(vec2, 0.f, 100.f); generateRandomNumbers(vec3, 0.f, 100.f); generateRandomNumbers(vec4, 0.f, 100.f); // Reserve space in the merged vector to avoid resizing during merging mergedVector.resize(static_cast<int>(NUM_CONTAINERS * CONTAINER_SPACES)); writeVector.resize(static_cast<int>(NUM_CONTAINERS * CONTAINER_SPACES)); // Initialize container counters. this is an array of atomic ints, crucial for the hashing algorithm to work. for(int i = 0; i < NUM_CONTAINERS; ++i) { containerCounters[i] = 0; } //Create a barrier to synchronize after merging std::barrier sync_point_merge(NUM_THREADS, [&]() noexcept{}); //The actual thread task to run. 
auto threadTask = [&](std::vector<float>& vec, int start, int end) { mergeVectors(std::ref(vec), std::ref(containerCounters)); for(int i = 0; i < AVERAGING_ITERATIONS; ++i) { sync_point_merge.arrive_and_wait(); averageVectors(mergedVector, writeVector, start, end); sync_point_merge.arrive_and_wait(); blitVectors(mergedVector, writeVector, start, end); } }; // Measure the total execution time of the algorithm auto total_start = std::chrono::high_resolution_clock::now(); // Create threads const int elementCount = NUM_CONTAINERS * CONTAINER_SPACES / NUM_THREADS; std::thread t1(threadTask, std::ref(vec1), 0, elementCount - 1); std::thread t2(threadTask, std::ref(vec2), elementCount, elementCount * 2 - 1); std::thread t3(threadTask, std::ref(vec3), elementCount * 2, elementCount * 3 - 1); std::thread t4(threadTask, std::ref(vec4), elementCount * 3, elementCount * 4 - 1); // Join all threads t1.join(); t2.join(); t3.join(); t4.join(); auto total_end = std::chrono::high_resolution_clock::now(); std::chrono::duration<double, std::milli> total_duration = total_end - total_start; std::cout << "Total program execution time: " << total_duration.count() << " ms\n"; return 0; }
Editor is loading...
Leave a Comment