Untitled

mail@pastecode.io avatar
unknown
plain_text
6 months ago
12 kB
2
Indexable
Never
#define _XOPEN_SOURCE 700
// Headers
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>

// Defines
#define NUM_CORES 4
#define MAX_BUFFER 256
#define WATCHDOG_TIMEOUT 2

// A structure that stores data for each core
typedef struct
{
    unsigned long user;    // Time used for user tasks
    unsigned long nice;    // Time used for "nice" tasks
    unsigned long system;  // Time used for system tasks
    unsigned long idle;    // Processor idle time
    unsigned long irq;     // Time used for servicing interrupts
    unsigned long softirq; // Time used for servicing software interrupts
    unsigned long steal;   // Stolen time (virtualization)
} CPU_Data_t;

// A structure that stores data for each thread
#pragma pack(1) // Disable padding
typedef struct
{
    CPU_Data_t prev_data;
    CPU_Data_t curr_data;
    pthread_t reader_thread_id;
    pthread_t analyzer_thread_id;
    __int32_t thread_index;
    bool is_alive; // Thread status for watchdog thread
} Thread_Data_t;
#pragma pack(4) // Reset to previous alignment rule (enable padding)

// Logger thread data
typedef struct
{
    pthread_t logger_thread_id;
    FILE *log_file;
    pthread_mutex_t log_file_lock;
} Logger_Data_t;

typedef struct node
{
    float value;
    struct node *next;
} Node;

typedef struct queue
{
    Node *head;
    Node *tail;
    int count;
    int capacity;
} Queue;

static float printData[NUM_CORES];                                // Shared data structure for CPU usage data
static pthread_mutex_t printDataLock = PTHREAD_MUTEX_INITIALIZER; // Definition of printDataLock (mutex)
static volatile sig_atomic_t flagSigTerm = 0;                     // Flag which detects SIGTERM signal
static Thread_Data_t threads[NUM_CORES];                          // Array of Thread_Data_t structures to store data for each thread.
static Queue *cpuUsageQueue;                                             // Queue for CPU usage data
static pthread_mutex_t queueLock = PTHREAD_MUTEX_INITIALIZER;            // Definition of queueLock (mutex)

// Queue operations
static Queue *initQueue(int capacity)
{
    Queue *queue = (Queue *)malloc(sizeof(Queue));
    queue->head = NULL;
    queue->tail = NULL;
    queue->count = 0;
    queue->capacity = capacity;
    return queue;
}

static void enqueue(Queue *queue, float value)
{
    Node *newNode = (Node *)malloc(sizeof(Node));
    newNode->value = value;
    newNode->next = NULL;
    if (queue->count == queue->capacity)
    {
        return;
    }
    if (queue->tail != NULL)
    {
        queue->tail->next = newNode;
    }
    queue->tail = newNode;
    if (queue->head == NULL)
    {
        queue->head = newNode;
    }
    queue->count++;
}

static float dequeue(Queue *queue)
{
    Node *temp = queue->head;
    float value = temp->value;
    if (queue->count == 0)
    {
        return -1.0; // error value, queue is empty
    }
    queue->head = queue->head->next;
    if (queue->head == NULL)
    {
        queue->tail = NULL;
    }
    free(temp);
    queue->count--;
    return value;
}

static int isFull(Queue *queue)
{
    return queue->count == queue->capacity;
}

static int isEmpty(Queue *queue)
{
    return queue->count == 0;
}

// Signal handler function for SIGTERM signal
static void sigHandler(int signal)
{
    if (signal == SIGTERM)
    {
        printf("Received SIGTERM signal. Closing gracefully...\n");
        flagSigTerm = 1;
        sleep(2); // Wait 2 seconds to shutdown gracefully
    }
}

