WyriHaximus /
reactphp-child-process-messenger
| 1 | <?php |
||||||||
| 2 | |||||||||
| 3 | declare(strict_types=1); |
||||||||
| 4 | |||||||||
| 5 | namespace WyriHaximus\React\ChildProcess\Messenger; |
||||||||
| 6 | |||||||||
| 7 | use Exception; |
||||||||
| 8 | use React\ChildProcess\Process; |
||||||||
| 9 | use React\EventLoop\LoopInterface; |
||||||||
| 10 | use React\EventLoop\TimerInterface; |
||||||||
| 11 | use React\Promise; |
||||||||
| 12 | use React\Socket\ConnectionInterface; |
||||||||
| 13 | use React\Socket\Connector; |
||||||||
| 14 | use React\Socket\SocketServer; |
||||||||
| 15 | use RuntimeException; |
||||||||
| 16 | use Throwable; |
||||||||
| 17 | use WyriHaximus\FileDescriptors\Factory as FileDescriptorsFactory; |
||||||||
| 18 | use WyriHaximus\FileDescriptors\ListerInterface; |
||||||||
| 19 | use WyriHaximus\React\ChildProcess\Messenger\ChildProcess\ArgvEncoder; |
||||||||
| 20 | use WyriHaximus\React\ChildProcess\Messenger\ChildProcess\Options; |
||||||||
| 21 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessengesFactory; |
||||||||
| 22 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload; |
||||||||
| 23 | |||||||||
| 24 | use function array_key_exists; |
||||||||
| 25 | use function bin2hex; |
||||||||
| 26 | use function dirname; |
||||||||
| 27 | use function escapeshellarg; |
||||||||
| 28 | use function explode; |
||||||||
| 29 | use function hash_hmac; |
||||||||
| 30 | use function is_subclass_of; |
||||||||
| 31 | use function random_bytes; |
||||||||
| 32 | use function Safe\sprintf; |
||||||||
| 33 | use function WyriHaximus\React\futurePromise; |
||||||||
| 34 | |||||||||
| 35 | use const DIRECTORY_SEPARATOR; |
||||||||
| 36 | use const PHP_BINARY; |
||||||||
| 37 | use const PHP_EOL; |
||||||||
| 38 | use const PHP_SAPI; |
||||||||
| 39 | |||||||||
/**
 * Static factory that wires up the two ends of a parent/child messenger pair.
 *
 * The parent side opens a loopback socket server, spawns the child process with the
 * server address and a random secret encoded into its argv, and waits for the child
 * to connect back and complete a short handshake. The child side connects to that
 * address and performs the matching half of the handshake.
 */
final class Factory
{
    /** Interval, in seconds, at which the parent polls whether the child process is still running. */
    public const INTERVAL = 0.1;

    /** Not referenced inside this class; kept as part of the public constant surface. */
    public const TIMEOUT = 13;

    /** Delay, in seconds, before the loop is stopped by the default terminate callable in child(). */
    public const TERMINATE_TIMEOUT = 1;

    /** RPC target used by parentFromClass() to tell the child which class to register. */
    public const PROCESS_REGISTER = 'wyrihaximus.react.child-process.messenger.child.register';

    /** Fallback used when the `connect-timeout` option is not supplied. */
    public const DEFAULT_CONNECT_TIMEOUT = 15;

    /**
     * Spawns a child from the command of the given process and resolves with the parent-side messenger.
     *
     * A socket server is opened on 127.0.0.1 with an OS-assigned port. Its address, a freshly
     * generated random secret and the connect timeout are encoded and appended to the original
     * command as one shell-escaped argument. The passed-in Process instance itself is not started;
     * only its command string is reused.
     *
     * @param array<mixed> $options Only the `connect-timeout` key is read here.
     */
    public static function parent(
        Process $process,
        LoopInterface $loop,
        array $options = []
    ): Promise\PromiseInterface {
        return new Promise\Promise(static function (callable $resolve, callable $reject) use ($process, $loop, $options): void {
            $server = new SocketServer('127.0.0.1:0', [], $loop);

            $childOptions = new Options(
                bin2hex(random_bytes(32)),
                (string) $server->getAddress(),
                $options['connect-timeout'] ?? self::DEFAULT_CONNECT_TIMEOUT
            );
            $argvString   = escapeshellarg(ArgvEncoder::encode($childOptions));
            $childProcess = new Process($process->getCommand() . ' ' . $argvString);

            self::startParent($childProcess, $server, $loop, $childOptions)->then($resolve, $reject);
        });
    }

