Untitled
unknown
plain_text
6 months ago
12 kB
2
Indexable
Never
#define _XOPEN_SOURCE 700

// Headers
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>

// Defines
#define NUM_CORES 4        // Number of entries in threads[] and printData[]
#define MAX_BUFFER 256     // Size of the line buffer used when reading /proc/stat
#define WATCHDOG_TIMEOUT 2 // Seconds the watchdog sleeps between liveness checks

// A structure that stores data for each core
typedef struct
{
    unsigned long user;    // Time used for user tasks
    unsigned long nice;    // Time used for "nice" tasks
    unsigned long system;  // Time used for system tasks
    unsigned long idle;    // Processor idle time
    unsigned long irq;     // Time used for servicing interrupts
    unsigned long softirq; // Time used for servicing software interrupts
    unsigned long steal;   // Stolen time (virtualization)
} CPU_Data_t;

// A structure that stores data for each thread.
// Deliberately NOT packed: the struct is never serialized, and pointers to
// its members are passed to library functions, which is only safe when the
// members keep their natural alignment. The previous "#pragma pack(4)" also
// did not restore the default rule; it left 4-byte packing active for every
// structure declared after it (including the one holding a pthread_mutex_t).
typedef struct
{
    CPU_Data_t prev_data;         // Counters from the previous sample
    CPU_Data_t curr_data;         // Counters from the most recent sample
    pthread_t reader_thread_id;   // Handle of the reader thread
    pthread_t analyzer_thread_id; // Handle of the analyzer thread
    int32_t thread_index;         // Index of this entry inside threads[]
    bool is_alive;                // Thread status for watchdog thread
} Thread_Data_t;

// Logger thread data
typedef struct
{
    pthread_t logger_thread_id;    // Handle of the logger thread
    FILE *log_file;                // Destination stream for log lines
    pthread_mutex_t log_file_lock; // Guards access to log_file
} Logger_Data_t;

// Singly linked list node holding one CPU usage sample
typedef struct node
{
    float value;
    struct node *next;
} Node;

// Bounded FIFO of CPU usage samples
typedef struct queue
{
    Node *head;   // Oldest element (next to be dequeued), NULL when empty
    Node *tail;   // Newest element, NULL when empty
    int count;    // Number of elements currently stored
    int capacity; // Maximum number of elements
} Queue;

