1: <?php declare(strict_types = 1);
2:
3: namespace ApiGen\Scheduler;
4:
5: use ApiGen\Scheduler;
6: use ApiGen\Task\Task;
7: use ApiGen\Task\TaskHandler;
8: use SplQueue;
9:
10: use function array_fill_keys;
11: use function array_key_first;
12: use function array_keys;
13: use function base64_decode;
14: use function base64_encode;
15: use function count;
16: use function extension_loaded;
17: use function fgets;
18: use function fwrite;
19: use function igbinary_serialize;
20: use function igbinary_unserialize;
21: use function serialize;
22: use function stream_select;
23: use function strlen;
24: use function unserialize;
25:
26:
27: /**
28: * @template TTask of Task
29: * @template TResult
30: * @implements Scheduler<TTask, TResult>
31: */
32: abstract class WorkerScheduler implements Scheduler
33: {
34: protected const WORKER_CAPACITY_LIMIT = 8;
35:
36: /** @var SplQueue<TTask> queue of tasks which needs to be sent to workers */
37: protected SplQueue $tasks;
38:
39: /** @var int total number of pending tasks (including those already sent to workers) */
40: protected int $pendingTaskCount = 0;
41:
42: /** @var resource[] indexed by [workerId] */
43: protected array $workerReadableStreams = [];
44:
45: /** @var resource[] indexed by [workerId] */
46: protected array $workerWritableStreams = [];
47:
48:
49: public function __construct(
50: protected int $workerCount,
51: ) {
52: $this->tasks = new SplQueue();
53: }
54:
55:
56: /**
57: * @param resource $stream
58: */
59: public static function writeMessage($stream, mixed $message): void
60: {
61: $serialized = extension_loaded('igbinary')
62: ? igbinary_serialize($message) ?? throw new \LogicException('Failed to serialize message.')
63: : serialize($message);
64:
65: $line = base64_encode($serialized) . "\n";
66:
67: if (fwrite($stream, $line) !== strlen($line)) {
68: throw new \RuntimeException('Failed to write message to stream.');
69: }
70: }
71:
72:
73: /**
74: * @param resource $stream
75: */
76: public static function readMessage($stream): mixed
77: {
78: $line = fgets($stream);
79:
80: if ($line === false) {
81: return null;
82: }
83:
84: $serialized = base64_decode($line, strict: true);
85:
86: if ($serialized === false) {
87: throw new \RuntimeException('Failed to decode message.');
88: }
89:
90: return extension_loaded('igbinary')
91: ? igbinary_unserialize($serialized)
92: : unserialize($serialized);
93: }
94:
95:
96: /**
97: * @template T2 of Task
98: * @template R2
99: *
100: * @param TaskHandler<T2, R2> $handler
101: * @param resource $inputStream
102: * @param resource $outputStream
103: */
104: public static function workerLoop(TaskHandler $handler, $inputStream, $outputStream): void
105: {
106: while (($task = self::readMessage($inputStream)) !== null) {
107: $result = $handler->handle($task);
108: self::writeMessage($outputStream, [$task, $result]);
109: }
110: }
111:
112:
113: /**
114: * @param TTask $task
115: */
116: public function schedule(Task $task): void
117: {
118: $this->tasks->enqueue($task);
119: $this->pendingTaskCount++;
120: }
121:
122:
123: /**
124: * @return iterable<TTask, TResult>
125: */
126: public function process(): iterable
127: {
128: try {
129: $this->start();
130:
131: $idleWorkers = array_fill_keys(array_keys($this->workerWritableStreams), self::WORKER_CAPACITY_LIMIT);
132:
133: while ($this->pendingTaskCount > 0) {
134: while (count($idleWorkers) > 0 && !$this->tasks->isEmpty()) {
135: $idleWorkerId = array_key_first($idleWorkers);
136: $idleWorkerCapacity = $idleWorkers[$idleWorkerId];
137: self::writeMessage($this->workerWritableStreams[$idleWorkerId], $this->tasks->dequeue());
138: unset($idleWorkers[$idleWorkerId]);
139:
140: if ($idleWorkerCapacity > 1) {
141: $idleWorkers[$idleWorkerId] = $idleWorkerCapacity - 1;
142: }
143: }
144:
145: $readable = $this->workerReadableStreams;
146: $writable = null;
147: $except = null;
148: $changedCount = stream_select($readable, $writable, $except, null);
149:
150: if ($changedCount === false || $changedCount === 0) {
151: throw new \RuntimeException('stream_select() failed.');
152: }
153:
154: foreach ($readable as $workerId => $stream) {
155: [$task, $result] = self::readMessage($stream) ?? throw new \RuntimeException('Failed to read message from worker.');
156: $idleWorkers[$workerId] = ($idleWorkers[$workerId] ?? 0) + 1;
157: $this->pendingTaskCount--;
158: yield $task => $result;
159: }
160: }
161:
162: } finally {
163: $this->pendingTaskCount = 0;
164: $this->stop();
165: }
166: }
167:
168:
169: abstract protected function start(): void;
170:
171:
172: abstract protected function stop(): void;
173: }
174: