1: | <?php declare(strict_types = 1); |
2: | |
3: | namespace ApiGen\Scheduler; |
4: | |
5: | use ApiGen\Task\Task; |
6: | use ApiGen\Task\TaskHandler; |
7: | |
8: | use function fclose; |
9: | use function pcntl_fork; |
10: | use function pcntl_waitpid; |
11: | use function pcntl_wexitstatus; |
12: | use function pcntl_wifexited; |
13: | use function pcntl_wifsignaled; |
14: | use function pcntl_wtermsig; |
15: | use function stream_socket_pair; |
16: | |
17: | use const STREAM_IPPROTO_IP; |
18: | use const STREAM_PF_UNIX; |
19: | use const STREAM_SOCK_STREAM; |
20: | |
21: | |
22: | |
23: | |
24: | |
25: | |
26: | |
27: | class ForkScheduler extends WorkerScheduler |
28: | { |
29: | |
30: | protected array $workers = []; |
31: | |
32: | |
33: | |
34: | |
35: | |
36: | public function __construct( |
37: | protected TaskHandler $handler, |
38: | int $workerCount, |
39: | ) { |
40: | parent::__construct($workerCount); |
41: | } |
42: | |
43: | |
44: | protected function start(): void |
45: | { |
46: | for ($workerId = 0; $workerId < $this->workerCount; $workerId++) { |
47: | $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); |
48: | |
49: | if ($pair === false) { |
50: | throw new \RuntimeException('Failed to create socket pair, try running ApiGen with --workers 1'); |
51: | } |
52: | |
53: | $pid = pcntl_fork(); |
54: | |
55: | if ($pid < 0) { |
56: | throw new \RuntimeException('Failed to fork process, try running ApiGen with --workers 1'); |
57: | |
58: | } elseif ($pid === 0) { |
59: | fclose($pair[0]); |
60: | self::workerLoop($this->handler, $pair[1], $pair[1]); |
61: | exit(0); |
62: | |
63: | } else { |
64: | fclose($pair[1]); |
65: | $this->workers[$workerId] = $pid; |
66: | $this->workerReadableStreams[$workerId] = $pair[0]; |
67: | $this->workerWritableStreams[$workerId] = $pair[0]; |
68: | } |
69: | } |
70: | } |
71: | |
72: | |
73: | protected function stop(): void |
74: | { |
75: | foreach ($this->workerWritableStreams as $stream) { |
76: | fclose($stream); |
77: | } |
78: | |
79: | foreach ($this->workers as $pid) { |
80: | pcntl_waitpid($pid, $status); |
81: | |
82: | if (pcntl_wifexited($status)) { |
83: | if (($exitCode = pcntl_wexitstatus($status)) !== 0) { |
84: | throw new \RuntimeException("Worker with PID $pid exited with code $exitCode, try running ApiGen with --workers 1"); |
85: | } |
86: | |
87: | } elseif (pcntl_wifsignaled($status)) { |
88: | $signal = pcntl_wtermsig($status); |
89: | throw new \RuntimeException("Worker with PID $pid was killed by signal $signal, try running ApiGen with --workers 1"); |
90: | |
91: | } else { |
92: | throw new \LogicException('Invalid worker state'); |
93: | } |
94: | } |
95: | } |
96: | } |
97: | |