Untitled
<?php

namespace Parallel;

/**
 * Runs callbacks concurrently, each in its own forked child process.
 *
 * Every child gets one end of a Unix socket pair and sends its serialized
 * return value back to the parent through it. Requires the pcntl extension.
 */
class Parallel
{
    /**
     * Executes several processes in parallel.
     *
     * @param array $processes Map of key => [callable $callback, array $args].
     *                         The arguments element may be omitted or null,
     *                         in which case the callback is called without
     *                         arguments.
     *
     * @return array The value returned by each callback, under the same key
     *               it had in $processes. A key is absent when its child
     *               wrote nothing back (for example because the callback
     *               threw).
     *
     * @throws \RuntimeException When a socket pair cannot be created or
     *                           pcntl_fork() fails.
     */
    public function run(array $processes): array
    {
        $pipes = [];
        $pids = [];

        try {
            // Create child processes
            foreach ($processes as $index => $process) {
                $callback = $process[0];
                // Arguments are optional: a missing or null entry means "none".
                $args = $process[1] ?? [];

                $pipes[$index] = $this->createPipe();

                $pid = $this->forkProcess($callback, $args, $pipes[$index]);
                if ($pid === -1) {
                    throw new \RuntimeException('Error when calling pcntl_fork()');
                }

                $pids[$pid] = $index; // Remember which result slot this child fills

                // The parent must drop its copy of the write end, otherwise
                // reading from the pipe would never reach end-of-file.
                fclose($pipes[$index][1]);
            }

            // Read results from pipes
            return $this->readResults($pipes, $pids);
        } finally {
            // Runs on success and on failure (but not in a child, because
            // exit() does not execute finally blocks): release every socket
            // that is still open, then reap the children that were started
            // so none is left behind as a zombie. This blocks until those
            // children have terminated.
            foreach ($pipes as $pipe) {
                foreach ($pipe as $end) {
                    if (is_resource($end)) {
                        fclose($end);
                    }
                }
            }

            $this->waitForProcesses($pids);
        }
    }

    /**
     * Creates a pair of sockets for inter-process communication.
     *
     * @return array Two connected sockets. This class reads from index 0 in
     *               the parent and writes to index 1 in the child.
     *
     * @throws \RuntimeException When the socket pair cannot be created.
     */
    private function createPipe(): array
    {
        $pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

        if ($pipe === false) {
            throw new \RuntimeException('Could not create socket pair');
        }

        return $pipe;
    }

    /**
     * Creates a child process and executes the specified callback in it.
     *
     * In the child this method never returns: it writes the serialized
     * result of the callback to the pipe and exits. If the callback (or the
     * serialization of its result) throws, the error is logged, nothing is
     * written, and the child exits with status 1, so the exception cannot
     * travel up the copied call stack and resume the parent's code inside
     * the child.
     *
     * @param callable $callback The function to execute in the child process.
     * @param array    $args     Arguments for the function.
     * @param array    $pipe     The pipe for sending the result back to the parent process.
     *
     * @return int The identifier of the child process, -1 in case of error.
     */
    private function forkProcess(callable $callback, array $args, array $pipe): int
    {
        $pid = pcntl_fork();

        if ($pid === 0) {
            // Child process
            fclose($pipe[0]); // The child only writes, so close the read part

            $exitCode = 0;

            try {
                fwrite($pipe[1], serialize($callback(...$args)));
            } catch (\Throwable $e) {
                error_log(sprintf(
                    'Parallel child process %d failed: %s',
                    getmypid(),
                    $e->getMessage()
                ));
                $exitCode = 1;
            }

            fclose($pipe[1]); // Signals end-of-file to the parent
            exit($exitCode);
        }

        return $pid; // Parent: the child's pid, or -1 if the fork failed
    }

    /**
     * Reads results from the pipes associated with the processes.
     *
     * Pipes are drained one after another in the order the children were
     * started; each read blocks until that child closes its write end.
     *
     * @param array $pipes An array of pipes for each process.
     * @param array $pids  An array of process identifiers (pid => key in $pipes).
     *
     * @return array The results of each process execution. Children that
     *               sent nothing have no entry.
     */
    private function readResults(array $pipes, array $pids): array
    {
        $results = [];

        foreach ($pids as $index) {
            $readEnd = $pipes[$index][0];

            // Get data from the pipe
            $payload = stream_get_contents($readEnd);
            fclose($readEnd);

            if ($payload !== false && $payload !== '') {
                // The payload was produced by serialize() in forkProcess().
                $results[$index] = unserialize($payload);
            }
        }

        return $results;
    }

    /**
     * Waits for child processes to finish.
     *
     * Each known pid is waited for individually, so children started by
     * other code are never reaped here. A pid that can no longer be waited
     * for (pcntl_waitpid() returns -1, e.g. because it was already reaped)
     * is skipped instead of being retried forever; only an interruption by
     * a signal (EINTR) causes a retry.
     *
     * @param array $pids An array of process identifiers (used as keys).
     */
    private function waitForProcesses(array $pids): void
    {
        foreach (array_keys($pids) as $pid) {
            do {
                $reaped = pcntl_waitpid($pid, $status);
            } while ($reaped === -1 && pcntl_get_last_error() === PCNTL_EINTR);
        }
    }
}
Leave a Comment