    /**
     * Spawns the bundled `bin/child-process` script, registers $class in it and resolves with the messenger.
     *
     * Recognised options:
     *  - `cmdTemplate`: sprintf template with a single `%s` that receives the full command line.
     *  - `fileDescriptorLister`: a ListerInterface used to enumerate descriptors that are redirected
     *    to /dev/null in the child (only consulted when StaticConfig enables listing and the
     *    directory separator is not a backslash).
     *  - `connect-timeout`: timeout applied to the whole start-up handshake.
     *
     * @param array<mixed> $options
     *
     * @return Promise\PromiseInterface<Messenger>
     *
     * @throws Exception When $class is not a subclass/implementation of ChildInterface.
     *
     * @psalm-suppress TooManyTemplateParams
     * @phpstan-ignore-next-line
     */
    public static function parentFromClass(
        string $class,
        LoopInterface $loop,
        array $options = []
    ): Promise\PromiseInterface {
        if (! is_subclass_of($class, ChildInterface::class)) {
            /** @phpstan-ignore-next-line */
            throw new Exception('Given class doesn\'t implement ChildInterface');
        }

        return new Promise\Promise(static function (callable $resolve, callable $reject) use ($class, $loop, $options): void {
            $template = '%s';
            if (array_key_exists('cmdTemplate', $options)) {
                $template = $options['cmdTemplate'];
                unset($options['cmdTemplate']);
            }

            $fds = [];
            if (StaticConfig::shouldListFileDescriptors() && DIRECTORY_SEPARATOR !== '\\') {
                if (array_key_exists('fileDescriptorLister', $options) && $options['fileDescriptorLister'] instanceof ListerInterface) {
                    $fileDescriptorLister = $options['fileDescriptorLister'];
                    unset($options['fileDescriptorLister']);
                } else {
                    $fileDescriptorLister = FileDescriptorsFactory::create();
                }

                // Point every listed descriptor at /dev/null in the child's descriptor spec.
                foreach ($fileDescriptorLister->list() as $id) {
                    $fds[(int) $id] = ['file', '/dev/null', 'r'];
                }
            }

            $server         = new SocketServer('127.0.0.1:0', [], $loop);
            $connectTimeout = $options['connect-timeout'] ?? self::DEFAULT_CONNECT_TIMEOUT;
            $childOptions   = new Options(bin2hex(random_bytes(32)), (string) $server->getAddress(), $connectTimeout);

            // Escape only the binary path. The phpdbg flags must stay outside the quotes,
            // otherwise the shell treats "<binary> -qrr --" as the name of the executable.
            $phpBinary = escapeshellarg(PHP_BINARY);
            if (PHP_SAPI === 'phpdbg') {
                $phpBinary .= ' -qrr --';
            }

            $childProcessPath = escapeshellarg(dirname(__DIR__) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'child-process');
            $argvString       = escapeshellarg(ArgvEncoder::encode($childOptions));
            $command          = $phpBinary . ' ' . $childProcessPath;

            $process = new Process(
                sprintf(
                    $template,
                    $command . ' ' . $argvString
                ),
                null,
                null,
                $fds
            );

            futurePromise($loop)->then(static function () use ($process, $server, $loop, $childOptions, $connectTimeout): Promise\PromiseInterface {
                return Promise\Timer\timeout(self::startParent($process, $server, $loop, $childOptions), $connectTimeout, $loop);
            })->then(static function (Messenger $messenger) use ($class): Promise\PromiseInterface {
                // Ask the child to register the requested class, then hand the messenger to the caller.
                return $messenger->rpc(MessengesFactory::rpc(Factory::PROCESS_REGISTER, ['className' => $class]))->then(static function () use ($messenger): Promise\PromiseInterface {
                    return Promise\resolve($messenger);
                });
            })->then($resolve, $reject);
        });
    }

