Untitled
unknown
php
a year ago
3.4 kB
9
Indexable
<?php
namespace Parallel;
class Parallel
{
/**
* Executes several processes in parallel.
*
* @param array $processes
* @return array The results of the function execution in each process.
*/
public function run(array $processes): array
{
$pipes = [];
$pids = [];
// Create child processes
foreach ($processes as $index => [$callback, $args]) {
$pipes[$index] = $this->createPipe();
$args = $args ?? [];
if (($pid = $this->forkProcess($callback, $args, $pipes[$index])) !== -1) {
$pids[$pid] = $index; // Store pid
fclose($pipes[$index][1]); // Close the write part in the parent process
} else {
die("Error when calling pcntl_fork()");
}
}
// Read results from pipes
$results = $this->readResults($pipes, $pids);
// Wait for child processes to finish
$this->waitForProcesses($pids);
return $results;
}
/**
* Creates a pair of sockets for inter-process communication.
*
* @return array Two connected sockets.
*/
private function createPipe(): array
{
$pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($pipe === false) {
die("Could not create socket pair");
}
return $pipe;
}
/**
* Creates a child process and executes the specified callback in it.
*
* @param callable $callback The function to execute in the child process.
* @param array $args Arguments for the function.
* @param array $pipe The pipe for sending the result back to the parent process.
*
* @return int The identifier of the child process, -1 in case of error.
*/
private function forkProcess(callable $callback, array $args, array $pipe): int
{
$pid = pcntl_fork();
if ($pid === 0) { // Child process
fclose($pipe[0]); // Close the read part in the child process
$result = $callback(...$args);
fwrite($pipe[1], serialize($result));
fclose($pipe[1]); // Close the write part after writing the result
exit(0);
}
return $pid; // Return pid to the parent process
}
/**
* Reads results from the pipes associated with the processes.
*
* @param array $pipes An array of pipes for each process.
* @param array $pids An array of process identifiers.
*
* @return array The results of each process execution.
*/
private function readResults(array $pipes, array $pids): array
{
$results = [];
foreach ($pids as $pid => $index) {
// Get data from the pipe
$result = stream_get_contents($pipes[$index][0]);
if ($result !== false && $result !== '') {
$results[$index] = unserialize($result);
}
fclose($pipes[$index][0]);
}
return $results;
}
/**
* Waits for child processes to finish.
*
* @param array $pids An array of process identifiers.
*/
private function waitForProcesses(array $pids): void
{
while ($pids) {
$pid = pcntl_wait($status);
if ($pid >= 0) {
unset($pids[$pid]);
}
}
}
}
Editor is loading...
Leave a Comment