1: | <?php declare(strict_types = 1); |
2: | |
3: | namespace ApiGen\Scheduler; |
4: | |
5: | use ApiGen\Scheduler; |
6: | use ApiGen\Task\Task; |
7: | use ApiGen\Task\TaskHandler; |
8: | use SplQueue; |
9: | |
10: | use function array_fill_keys; |
11: | use function array_key_first; |
12: | use function array_keys; |
13: | use function base64_decode; |
14: | use function base64_encode; |
15: | use function count; |
16: | use function extension_loaded; |
17: | use function fgets; |
18: | use function fwrite; |
19: | use function igbinary_serialize; |
20: | use function igbinary_unserialize; |
21: | use function serialize; |
22: | use function stream_select; |
23: | use function strlen; |
24: | use function unserialize; |
25: | |
26: | |
27: | |
28: | |
29: | |
30: | |
31: | |
32: | abstract class WorkerScheduler implements Scheduler |
33: | { |
34: | protected const WORKER_CAPACITY_LIMIT = 8; |
35: | |
36: | |
37: | protected SplQueue $tasks; |
38: | |
39: | |
40: | protected int $pendingTaskCount = 0; |
41: | |
42: | |
43: | protected array $workerReadableStreams = []; |
44: | |
45: | |
46: | protected array $workerWritableStreams = []; |
47: | |
48: | |
49: | public function __construct( |
50: | protected int $workerCount, |
51: | ) { |
52: | $this->tasks = new SplQueue(); |
53: | } |
54: | |
55: | |
56: | |
57: | |
58: | |
59: | public static function writeMessage($stream, mixed $message): void |
60: | { |
61: | $serialized = extension_loaded('igbinary') |
62: | ? igbinary_serialize($message) ?? throw new \LogicException('Failed to serialize message.') |
63: | : serialize($message); |
64: | |
65: | $line = base64_encode($serialized) . "\n"; |
66: | |
67: | if (fwrite($stream, $line) !== strlen($line)) { |
68: | throw new \RuntimeException('Failed to write message to stream.'); |
69: | } |
70: | } |
71: | |
72: | |
73: | |
74: | |
75: | |
76: | public static function readMessage($stream): mixed |
77: | { |
78: | $line = fgets($stream); |
79: | |
80: | if ($line === false) { |
81: | return null; |
82: | } |
83: | |
84: | $serialized = base64_decode($line, strict: true); |
85: | |
86: | if ($serialized === false) { |
87: | throw new \RuntimeException('Failed to decode message.'); |
88: | } |
89: | |
90: | return extension_loaded('igbinary') |
91: | ? igbinary_unserialize($serialized) |
92: | : unserialize($serialized); |
93: | } |
94: | |
95: | |
96: | |
97: | |
98: | |
99: | |
100: | |
101: | |
102: | |
103: | |
104: | public static function workerLoop(TaskHandler $handler, $inputStream, $outputStream): void |
105: | { |
106: | while (($task = self::readMessage($inputStream)) !== null) { |
107: | $result = $handler->handle($task); |
108: | self::writeMessage($outputStream, [$task, $result]); |
109: | } |
110: | } |
111: | |
112: | |
113: | |
114: | |
115: | |
116: | public function schedule(Task $task): void |
117: | { |
118: | $this->tasks->enqueue($task); |
119: | $this->pendingTaskCount++; |
120: | } |
121: | |
122: | |
123: | |
124: | |
125: | |
126: | public function process(): iterable |
127: | { |
128: | try { |
129: | $this->start(); |
130: | |
131: | $idleWorkers = array_fill_keys(array_keys($this->workerWritableStreams), self::WORKER_CAPACITY_LIMIT); |
132: | |
133: | while ($this->pendingTaskCount > 0) { |
134: | while (count($idleWorkers) > 0 && !$this->tasks->isEmpty()) { |
135: | $idleWorkerId = array_key_first($idleWorkers); |
136: | $idleWorkerCapacity = $idleWorkers[$idleWorkerId]; |
137: | self::writeMessage($this->workerWritableStreams[$idleWorkerId], $this->tasks->dequeue()); |
138: | unset($idleWorkers[$idleWorkerId]); |
139: | |
140: | if ($idleWorkerCapacity > 1) { |
141: | $idleWorkers[$idleWorkerId] = $idleWorkerCapacity - 1; |
142: | } |
143: | } |
144: | |
145: | $readable = $this->workerReadableStreams; |
146: | $writable = null; |
147: | $except = null; |
148: | $changedCount = stream_select($readable, $writable, $except, null); |
149: | |
150: | if ($changedCount === false || $changedCount === 0) { |
151: | throw new \RuntimeException('stream_select() failed.'); |
152: | } |
153: | |
154: | foreach ($readable as $workerId => $stream) { |
155: | [$task, $result] = self::readMessage($stream) ?? throw new \RuntimeException('Failed to read message from worker.'); |
156: | $idleWorkers[$workerId] = ($idleWorkers[$workerId] ?? 0) + 1; |
157: | $this->pendingTaskCount--; |
158: | yield $task => $result; |
159: | } |
160: | } |
161: | |
162: | } finally { |
163: | $this->pendingTaskCount = 0; |
164: | $this->stop(); |
165: | } |
166: | } |
167: | |
168: | |
169: | abstract protected function start(): void; |
170: | |
171: | |
172: | abstract protected function stop(): void; |
173: | } |
174: | |