    /**
     * Child-side entry point: connects to the parent, completes the handshake and resolves with a Messenger.
     *
     * After connecting, the child sends an HMAC-SHA512 of the address keyed with the random secret,
     * waits (bounded by the connect timeout) for a `syn` line, answers with `ack` and resolves.
     * The loop is stopped when the connection closes or errors. A terminate RPC is registered that
     * emits `terminate` on the messenger and then invokes $termiteCallable; when none is given, the
     * default schedules a loop stop after TERMINATE_TIMEOUT.
     *
     * @phpstan-ignore-next-line
     */
    public static function child(LoopInterface $loop, Options $options, ?callable $termiteCallable = null): Promise\PromiseInterface
    {
        $connectTimeout = $options->connectTimeout();

        return (new Connector($loop, ['timeout' => $connectTimeout]))->connect($options->address())->then(static function (ConnectionInterface $connection) use ($options, $loop, $connectTimeout): Promise\PromiseInterface {
            return new Promise\Promise(static function (callable $resolve, callable $reject) use ($connection, $options, $loop, $connectTimeout): void {
                // Subscribe to the parent's reply before sending our signature so the answer cannot be missed.
                Promise\Timer\timeout(Promise\Stream\first($connection), $connectTimeout, $loop)->then(static function (string $chunk) use ($resolve, $reject, $connection, $options, $loop): void {
                    [$confirmation] = explode(PHP_EOL, $chunk);
                    if ($confirmation !== 'syn') {
                        $reject(new RuntimeException('Handshake SYN failed'));

                        return;
                    }

                    $connection->write('ack' . PHP_EOL);
                    $resolve(new Messenger($connection, $options));
                    $connection->on('close', [$loop, 'stop']);
                    $connection->on('error', [$loop, 'stop']);
                }, $reject);
                $connection->write(hash_hmac('sha512', $options->address(), $options->random()) . PHP_EOL);
            });
        })->then(static function (Messenger $messenger) use ($loop, $termiteCallable): Messenger {
            if ($termiteCallable === null) {
                $termiteCallable = static function () use ($loop): void {
                    $loop->addTimer(
                        self::TERMINATE_TIMEOUT,
                        [
                            $loop,
                            'stop',
                        ]
                    );
                };
            }

            $messenger->registerRpc(
                Messenger::TERMINATE_RPC,
                static function (Payload $payload, Messenger $messenger) use ($termiteCallable): Promise\PromiseInterface {
                    $messenger->emit('terminate', [$messenger]);
                    $termiteCallable();

                    return Promise\resolve([]);
                }
            );

            return $messenger;
        });
    }

    /**
     * Starts the process, performs the parent half of the handshake and resolves with the Messenger.
     *
     * For an incoming connection the first line must equal the HMAC-SHA512 of the address keyed with
     * the random secret; the parent then sends `syn` and expects `ack`. The server is closed once the
     * handshake settles either way. Cancelling the returned promise closes the server and terminates
     * the process. After a successful start a periodic timer watches the process and reports a
     * non-zero exit code to the messenger via crashed().
     */
    private static function startParent(
        Process $process,
        SocketServer $server,
        LoopInterface $loop,
        Options $options
    ): Promise\PromiseInterface {
        return (new Promise\Promise(static function (callable $resolve, callable $reject) use ($process, $server, $loop, $options): void {
            $server->on(
                'connection',
                static function (ConnectionInterface $connection) use ($server, $resolve, $reject, $options): void {
                    Promise\Stream\first($connection)->then(static function (string $chunk) use ($options, $connection): Promise\PromiseInterface {
                        [$confirmation] = explode(PHP_EOL, $chunk);
                        $expected       = hash_hmac('sha512', $options->address(), $options->random());

                        // Constant-time comparison so the signature check does not leak timing information.
                        if (\hash_equals($expected, $confirmation)) {
                            $connection->write('syn' . PHP_EOL);

                            return Promise\Stream\first($connection);
                        }

                        return Promise\reject(new RuntimeException('Signature mismatch'));
                    })->then(static function (string $chunk) use ($options, $connection): Promise\PromiseInterface {
                        [$confirmation] = explode(PHP_EOL, $chunk);
                        if ($confirmation === 'ack') {
                            return Promise\resolve(new Messenger($connection, $options));
                        }

                        return Promise\reject(new RuntimeException('Handshake failed'));
                    })->then(static function (MessengerInterface $messenger) use ($server, $resolve): void {
                        $server->close();
                        $resolve($messenger);
                    }, static function (Throwable $throwable) use ($server, $reject, $connection): void {
                        // No messenger owns this connection on failure, so release the socket here.
                        $connection->close();
                        $server->close();
                        $reject($throwable);
                    });
                }
            );
            $server->on('error', static function (Throwable $et) use ($reject): void {
                $reject($et);
            });

            $process->start($loop);
        }, static function () use ($server, $process): void {
            // Canceller: stop listening and tear down the child.
            $server->close();
            $process->terminate();
        }))->then(static function (Messenger $messenger) use ($loop, $process): Messenger {
            $loop->addPeriodicTimer(self::INTERVAL, static function (TimerInterface $timer) use ($messenger, $loop, $process): void {
                if ($process->isRunning()) {
                    return;
                }

                $loop->cancelTimer($timer);

                // A clean exit (0) or an unknown exit code (null) is not reported as a crash.
                $exitCode = $process->getExitCode();
                if ($exitCode === 0 || $exitCode === null) {
                    return;
                }

                $messenger->crashed($exitCode);
            });

            return $messenger;
        });
    }
}
||||||||
| 255 |