Untitled

 avatar
unknown
php
a year ago
4.2 kB
4
Indexable
<?php
class Parallel
{
    /**
     * Executes a callback in several forked child processes and collects the results.
     *
     * Each child gets its own socket pair; it serializes the callback's return
     * value into it and exits. The parent reads every socket until EOF and then
     * reaps the children.
     *
     * @param int      $processCount The number of processes to launch.
     * @param callable $callback     The function to execute in every child.
     * @param array    $argsArray    Argument lists, indexed by process number; a
     *                               missing entry means "call with no arguments".
     *
     * @return array Results keyed by process number. The key of a child that
     *               failed (threw, crashed, or sent an unreadable payload) is absent.
     *
     * @throws \RuntimeException If a socket pair or a child process cannot be created.
     */
    public function run(int $processCount, callable $callback, array $argsArray = []): array
    {
        $pipes = [];
        $pids = []; // child pid => process number
        $results = [];

        try {
            for ($i = 0; $i < $processCount; $i++) {
                $pipes[$i] = $this->createPipe();
                $args = $argsArray[$i] ?? [];

                $pid = $this->forkProcess($callback, $args, $pipes[$i]);
                if ($pid === -1) {
                    throw new \RuntimeException('Error when calling pcntl_fork()');
                }

                $pids[$pid] = $i;
                // The parent must drop its copy of the write end, otherwise the
                // read end never sees EOF once the child is done.
                fclose($pipes[$i][1]);
            }

            $results = $this->readResults($pipes, $pids);
        } finally {
            // Runs on success and on failure alike: release whatever is still
            // open and reap every child that was started, so an error halfway
            // through the loop leaves neither open sockets nor zombies behind.
            $this->closePipes($pipes);
            $this->waitForProcesses($pids);
        }

        return $results;
    }

    /**
     * Creates a pair of connected sockets for inter-process communication.
     *
     * @return array Two connected stream sockets: index 0 is used as the read
     *               end (parent), index 1 as the write end (child).
     *
     * @throws \RuntimeException If the socket pair cannot be created.
     */
    private function createPipe(): array
    {
        $pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
        if ($pipe === false) {
            throw new \RuntimeException('Could not create socket pair');
        }

        return $pipe;
    }

    /**
     * Forks a child process and executes the callback in it.
     *
     * Only the parent ever returns from this method. The child always ends with
     * exit(), even when the callback or the serialization throws; otherwise the
     * exception would unwind into the caller's code and the child would keep
     * running as a second copy of the parent.
     *
     * @param callable $callback The function to execute in the child process.
     * @param array    $args     Arguments for the function.
     * @param array    $pipe     Socket pair used to send the result to the parent.
     *
     * @return int The child's pid (in the parent), or -1 if the fork failed.
     */
    private function forkProcess(callable $callback, array $args, array $pipe): int
    {
        $pid = pcntl_fork();

        if ($pid !== 0) {
            return $pid; // Parent process, or -1 when the fork failed
        }

        // Child process from here on.
        $exitCode = 0;
        try {
            fclose($pipe[0]); // The child only writes
            $payload = serialize($callback(...$args));
            fwrite($pipe[1], $payload);
            fclose($pipe[1]); // Signals EOF to the parent
        } catch (\Throwable $e) {
            // Nothing was written, so the parent simply gets no result for
            // this process number.
            error_log(sprintf('Parallel: child %d failed: %s', getmypid(), $e->getMessage()));
            $exitCode = 1;
        }

        exit($exitCode);
    }

    /**
     * Reads and unserializes the result each child wrote to its socket.
     *
     * Sockets are drained one after another, each until EOF, so this blocks
     * until every child has closed its write end.
     *
     * @param array $pipes Socket pairs, indexed by process number.
     * @param array $pids  Map of child pid => process number.
     *
     * @return array Results keyed by process number; children that sent nothing
     *               or an unreadable payload are left out.
     */
    private function readResults(array $pipes, array $pids): array
    {
        $results = [];
        $serializedFalse = serialize(false);

        foreach ($pids as $index) {
            $raw = stream_get_contents($pipes[$index][0]);
            fclose($pipes[$index][0]);

            if ($raw === false || $raw === '') {
                continue;
            }

            $value = unserialize($raw);
            // unserialize() also returns false for a corrupt or truncated
            // payload; keep false only if the child really sent it.
            if ($value === false && $raw !== $serializedFalse) {
                continue;
            }

            $results[$index] = $value;
        }

        return $results;
    }

    /**
     * Closes every stream in the given socket pairs that is still open.
     *
     * @param array $pipes Socket pairs, indexed by process number.
     */
    private function closePipes(array $pipes): void
    {
        foreach ($pipes as $pair) {
            foreach ($pair as $stream) {
                // is_resource() is false for an already closed stream.
                if (is_resource($stream)) {
                    fclose($stream);
                }
            }
        }
    }

    /**
     * Waits for the given child processes to finish.
     *
     * Each pid is waited for individually, so unrelated children of the calling
     * process are not reaped, and a child that no longer exists cannot keep the
     * loop spinning.
     *
     * @param array $pids Map of child pid => process number.
     */
    private function waitForProcesses(array $pids): void
    {
        foreach (array_keys($pids) as $pid) {
            do {
                $reaped = pcntl_waitpid($pid, $status);
                // Retry only when the wait was interrupted by a signal; any
                // other failure means there is nothing left to wait for.
            } while ($reaped === -1 && pcntl_get_last_error() === PCNTL_EINTR);
        }
    }
}

// Example usage: compute five Fibonacci numbers in parallel, one per child process.

/**
 * Returns the n-th Fibonacci number using plain double recursion.
 *
 * The recursion is not memoized, so the running time grows quickly with $n.
 *
 * @param int $n Position in the sequence; any value of 1 or less is returned as is.
 * @return int The n-th Fibonacci number.
 */
function fibonacci(int $n): int
{
    return $n > 1
        ? fibonacci($n - 1) + fibonacci($n - 2)
        : $n;
}

// One argument list per child process: [[35], [36], [37], [38], [39]].
$argsArray = array_map(
    static function (int $n): array {
        return [$n];
    },
    range(35, 39)
);
$processCount = count($argsArray); // One process per argument list
$parallel = new Parallel();
$results = $parallel->run($processCount, 'fibonacci', $argsArray);

var_dump($results);

/* Expected output:
array(5) {
  [0]=>
  int(9227465)
  [1]=>
  int(14930352)
  [2]=>
  int(24157817)
  [3]=>
  int(39088169)
  [4]=>
  int(63245986)
}
*/
Editor is loading...
Leave a Comment