1: <?php declare(strict_types = 1);
2:
3: namespace ApiGen\Scheduler;
4:
5: use ApiGen\Task\Task;
6: use ApiGen\Task\TaskHandler;
7:
8: use function fclose;
9: use function pcntl_fork;
10: use function pcntl_waitpid;
11: use function pcntl_wexitstatus;
12: use function pcntl_wifexited;
13: use function pcntl_wifsignaled;
14: use function pcntl_wtermsig;
15: use function stream_socket_pair;
16:
17: use const STREAM_IPPROTO_IP;
18: use const STREAM_PF_UNIX;
19: use const STREAM_SOCK_STREAM;
20:
21:
/**
 * Scheduler that runs the task handler in worker processes created with pcntl_fork().
 *
 * Every worker is connected to the parent through its own UNIX socket pair: the parent keeps
 * one end as both the readable and the writable stream of that worker, while the forked child
 * runs workerLoop() on the other end.
 *
 * @template TTask of Task
 * @template TResult
 * @extends WorkerScheduler<TTask, TResult>
 */
class ForkScheduler extends WorkerScheduler
{
	/** @var int[] $workers PIDs of the forked workers, indexed by [workerId] */
	protected array $workers = [];


	/**
	 * @param TaskHandler<TTask, TResult> $handler     handler passed to workerLoop() in every forked worker
	 * @param int                         $workerCount forwarded unchanged to the parent constructor
	 */
	public function __construct(
		protected TaskHandler $handler,
		int $workerCount,
	) {
		parent::__construct($workerCount);
	}


	/**
	 * Forks one worker per worker ID and registers the parent's end of each socket pair.
	 *
	 * @throws \RuntimeException when a socket pair cannot be created or a fork fails
	 */
	protected function start(): void
	{
		for ($workerId = 0; $workerId < $this->workerCount; $workerId++) {
			$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

			if ($pair === false) {
				throw new \RuntimeException('Failed to create socket pair, try running ApiGen with --workers 1');
			}

			[$parentStream, $workerStream] = $pair;
			$pid = pcntl_fork();

			if ($pid < 0) {
				throw new \RuntimeException('Failed to fork process, try running ApiGen with --workers 1');

			} elseif ($pid === 0) {
				// Child process. The fork also duplicated the parent-side sockets of the workers
				// created in earlier iterations; close those copies so that each of these workers
				// sees end-of-stream as soon as the parent closes its end, independently of how
				// long this worker lives. Only indexes written during this loop are touched.
				for ($siblingId = 0; $siblingId < $workerId; $siblingId++) {
					fclose($this->workerReadableStreams[$siblingId]);
				}

				fclose($parentStream);
				self::workerLoop($this->handler, $workerStream, $workerStream);
				exit(0); // the child must never return into the parent's code path

			} else {
				// Parent process: the worker's end belongs to the child only.
				fclose($workerStream);
				$this->workers[$workerId] = $pid;
				$this->workerReadableStreams[$workerId] = $parentStream;
				$this->workerWritableStreams[$workerId] = $parentStream;
			}
		}
	}


	/**
	 * Closes the parent's end of every worker socket and waits for all workers to terminate.
	 *
	 * Every worker is waited for, even when an earlier one has failed, so that no child is left
	 * unreaped; the first failure encountered is thrown once all workers have been collected.
	 *
	 * @throws \RuntimeException when waiting for a worker fails, a worker exits with a non-zero
	 *                           code or a worker is killed by a signal
	 * @throws \LogicException   when a worker is reported in a state that is neither "exited"
	 *                           nor "signaled"
	 */
	protected function stop(): void
	{
		foreach ($this->workerWritableStreams as $stream) {
			fclose($stream);
		}

		$firstFailure = null;

		foreach ($this->workers as $pid) {
			// Always reap the worker; do not fold this call into `??=`, which would skip it
			// once a failure has been recorded.
			$failure = $this->reapWorker($pid);
			$firstFailure ??= $failure;
		}

		if ($firstFailure !== null) {
			throw $firstFailure;
		}
	}


	/**
	 * Blocks until the given worker terminates and translates its termination status.
	 *
	 * @return \Exception|null the failure to report, or null when the worker exited with code 0
	 */
	private function reapWorker(int $pid): ?\Exception
	{
		if (pcntl_waitpid($pid, $status) === -1) {
			// The wait itself failed, so $status says nothing about this worker.
			return new \RuntimeException("Failed to wait for worker with PID $pid, try running ApiGen with --workers 1");
		}

		if (pcntl_wifexited($status)) {
			$exitCode = pcntl_wexitstatus($status);

			return $exitCode === 0
				? null
				: new \RuntimeException("Worker with PID $pid exited with code $exitCode, try running ApiGen with --workers 1");
		}

		if (pcntl_wifsignaled($status)) {
			$signal = pcntl_wtermsig($status);

			return new \RuntimeException("Worker with PID $pid was killed by signal $signal, try running ApiGen with --workers 1");
		}

		return new \LogicException('Invalid worker state');
	}
}
97: