Untitled
unknown
php
a year ago
4.2 kB
17
Indexable
<?php
/**
 * Runs a callback in several forked child processes and collects the results.
 *
 * Every child gets its own UNIX socket pair: the child writes the serialized
 * return value of the callback to one end and exits, the parent reads the
 * other end. The class relies on the pcntl extension (pcntl_fork,
 * pcntl_waitpid).
 */
class Parallel
{
    /**
     * Executes several processes in parallel.
     *
     * A $processCount of zero or less launches nothing and yields an empty array.
     *
     * @param int      $processCount The number of processes to launch.
     * @param callable $callback     The function to execute in every child.
     * @param array    $argsArray    Argument lists, indexed by process number;
     *                               a missing entry means "no arguments".
     *
     * @return array Results keyed by process number. A child that produced no
     *               output (for example because the callback threw) has no entry.
     *
     * @throws \RuntimeException When a socket pair cannot be created or
     *                           pcntl_fork() fails.
     */
    public function run(int $processCount, callable $callback, array $argsArray = []): array
    {
        $pipes = [];
        $pids = [];

        try {
            for ($i = 0; $i < $processCount; $i++) {
                $pipes[$i] = $this->createPipe();
                $args = $argsArray[$i] ?? [];

                $pid = $this->forkProcess($callback, $args, $pipes[$i]);
                if ($pid === -1) {
                    throw new \RuntimeException('Error when calling pcntl_fork()');
                }

                $pids[$pid] = $i;
                // The parent only reads; keeping the write end open would
                // prevent the reader from ever seeing end-of-stream.
                fclose($pipes[$i][1]);
            }
        } catch (\RuntimeException $e) {
            // Do not leave half-started workers behind.
            $this->abortChildren($pipes, $pids);
            throw $e;
        }

        $results = $this->readResults($pipes, $pids);
        $this->waitForProcesses($pids);

        return $results;
    }

    /**
     * Creates a pair of connected sockets for inter-process communication.
     *
     * @return array Two connected stream sockets.
     *
     * @throws \RuntimeException When the socket pair cannot be created.
     */
    private function createPipe(): array
    {
        $pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
        if ($pipe === false) {
            throw new \RuntimeException('Could not create socket pair');
        }

        return $pipe;
    }

    /**
     * Forks a child process and runs the callback inside it.
     *
     * The child never returns from this method: it writes the serialized
     * result to $pipe[1] and exits with status 0, or exits with status 1 when
     * the callback, the serialization or the write fails.
     *
     * @param callable $callback The function to execute in the child process.
     * @param array    $args     Arguments for the function.
     * @param array    $pipe     Socket pair; index 0 is read by the parent,
     *                           index 1 is written by the child.
     *
     * @return int In the parent: the child's pid, or -1 when the fork failed.
     */
    private function forkProcess(callable $callback, array $args, array $pipe): int
    {
        $pid = pcntl_fork();
        if ($pid !== 0) {
            return $pid;
        }

        // Child process from here on. Catch everything so that an exception
        // cannot unwind into the caller's stack and make the child continue
        // executing the parent's program.
        $exitCode = 0;
        try {
            fclose($pipe[0]);
            $this->writeAll($pipe[1], serialize($callback(...$args)));
            fclose($pipe[1]);
        } catch (\Throwable $e) {
            error_log('Parallel child process failed: ' . $e->getMessage());
            $exitCode = 1;
        }

        exit($exitCode);
    }

    /**
     * Writes the whole string to the stream, retrying after short writes.
     *
     * @param resource $stream Writable stream.
     * @param string   $data   Bytes to send.
     *
     * @throws \RuntimeException When the stream stops accepting data.
     */
    private function writeAll($stream, string $data): void
    {
        $length = strlen($data);
        $offset = 0;

        while ($offset < $length) {
            $written = fwrite($stream, substr($data, $offset));
            if ($written === false || $written === 0) {
                throw new \RuntimeException('Could not write result to pipe');
            }
            $offset += $written;
        }
    }

    /**
     * Reads the result of every child from its pipe, in launch order.
     *
     * @param array $pipes Socket pairs indexed by process number.
     * @param array $pids  Map of child pid => process number.
     *
     * @return array Unserialized results keyed by process number; children
     *               that sent nothing are omitted.
     */
    private function readResults(array $pipes, array $pids): array
    {
        $results = [];

        foreach ($pids as $index) {
            $reader = $pipes[$index][0];
            // Blocks until the child closes its end, i.e. has finished writing.
            $payload = stream_get_contents($reader);
            fclose($reader);

            if ($payload !== false && $payload !== '') {
                // The payload comes from our own child process, not from
                // external input, so unserialize() is acceptable here.
                $results[$index] = unserialize($payload);
            }
        }

        return $results;
    }

    /**
     * Waits for the given child processes to finish.
     *
     * Each pid is awaited individually so that unrelated children of the
     * calling program are not reaped, and so that an error from the wait call
     * cannot turn into an endless loop.
     *
     * @param array $pids Map of child pid => process number.
     */
    private function waitForProcesses(array $pids): void
    {
        foreach (array_keys($pids) as $pid) {
            do {
                $reaped = pcntl_waitpid($pid, $status);
                // Retry only when a signal interrupted the wait; any other
                // failure (such as the child being gone already) is final.
            } while ($reaped === -1 && pcntl_get_last_error() === PCNTL_EINTR);
        }
    }

    /**
     * Releases everything after a failed launch: closes all pipe ends that
     * are still open and waits for the children that were already started.
     *
     * @param array $pipes Socket pairs created so far.
     * @param array $pids  Map of child pid => process number.
     */
    private function abortChildren(array $pipes, array $pids): void
    {
        foreach ($pipes as $pair) {
            foreach ($pair as $stream) {
                // Ends that were closed earlier are no longer resources.
                if (is_resource($stream)) {
                    fclose($stream);
                }
            }
        }

        $this->waitForProcesses($pids);
    }
}
//Example
/**
 * Computes the n-th Fibonacci number by plain double recursion.
 *
 * Any $n at or below 1 is returned unchanged.
 *
 * @param int $n Position in the sequence.
 * @return int The Fibonacci number at that position.
 */
function fibonacci(int $n): int
{
    return $n <= 1
        ? $n
        : fibonacci($n - 1) + fibonacci($n - 2);
}
// One argument list per worker: each child computes a single Fibonacci number.
$argsArray = [[35], [36], [37], [38], [39]];
$processCount = 5;

$parallel = new Parallel();
$results = $parallel->run($processCount, 'fibonacci', $argsArray);

var_dump($results);
/*
array(5) {
[0]=>
int(9227465)
[1]=>
int(14930352)
[2]=>
int(24157817)
[3]=>
int(39088169)
[4]=>
int(63245986)
}
*/