Untitled

mail@pastecode.io avatar
unknown
php
a month ago
3.4 kB
2
Indexable
Never
<?php

namespace Parallel;

/**
 * Runs callbacks in forked child processes and collects their return values.
 *
 * Each callback runs in its own child created with pcntl_fork(). The child
 * serializes the callback's return value and sends it to the parent over a
 * UNIX socket pair; the parent unserializes it and finally reaps the child.
 */
class Parallel
{
    /**
     * Executes several processes in parallel.
     *
     * @param array $processes Map of index => [callable $callback, array $args].
     *                         The argument list (element 1) is optional.
     * @return array The results of the function execution in each process, keyed
     *               by the same index. An index is omitted when its child sent
     *               no data (for example because the callback threw).
     *
     * @throws \RuntimeException If a socket pair or a child process cannot be created.
     */
    public function run(array $processes): array
    {
        $pipes = [];
        $pids = [];

        try {
            // Create child processes
            foreach ($processes as $index => $process) {
                $callback = $process[0];
                // Reading the key with ?? avoids the "undefined array key"
                // warning that destructuring raises for a missing argument list.
                $args = $process[1] ?? [];

                $pipe = $this->createPipe();
                $pid = $this->forkProcess($callback, $args, $pipe);

                // The parent never writes. Its copy of the write end must be
                // closed, otherwise the read end would never reach EOF.
                fclose($pipe[1]);

                if ($pid === -1) {
                    fclose($pipe[0]);
                    throw new \RuntimeException('Error when calling pcntl_fork()');
                }

                $pipes[$index] = $pipe;
                $pids[$pid] = $index; // Store pid
            }

            // Read results from pipes
            return $this->readResults($pipes, $pids);
        } finally {
            // On the error path some read ends are still open. Closing them
            // makes a child that is still writing fail instead of blocking,
            // so the wait below cannot hang on it.
            foreach ($pipes as $pipe) {
                if (is_resource($pipe[0])) {
                    fclose($pipe[0]);
                }
            }

            // Wait for child processes to finish
            $this->waitForProcesses($pids);
        }
    }

    /**
     * Creates a pair of sockets for inter-process communication.
     *
     * @return array Two connected sockets.
     *
     * @throws \RuntimeException If the socket pair cannot be created.
     */
    private function createPipe(): array
    {
        $pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
        if ($pipe === false) {
            throw new \RuntimeException('Could not create socket pair');
        }

        return $pipe;
    }

    /**
     * Creates a child process and executes the specified callback in it.
     *
     * The child never returns from this method: it always terminates through
     * exit(), with status 0 on success and 1 when the callback, serialization
     * or the write to the pipe failed.
     *
     * @param callable $callback The function to execute in the child process.
     * @param array $args Arguments for the function.
     * @param array $pipe The pipe for sending the result back to the parent process.
     *
     * @return int The identifier of the child process, -1 in case of error.
     */
    private function forkProcess(callable $callback, array $args, array $pipe): int
    {
        $pid = pcntl_fork();

        if ($pid === 0) { // Child process
            fclose($pipe[0]); // Close the read part in the child process

            $exitCode = 0;
            try {
                $this->writeAll($pipe[1], serialize($callback(...$args)));
            } catch (\Throwable $e) {
                // An exception must not escape the child: it would unwind into
                // the caller's code, which is meant to run only in the parent.
                error_log('Parallel child process failed: ' . $e->getMessage());
                $exitCode = 1;
            }

            fclose($pipe[1]); // Close the write part after writing the result
            exit($exitCode);
        }

        return $pid; // Return pid to the parent process
    }

    /**
     * Writes the whole string to the stream.
     *
     * fwrite() on a socket stream may write fewer bytes than requested, so the
     * call is repeated until every byte has been sent.
     *
     * @param resource $stream The stream to write to.
     * @param string $data The bytes to write.
     *
     * @throws \RuntimeException If the stream stops accepting data.
     */
    private function writeAll($stream, string $data): void
    {
        $length = strlen($data);
        $offset = 0;

        while ($offset < $length) {
            $written = fwrite($stream, substr($data, $offset));
            if ($written === false || $written === 0) {
                throw new \RuntimeException('Could not write result to pipe');
            }
            $offset += $written;
        }
    }

    /**
     * Reads results from the pipes associated with the processes.
     *
     * Each read end is read until EOF and then closed. A pipe that delivered
     * no data produces no entry in the returned array.
     *
     * @param array $pipes An array of pipes for each process.
     * @param array $pids An array of process identifiers (pid => index).
     *
     * @return array The results of each process execution.
     */
    private function readResults(array $pipes, array $pids): array
    {
        $results = [];

        foreach ($pids as $index) {
            $reader = $pipes[$index][0];

            // Get data from the pipe
            $payload = stream_get_contents($reader);
            fclose($reader);

            if ($payload !== false && $payload !== '') {
                $results[$index] = unserialize($payload);
            }
        }

        return $results;
    }

    /**
     * Waits for child processes to finish.
     *
     * Every recorded pid is waited for exactly once with pcntl_waitpid(), so a
     * failing wait (for example when the child was already reaped) cannot
     * cause an endless loop, and unrelated children are not reaped.
     *
     * @param array $pids An array of process identifiers (pid => index).
     */
    private function waitForProcesses(array $pids): void
    {
        foreach (array_keys($pids) as $pid) {
            do {
                $waited = pcntl_waitpid($pid, $status);
                // Retry only when the wait was interrupted by a signal.
            } while ($waited === -1 && pcntl_get_last_error() === PCNTL_EINTR);
        }
    }
}
Leave a Comment