1 | <?php |
||||||||
2 | |||||||||
3 | declare(strict_types=1); |
||||||||
4 | |||||||||
5 | namespace WyriHaximus\React\ChildProcess\Messenger; |
||||||||
6 | |||||||||
use Exception;
use React\ChildProcess\Process;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;
use React\Socket\SocketServer;
use RuntimeException;
use Throwable;
use WyriHaximus\FileDescriptors\Factory as FileDescriptorsFactory;
use WyriHaximus\FileDescriptors\ListerInterface;
use WyriHaximus\React\ChildProcess\Messenger\ChildProcess\ArgvEncoder;
use WyriHaximus\React\ChildProcess\Messenger\ChildProcess\Options;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessengesFactory;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;

use function array_key_exists;
use function bin2hex;
use function dirname;
use function escapeshellarg;
use function explode;
use function hash_equals;
use function hash_hmac;
use function is_subclass_of;
use function random_bytes;
use function Safe\sprintf;
use function WyriHaximus\React\futurePromise;

use const DIRECTORY_SEPARATOR;
use const PHP_BINARY;
use const PHP_EOL;
use const PHP_SAPI;
||||||||
39 | |||||||||
/**
 * Creates Messenger instances for both ends of a parent/child process pair.
 *
 * The parent listens on a loopback TCP socket bound to an ephemeral port and
 * hands the address plus a random secret to the child through its command
 * line. The child connects back and both sides run a short line-based
 * handshake (HMAC signature -> "syn" -> "ack") before a Messenger is handed out.
 */
final class Factory
{
    public const INTERVAL                = 0.1;
    public const TIMEOUT                 = 13;
    public const TERMINATE_TIMEOUT       = 1;
    public const PROCESS_REGISTER        = 'wyrihaximus.react.child-process.messenger.child.register';
    public const DEFAULT_CONNECT_TIMEOUT = 15;

    /**
     * Starts a copy of the given process with the encoded connection options
     * appended to its command, and resolves once the handshake has completed.
     *
     * Only the command string of $process is reused; a new Process is created
     * from it.
     *
     * @param array<mixed> $options Recognised key: 'connect-timeout'
     *                              (defaults to self::DEFAULT_CONNECT_TIMEOUT).
     */
    public static function parent(
        Process $process,
        LoopInterface $loop,
        array $options = []
    ): Promise\PromiseInterface {
        return new Promise\Promise(static function (callable $resolve, callable $reject) use ($process, $loop, $options): void {
            // Port 0 lets the OS pick a free port; the real address is read back below.
            $server = new SocketServer('127.0.0.1:0', [], $loop);

            $options['random']  = bin2hex(random_bytes(32));
            $options['address'] = (string) $server->getAddress();
            $options            = new Options($options['random'], $options['address'], $options['connect-timeout'] ?? self::DEFAULT_CONNECT_TIMEOUT);
            $argvString         = escapeshellarg(ArgvEncoder::encode($options));
            $process            = new Process($process->getCommand() . ' ' . $argvString);

            self::startParent($process, $server, $loop, $options)->then($resolve, $reject);
        });
    }

    /**
     * Spawns the bundled `bin/child-process` script with the current PHP
     * binary, waits for the handshake and then asks the child to register
     * $class through the PROCESS_REGISTER RPC.
     *
     * @param array<mixed> $options Recognised keys: 'cmdTemplate' (sprintf
     *                              template wrapping the command, default '%s'),
     *                              'fileDescriptorLister' (ListerInterface) and
     *                              'connect-timeout'.
     *
     * @return Promise\PromiseInterface<Messenger>
     *
     * @throws Exception When $class is not a subclass of ChildInterface.
     *
     * @psalm-suppress TooManyTemplateParams
     * @phpstan-ignore-next-line
     */
    public static function parentFromClass(
        string $class,
        LoopInterface $loop,
        array $options = []
    ): Promise\PromiseInterface {
        if (! is_subclass_of($class, ChildInterface::class)) {
            /** @phpstan-ignore-next-line */
            throw new Exception('Given class doesn\'t implement ChildInterface');
        }

        return new Promise\Promise(static function (callable $resolve, callable $reject) use ($class, $loop, $options): void {
            $template = '%s';
            if (array_key_exists('cmdTemplate', $options)) {
                $template = $options['cmdTemplate'];
                unset($options['cmdTemplate']);
            }

            // Map every listed descriptor to /dev/null; skipped on Windows
            // (backslash directory separator) and when listing is disabled.
            $fds = [];
            if (StaticConfig::shouldListFileDescriptors() && DIRECTORY_SEPARATOR !== '\\') {
                if (array_key_exists('fileDescriptorLister', $options) && $options['fileDescriptorLister'] instanceof ListerInterface) {
                    $fileDescriptorLister = $options['fileDescriptorLister'];
                    unset($options['fileDescriptorLister']);
                } else {
                    $fileDescriptorLister = FileDescriptorsFactory::create();
                }

                foreach ($fileDescriptorLister->list() as $id) {
                    $fds[(int) $id] = ['file', '/dev/null', 'r'];
                }
            }

            $server         = new SocketServer('127.0.0.1:0', [], $loop);
            $connectTimeout = $options['connect-timeout'] ?? self::DEFAULT_CONNECT_TIMEOUT;
            $options        = new Options(bin2hex(random_bytes(32)), (string) $server->getAddress(), $connectTimeout);

            // Escape only the binary path. The phpdbg flags must stay separate
            // shell words, otherwise the shell would look for an executable
            // literally named "<binary> -qrr --".
            $phpBinary = escapeshellarg(PHP_BINARY);
            if (PHP_SAPI === 'phpdbg') {
                $phpBinary .= ' -qrr --';
            }

            $childProcessPath = escapeshellarg(dirname(__DIR__) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'child-process');
            $argvString       = escapeshellarg(ArgvEncoder::encode($options));
            $command          = $phpBinary . ' ' . $childProcessPath;

            $process = new Process(
                sprintf(
                    $template,
                    $command . ' ' . $argvString
                ),
                null,
                null,
                $fds
            );

            futurePromise($loop)->then(static function () use ($process, $server, $loop, $options, $connectTimeout): Promise\PromiseInterface {
                return Promise\Timer\timeout(self::startParent($process, $server, $loop, $options), $connectTimeout, $loop);
            })->then(static function (Messenger $messenger) use ($class): Promise\PromiseInterface {
                // Resolve with the messenger itself rather than the RPC response.
                return $messenger->rpc(MessengesFactory::rpc(self::PROCESS_REGISTER, ['className' => $class]))->then(static function () use ($messenger): Promise\PromiseInterface {
                    return Promise\resolve($messenger);
                });
            })->then($resolve, $reject);
        });
    }

    /**
     * Child side of the handshake: connects to the parent's address, sends the
     * HMAC signature, expects "syn", answers "ack" and resolves with a
     * Messenger that has the terminate RPC registered.
     *
     * Once the handshake has succeeded, a 'close' or 'error' event on the
     * connection stops the loop.
     *
     * @param callable|null $termiteCallable Invoked when the terminate RPC is
     *                                       received; by default the loop is
     *                                       stopped after self::TERMINATE_TIMEOUT.
     *
     * @phpstan-ignore-next-line
     */
    public static function child(LoopInterface $loop, Options $options, ?callable $termiteCallable = null): Promise\PromiseInterface
    {
        $connectTimeout = $options->connectTimeout();

        return (new Connector($loop, ['timeout' => $connectTimeout]))->connect($options->address())->then(static function (ConnectionInterface $connection) use ($options, $loop, $connectTimeout): Promise\PromiseInterface {
            return new Promise\Promise(static function (callable $resolve, callable $reject) use ($connection, $options, $loop, $connectTimeout): void {
                // Subscribe to the reply before the signature is written below.
                Promise\Timer\timeout(Promise\Stream\first($connection), $connectTimeout, $loop)->then(static function (string $chunk) use ($resolve, $reject, $connection, $options, $loop): void {
                    [$confirmation] = explode(PHP_EOL, $chunk);
                    if ($confirmation === 'syn') {
                        $connection->write('ack' . PHP_EOL);
                        $resolve(new Messenger($connection, $options));
                        $connection->on('close', [$loop, 'stop']);
                        $connection->on('error', [$loop, 'stop']);

                        return;
                    }

                    $reject(new RuntimeException('Handshake SYN failed'));
                }, $reject);
                $connection->write(hash_hmac('sha512', $options->address(), $options->random()) . PHP_EOL);
            });
        })->then(static function (Messenger $messenger) use ($loop, $termiteCallable): Messenger {
            if ($termiteCallable === null) {
                $termiteCallable = static function () use ($loop): void {
                    $loop->addTimer(
                        self::TERMINATE_TIMEOUT,
                        [
                            $loop,
                            'stop',
                        ]
                    );
                };
            }

            $messenger->registerRpc(
                Messenger::TERMINATE_RPC,
                static function (Payload $payload, Messenger $messenger) use ($termiteCallable): Promise\PromiseInterface {
                    $messenger->emit('terminate', [$messenger]);
                    $termiteCallable();

                    return Promise\resolve([]);
                }
            );

            return $messenger;
        });
    }

    /**
     * Parent side of the handshake: starts $process, verifies the signature
     * sent by the incoming connection, replies "syn" and waits for "ack".
     *
     * The listening server is closed as soon as a handshake attempt succeeds
     * or fails. Cancelling the returned promise closes the server and
     * terminates the process. After a successful handshake a periodic timer
     * polls the process and reports a non-zero exit code via
     * Messenger::crashed().
     */
    private static function startParent(
        Process $process,
        SocketServer $server,
        LoopInterface $loop,
        Options $options
    ): Promise\PromiseInterface {
        return (new Promise\Promise(static function (callable $resolve, callable $reject) use ($process, $server, $loop, $options): void {
            $server->on(
                'connection',
                static function (ConnectionInterface $connection) use ($server, $resolve, $reject, $options): void {
                    Promise\Stream\first($connection)->then(static function (string $chunk) use ($options, $connection): Promise\PromiseInterface {
                        [$confirmation] = explode(PHP_EOL, $chunk);
                        $expected       = hash_hmac('sha512', $options->address(), $options->random());
                        // Timing-safe comparison of the shared-secret signature.
                        if (hash_equals($expected, $confirmation)) {
                            $connection->write('syn' . PHP_EOL);

                            return Promise\Stream\first($connection);
                        }

                        return Promise\reject(new RuntimeException('Signature mismatch'));
                    })->then(static function (string $chunk) use ($options, $connection): Promise\PromiseInterface {
                        [$confirmation] = explode(PHP_EOL, $chunk);
                        if ($confirmation === 'ack') {
                            return Promise\resolve(new Messenger($connection, $options));
                        }

                        return Promise\reject(new RuntimeException('Handshake failed'));
                    })->then(static function (MessengerInterface $messenger) use ($server, $resolve): void {
                        $server->close();
                        $resolve($messenger);
                    }, static function (Throwable $throwable) use ($server, $reject): void {
                        $server->close();
                        $reject($throwable);
                    });
                }
            );
            $server->on('error', static function (Throwable $et) use ($reject): void {
                $reject($et);
            });

            $process->start($loop);
        }, static function () use ($server, $process): void {
            // Canceller: stop listening and terminate the spawned process.
            $server->close();
            $process->terminate();
        }))->then(static function (Messenger $messenger) use ($loop, $process): Messenger {
            $loop->addPeriodicTimer(self::INTERVAL, static function (TimerInterface $timer) use ($messenger, $loop, $process): void {
                if ($process->isRunning()) {
                    return;
                }

                $loop->cancelTimer($timer);

                $exitCode = $process->getExitCode();
                // A clean exit (0) or a missing exit code (null) is not reported as a crash.
                if ($exitCode === 0 || $exitCode === null) {
                    return;
                }

                $messenger->crashed($exitCode);
            });

            return $messenger;
        });
    }
}
||||||||
255 |