// Reader thread
static void *readerThread(void *arg)
{
    Thread_Data_t *threadData = (Thread_Data_t *)arg;
    char buffer[MAX_BUFFER];

    while (!flagSigTerm)
    {
        // Create the file pointer for each thread
        FILE *file = fopen("/proc/stat", "r"); // Opening /proc/stat (read)
        // Checks for NULL exception
        if (file == NULL)
        {
            perror("Failed to open /proc/stat");
            exit(EXIT_FAILURE);
        }

        // Seek to the beginning of the file
        fseek(file, 0, SEEK_SET);

        // Read the overall CPU usage data
        if (fgets(buffer, sizeof(buffer), file) == NULL)
        {
            perror("Failed to read /proc/stat");
            exit(EXIT_FAILURE);
        }

        // Parse the CPU usage data
        if (sscanf(buffer, "%*s %lu %lu %lu %lu %lu %lu %lu",
                   &(threadData->curr_data.user),
                   &(threadData->curr_data.nice),
                   &(threadData->curr_data.system),
                   &(threadData->curr_data.idle),
                   &(threadData->curr_data.irq),
                   &(threadData->curr_data.softirq),
                   &(threadData->curr_data.steal)) != 7)
        {
            fprintf(stderr, "Failed to parse CPU data\n");
            exit(EXIT_FAILURE);
        }

        // Close the file pointer
        fclose(file);

        // Sleep for 1s
        sleep(1);

        // Set the thread as alive
        threadData->is_alive = true;
    }

    return NULL;
}

static void *analyzerThread(void *arg)
{
    Thread_Data_t *threadData = (Thread_Data_t *)arg;

    threadData->prev_data = threadData->curr_data; // Initialize prev_data

    while (!flagSigTerm)
    {
        // Getting the values needed to calculate CPU usage
        unsigned long prev_idle = threadData->prev_data.idle +
                                  threadData->prev_data.irq +
                                  threadData->prev_data.softirq;
        unsigned long curr_idle = threadData->curr_data.idle +
                                  threadData->curr_data.irq +
                                  threadData->curr_data.softirq;

        unsigned long prev_non_idle = threadData->prev_data.user +
                                      threadData->prev_data.nice +
                                      threadData->prev_data.system +
                                      threadData->prev_data.steal;
        unsigned long curr_non_idle = threadData->curr_data.user +
                                      threadData->curr_data.nice +
                                      threadData->curr_data.system +
                                      threadData->curr_data.steal;

        unsigned long prev_total = prev_idle + prev_non_idle;
        unsigned long curr_total = curr_idle + curr_non_idle;

        unsigned long total_diff = curr_total - prev_total;
        unsigned long idle_diff = curr_idle - prev_idle;

        // Calculating the percentage of CPU usage
        float cpu_usage;
        if (total_diff != 0)
        {
            cpu_usage = (float)(total_diff - idle_diff) * 100.0f / (float)total_diff;
        }
        else
        {
            cpu_usage = 0.0f;
        }

        // Push the CPU usage to the queue
        pthread_mutex_lock(&queueLock);
        if (!isFull(cpuUsageQueue))
        {
            enqueue(cpuUsageQueue, cpu_usage);
        }
        pthread_mutex_unlock(&queueLock);

        // Replace previous data with current data
        threadData->prev_data = threadData->curr_data;

        // Sleep for 1s
        sleep(1);

        // Set the thread as alive
        threadData->is_alive = true;
    }

    return NULL;
}

static void *printerThread()
{

    while (!flagSigTerm)
    {
        float total_cpu_usage = 0.0f;
        float average_cpu_usage = 0.0f;

        // Acquire the lock to safely access the shared printData structure
        pthread_mutex_lock(&printDataLock);

        // Fetch the CPU usage from the queue
        pthread_mutex_lock(&queueLock);
        while(!isEmpty(cpuUsageQueue))
        {
            total_cpu_usage += dequeue(cpuUsageQueue);
        }
        pthread_mutex_unlock(&queueLock);

        // Calculate the average CPU usage
        if (NUM_CORES != 0)
        {
            average_cpu_usage = total_cpu_usage / NUM_CORES;
        }
        else
        {
            average_cpu_usage = 0.0f;
        }

        // Release the lock
        pthread_mutex_unlock(&printDataLock);

        printf("Average CPU usage: %.2f%%\n", (double)average_cpu_usage);

        // Sleep for 1s
        sleep(1);
    }

    return NULL;
}