static float printData[NUM_CORES];                                // Shared data structure for CPU usage data
static pthread_mutex_t printDataLock = PTHREAD_MUTEX_INITIALIZER; // Definition of printDataLock (mutex)
static volatile sig_atomic_t flagSigTerm = 0;                     // Flag which detects SIGTERM signal
static Thread_Data_t threads[NUM_CORES];                          // Array of Thread_Data_t structures to store data for each thread.
static Queue *cpuUsageQueue; // Queue for CPU usage data static pthread_mutex_t queueLock = PTHREAD_MUTEX_INITIALIZER; // Definition of queueLock (mutex) // Queue operations static Queue *initQueue(int capacity) { Queue *queue = (Queue *)malloc(sizeof(Queue)); queue->head = NULL; queue->tail = NULL; queue->count = 0; queue->capacity = capacity; return queue; } static void enqueue(Queue *queue, float value) { Node *newNode = (Node *)malloc(sizeof(Node)); newNode->value = value; newNode->next = NULL; if (queue->count == queue->capacity) { return; } if (queue->tail != NULL) { queue->tail->next = newNode; } queue->tail = newNode; if (queue->head == NULL) { queue->head = newNode; } queue->count++; } static float dequeue(Queue *queue) { Node *temp = queue->head; float value = temp->value; if (queue->count == 0) { return -1.0; // error value, queue is empty } queue->head = queue->head->next; if (queue->head == NULL) { queue->tail = NULL; } free(temp); queue->count--; return value; } static int isFull(Queue *queue) { return queue->count == queue->capacity; } static int isEmpty(Queue *queue) { return queue->count == 0; } // Signal handler function for SIGTERM signal static void sigHandler(int signal) { if (signal == SIGTERM) { printf("Received SIGTERM signal. 
Closing gracefully...\n"); flagSigTerm = 1; sleep(2); // Wait 2 seconds to shutdown gracefully } } // Reader thread static void *readerThread(void *arg) { Thread_Data_t *threadData = (Thread_Data_t *)arg; char buffer[MAX_BUFFER]; while (!flagSigTerm) { // Create the file pointer for each thread FILE *file = fopen("/proc/stat", "r"); // Opening /proc/stat (read) // Checks for NULL exception if (file == NULL) { perror("Failed to open /proc/stat"); exit(EXIT_FAILURE); } // Seek to the beginning of the file fseek(file, 0, SEEK_SET); // Read the overall CPU usage data if (fgets(buffer, sizeof(buffer), file) == NULL) { perror("Failed to read /proc/stat"); exit(EXIT_FAILURE); } // Parse the CPU usage data if (sscanf(buffer, "%*s %lu %lu %lu %lu %lu %lu %lu", &(threadData->curr_data.user), &(threadData->curr_data.nice), &(threadData->curr_data.system), &(threadData->curr_data.idle), &(threadData->curr_data.irq), &(threadData->curr_data.softirq), &(threadData->curr_data.steal)) != 7) { fprintf(stderr, "Failed to parse CPU data\n"); exit(EXIT_FAILURE); } // Close the file pointer fclose(file); // Sleep for 1s sleep(1); // Set the thread as alive threadData->is_alive = true; } return NULL; } static void *analyzerThread(void *arg) { Thread_Data_t *threadData = (Thread_Data_t *)arg; threadData->prev_data = threadData->curr_data; // Initialize prev_data while (!flagSigTerm) { // Getting the values needed to calculate CPU usage unsigned long prev_idle = threadData->prev_data.idle + threadData->prev_data.irq + threadData->prev_data.softirq; unsigned long curr_idle = threadData->curr_data.idle + threadData->curr_data.irq + threadData->curr_data.softirq; unsigned long prev_non_idle = threadData->prev_data.user + threadData->prev_data.nice + threadData->prev_data.system + threadData->prev_data.steal; unsigned long curr_non_idle = threadData->curr_data.user + threadData->curr_data.nice + threadData->curr_data.system + threadData->curr_data.steal; unsigned long prev_total = prev_idle + 
prev_non_idle; unsigned long curr_total = curr_idle + curr_non_idle; unsigned long total_diff = curr_total - prev_total; unsigned long idle_diff = curr_idle - prev_idle; // Calculating the percentage of CPU usage float cpu_usage; if (total_diff != 0) { cpu_usage = (float)(total_diff - idle_diff) * 100.0f / (float)total_diff; } else { cpu_usage = 0.0f; } // Push the CPU usage to the queue pthread_mutex_lock(&queueLock); if (!isFull(cpuUsageQueue)) { enqueue(cpuUsageQueue, cpu_usage); } pthread_mutex_unlock(&queueLock); // Replace previous data with current data threadData->prev_data = threadData->curr_data; // Sleep for 1s sleep(1); // Set the thread as alive threadData->is_alive = true; } return NULL; } static void *printerThread() { while (!flagSigTerm) { float total_cpu_usage = 0.0f; float average_cpu_usage = 0.0f; // Acquire the lock to safely access the shared printData structure pthread_mutex_lock(&printDataLock); // Fetch the CPU usage from the queue pthread_mutex_lock(&queueLock); while(!isEmpty(cpuUsageQueue)) { total_cpu_usage += dequeue(cpuUsageQueue); } pthread_mutex_unlock(&queueLock); // Calculate the average CPU usage if (NUM_CORES != 0) { average_cpu_usage = total_cpu_usage / NUM_CORES; } else { average_cpu_usage = 0.0f; } // Release the lock pthread_mutex_unlock(&printDataLock); printf("Average CPU usage: %.2f%%\n", (double)average_cpu_usage); // Sleep for 1s sleep(1); } return NULL; } static void *watchdogThread() { while (!flagSigTerm) { sleep(WATCHDOG_TIMEOUT); for (int i = 0; i < NUM_CORES; i++) { if (!threads[i].is_alive) { fprintf(stderr, "Thread %d did not report in time. 
Exiting...\n", i); exit(EXIT_FAILURE); } else { threads[i].is_alive = false; } } } return NULL; } static void *loggerThread(void *arg) { Logger_Data_t *loggerData = (Logger_Data_t *)arg; FILE *file; while (!flagSigTerm) { // Open the log file pthread_mutex_lock(&loggerData->log_file_lock); // Lock file = loggerData->log_file; pthread_mutex_unlock(&loggerData->log_file_lock); // Release if (file != NULL) { // Acquire the lock to safely access the shared printData structure pthread_mutex_lock(&printDataLock); // Write CPU usage data to the log file for (int i = 0; i < NUM_CORES; i++) { fprintf(file, "Thread %d CPU usage: %.2f%%\n", i, (double)printData[i]); } // Release the lock pthread_mutex_unlock(&printDataLock); } // Sleep for 1s sleep(1); } return NULL; } int main() { pthread_t printer_thread, watchdog_thread; Logger_Data_t loggerData; struct sigaction action; memset(&action, 0, sizeof(struct sigaction)); #if defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdisabled-macro-expansion" /* The sa_handler in __sigaction_handler.sa_handler refers to a member of the __sigaction_handler union, not the sa_handler macro, so I decided to disable this warning for the clang compilier. 
*/ #endif action.sa_handler = sigHandler; #if defined(__clang__) #pragma GCC diagnostic pop #endif sigaction(SIGTERM, &action, NULL); printf("Process ID: %d\n", getpid()); cpuUsageQueue = initQueue(10); // initialize the queue with a maximum size of 10 loggerData.log_file = fopen("log.txt", "w"); // Open the log file for writing if (loggerData.log_file == NULL) { perror("Failed to open log file"); exit(EXIT_FAILURE); } pthread_mutex_init(&loggerData.log_file_lock, NULL); // Initialize the log file lock if (loggerData.log_file == NULL) { perror("Failed to open log file"); exit(EXIT_FAILURE); } // Reader and analyzer thread initialization for (int i = 0; i < NUM_CORES; i++) { threads[i].thread_index = i; threads[i].is_alive = false; pthread_create(&threads[i].reader_thread_id, NULL, readerThread, &threads[i]); pthread_create(&threads[i].analyzer_thread_id, NULL, analyzerThread, &threads[i]); } // Printer thread initialization pthread_create(&printer_thread, NULL, printerThread, NULL); // Watchdog thread initialization pthread_create(&watchdog_thread, NULL, watchdogThread, threads); // Logger thread initialization pthread_create(&loggerData.logger_thread_id, NULL, loggerThread, &loggerData); while (!flagSigTerm) { for (int i = 0; i < NUM_CORES; i++) { pthread_join(threads[i].reader_thread_id, NULL); pthread_join(threads[i].analyzer_thread_id, NULL); } pthread_join(printer_thread, NULL); pthread_join(watchdog_thread, NULL); pthread_join(loggerData.logger_thread_id, NULL); } // Close the log file pthread_mutex_lock(&loggerData.log_file_lock); if (fclose(loggerData.log_file) != 0) { perror("Failed to close log file"); exit(EXIT_FAILURE); } pthread_mutex_unlock(&loggerData.log_file_lock); printf("Cleaning successful\n"); return 0; }