/*
 * Paste-site metadata (not part of the program):
 * Untitled | unknown | plain_text | 4 years ago | 4.3 kB | 11 | Indexable
 */
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <semaphore.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h> // required for fork()

#include "tickets.h"
/* Shutdown flag: set to 1 by the SIGUSR1 handler and polled by main()'s
 * message loop. It is written from a signal handler, so it must be a
 * volatile sig_atomic_t for the access to be well defined. */
volatile sig_atomic_t end = 0;
/*
 * Replace the calling process image with the "./process" executable.
 *
 * The seven integer arguments are passed to the new program as decimal
 * strings, in declaration order (argv[1] = id ... argv[7] = data_key).
 *
 * This function never returns. If execl() fails, the error is reported on
 * stderr and the caller terminates with a failure status. _exit() is used
 * instead of exit() because the caller is a freshly forked child: it must not
 * flush stdio buffers or run atexit handlers inherited from its parent.
 */
void process ( int id, int firstq, int numberOfNodes,int processPerNode,int idNodo, int sems_key, int data_key){
    const int values[7] = {
        id, firstq, numberOfNodes, processPerNode, idNodo, sems_key, data_key
    };
    char params[7][20]; /* 20 bytes comfortably hold any decimal int */

    for (size_t i = 0; i < sizeof values / sizeof values[0]; i++) {
        snprintf(params[i], sizeof params[i], "%i", values[i]);
    }

    /* The sentinel of a variadic exec call must be a null *pointer*. */
    execl("./process", "./process",
          params[0], params[1], params[2], params[3],
          params[4], params[5], params[6], (char *)NULL);

    /* Only reached when execl() failed. */
    perror("execl ./process");
    _exit(EXIT_FAILURE);
}
/* Signal handler installed for SIGUSR1: raises the global shutdown flag so
 * that main()'s message loop stops at its next check. */
void end_handler(int signo)
{
    (void)signo; /* the signal number itself is not needed */
    end = 1;
}
int main (int argc, char* argv[]){
int nodeId = atoi(argv[1]);
int firstq = atoi(argv[2]);
int numberOfNodes = atoi(argv[3]);
int processPerNode = atoi(argv[4]);
// Half of the sem are for the child to read after the node sets the values on mem (upperhalf)
// The lowerhalf is to wait the child to read the data before we overwrite it
int sems_key = shmget(IPC_PRIVATE, sizeof(sem_t)*processPerNode*2, IPC_CREAT | IPC_EXCL | 0666);
int data_key = shmget(IPC_PRIVATE, sizeof(ticket_t)*processPerNode, IPC_CREAT | IPC_EXCL | 0666);
ticket_t* tickets_mem = (ticket_t *)shmat(data_key, NULL, 0);
sem_t* sems_mem = (sem_t *)shmat(sems_key, NULL, 0);
for (int i = 0; i < processPerNode; i++)
{
sem_init(&sems_mem[i], 1, 0);
}
for (int i = processPerNode; i < processPerNode*2; i++)
{
sem_init(&sems_mem[i], 1, 1);
}
pid_t childs[processPerNode];
for (int i=0; i<processPerNode; i++){
pid_t child=fork();
if (child==0) process(nodeId+i,firstq,numberOfNodes,processPerNode,nodeId,sems_key,data_key);
childs[i] = child;
//printf("[Node %i] Created process: %i\n",id, id+i);
}
struct sigaction sigact;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = 0;
sigact.sa_handler = end_handler;
sigaction(SIGUSR1, &sigact, NULL); // Create a handler so the program not exit when SIGUSR1 arrives
printf("\033[0;34m[Node %i] Ready to listen for messages!\033[0m\n", nodeId/numberOfNodes);
ticket_t request;
ticketok_t response;
int myqueue = firstq + nodeId/processPerNode;
int maxpos = processPerNode*2 -1; // max position of the array of sems
while (!end)
{
// Receive a message from the queue, th size is the max size of the message
// So we can set it to the bigger type
msgrcv(myqueue, &request, sizeof(int)*2, 0, 0);
if (request.mtype == 2) {
// Copy the data to the ticketok_t
memcpy(&response, &request, sizeof(ticketok_t));
int pos = response.dest - nodeId; // Position of the array in the shared memory
sem_wait(&sems_mem[maxpos - pos]); // wait for the child to be ready to read data
memcpy(&tickets_mem[pos], &response, sizeof(ticketok_t)); // Copy the data so the child can read it
sem_post(&sems_mem[pos]); // Post to the child
} else {
// Avisar a todos los procesos del nodo
for (size_t i = nodeId; i < nodeId+processPerNode; i++)
{
if (i == request.process) continue;
int pos = i - nodeId;
//printf("[Node %i] \033[0;33m Semaforos: wait(%i); post(%i); \033[0m\n", id, maxpos -pos, pos);
sem_wait(&sems_mem[maxpos - pos]);
memcpy(&tickets_mem[pos], &request, sizeof(ticket_t)); // Copy the data so the child can read it
sem_post(&sems_mem[pos]);
}
}
}
for (size_t i = 0; i < processPerNode; i++)
{
kill(childs[i], SIGKILL);
}
while (wait(NULL)!=-1);
return 0;
}Editor is loading...