static void *watchdogThread()
{

    while (!flagSigTerm)
    {
        sleep(WATCHDOG_TIMEOUT);
        for (int i = 0; i < NUM_CORES; i++)
        {
            if (!threads[i].is_alive)
            {
                fprintf(stderr, "Thread %d did not report in time. Exiting...\n", i);
                exit(EXIT_FAILURE);
            }
            else
            {
                threads[i].is_alive = false;
            }
        }
    }

    return NULL;
}

static void *loggerThread(void *arg)
{
    Logger_Data_t *loggerData = (Logger_Data_t *)arg;
    FILE *file;

    while (!flagSigTerm)
    {
        // Open the log file
        pthread_mutex_lock(&loggerData->log_file_lock); // Lock
        file = loggerData->log_file;
        pthread_mutex_unlock(&loggerData->log_file_lock); // Release

        if (file != NULL)
        {
            // Acquire the lock to safely access the shared printData structure
            pthread_mutex_lock(&printDataLock);

            // Write CPU usage data to the log file
            for (int i = 0; i < NUM_CORES; i++)
            {
                fprintf(file, "Thread %d CPU usage: %.2f%%\n", i, (double)printData[i]);
            }

            // Release the lock
            pthread_mutex_unlock(&printDataLock);
        }

        // Sleep for 1s
        sleep(1);
    }
    return NULL;
}

int main()
{
    pthread_t printer_thread, watchdog_thread;
    Logger_Data_t loggerData;
    struct sigaction action;
    memset(&action, 0, sizeof(struct sigaction));
#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdisabled-macro-expansion"
/*
The sa_handler in __sigaction_handler.sa_handler refers to a member of the __sigaction_handler
union, not the sa_handler macro, so I decided to disable this warning for the clang compilier.
*/
#endif
    action.sa_handler = sigHandler;
#if defined(__clang__)
#pragma GCC diagnostic pop
#endif
    sigaction(SIGTERM, &action, NULL);
    printf("Process ID: %d\n", getpid());
    cpuUsageQueue = initQueue(10);               // initialize the queue with a maximum size of 10
    loggerData.log_file = fopen("log.txt", "w"); // Open the log file for writing
    if (loggerData.log_file == NULL)
    {
        perror("Failed to open log file");
        exit(EXIT_FAILURE);
    }
    pthread_mutex_init(&loggerData.log_file_lock, NULL); // Initialize the log file lock

    if (loggerData.log_file == NULL)
    {
        perror("Failed to open log file");
        exit(EXIT_FAILURE);
    }

    // Reader and analyzer thread initialization
    for (int i = 0; i < NUM_CORES; i++)
    {
        threads[i].thread_index = i;
        threads[i].is_alive = false;
        pthread_create(&threads[i].reader_thread_id, NULL, readerThread, &threads[i]);
        pthread_create(&threads[i].analyzer_thread_id, NULL, analyzerThread, &threads[i]);
    }

    // Printer thread initialization
    pthread_create(&printer_thread, NULL, printerThread, NULL);

    // Watchdog thread initialization
    pthread_create(&watchdog_thread, NULL, watchdogThread, threads);

    // Logger thread initialization
    pthread_create(&loggerData.logger_thread_id, NULL, loggerThread, &loggerData);

    while (!flagSigTerm)
    {
        for (int i = 0; i < NUM_CORES; i++)
        {
            pthread_join(threads[i].reader_thread_id, NULL);
            pthread_join(threads[i].analyzer_thread_id, NULL);
        }

        pthread_join(printer_thread, NULL);
        pthread_join(watchdog_thread, NULL);
        pthread_join(loggerData.logger_thread_id, NULL);
    }

    // Close the log file
    pthread_mutex_lock(&loggerData.log_file_lock);
    if (fclose(loggerData.log_file) != 0)
    {
        perror("Failed to close log file");
        exit(EXIT_FAILURE);
    }
    pthread_mutex_unlock(&loggerData.log_file_lock);
    printf("Cleaning successful\n");

    return 0;
}