1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | namespace WyriHaximus\React\ChildProcess\Messenger; |
||
6 | |||
7 | use Evenement\EventEmitterInterface; |
||
8 | use Evenement\EventEmitterTrait; |
||
9 | use React\Promise\PromiseInterface; |
||
10 | use React\Socket\ConnectionInterface; |
||
11 | use Throwable; |
||
12 | use WyriHaximus\React\ChildProcess\Messenger\ChildProcess\Options; |
||
13 | use WyriHaximus\React\ChildProcess\Messenger\Messages\ActionableMessageInterface; |
||
14 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Error; |
||
15 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessagesFactory; |
||
16 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Line; |
||
17 | use WyriHaximus\React\ChildProcess\Messenger\Messages\LineInterface; |
||
18 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Message; |
||
19 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload; |
||
20 | use WyriHaximus\React\ChildProcess\Messenger\Messages\Rpc; |
||
21 | |||
22 | use function array_key_exists; |
||
23 | use function array_pop; |
||
24 | use function count; |
||
25 | use function explode; |
||
26 | use function React\Promise\reject; |
||
27 | use function strpos; |
||
28 | |||
29 | final class Messenger implements MessengerInterface, EventEmitterInterface |
||
30 | { |
||
31 | use EventEmitterTrait; |
||
32 | |||
33 | public const INTERVAL = 0.1; |
||
34 | public const TERMINATE_RPC = 'wyrihaximus.react.child-process.messenger.terminate'; |
||
35 | |||
36 | protected ConnectionInterface $connection; |
||
37 | |||
38 | protected OutstandingCalls $outstandingRpcCalls; |
||
39 | |||
40 | /** @var array<mixed> */ |
||
41 | protected array $rpcs = []; |
||
42 | |||
43 | /** @var array<mixed> */ |
||
44 | protected array $options = []; |
||
45 | |||
46 | protected string $buffer = ''; |
||
47 | |||
48 | /** @var array<string, class-string|array<mixed>> */ |
||
0 ignored issues
–
show
Documentation
Bug
introduced
by
![]() |
|||
49 | protected array $defaultOptions = [ |
||
50 | 'lineClass' => Line::class, |
||
51 | 'messageFactoryClass' => MessagesFactory::class, |
||
52 | 'lineOptions' => [], |
||
53 | ]; |
||
54 | |||
55 | public function __construct( |
||
56 | ConnectionInterface $connection, |
||
57 | 7 | Options $options |
|
58 | ) { |
||
59 | $this->connection = $connection; |
||
60 | |||
61 | 7 | /** |
|
62 | * @psalm-suppress InvalidMethodCall |
||
63 | 7 | */ |
|
64 | $this->options = $this->defaultOptions + $options->toArray(); |
||
65 | 7 | ||
66 | $this->outstandingRpcCalls = new OutstandingCalls(); |
||
67 | |||
68 | 2 | $this->connection->on('data', function (string $data): void { |
|
69 | 2 | $this->buffer .= $data; |
|
70 | 2 | $this->emit('data', [$data]); |
|
71 | 7 | $this->handleData(); |
|
72 | }); |
||
73 | 1 | $this->connection->on('close', function (): void { |
|
74 | 1 | $calls = $this->outstandingRpcCalls->getCalls(); |
|
75 | 1 | if (count($calls) === 0) { |
|
76 | return; |
||
77 | } |
||
78 | |||
79 | $error = new CommunicationWithProcessUnexpectedEndException(); |
||
80 | $this->emit('error', [$error, $this]); |
||
81 | foreach ($calls as $call) { |
||
82 | $call->reject($error); |
||
83 | 7 | } |
|
84 | 7 | }); |
|
85 | } |
||
86 | |||
87 | public function registerRpc(string $target, callable $listener): void |
||
88 | { |
||
89 | $this->rpcs[$target] = $listener; |
||
90 | 1 | } |
|
91 | |||
92 | 1 | public function deregisterRpc(string $target): void |
|
93 | 1 | { |
|
94 | unset($this->rpcs[$target]); |
||
95 | } |
||
96 | |||
97 | public function hasRpc(string $target): bool |
||
98 | 1 | { |
|
99 | return array_key_exists($target, $this->rpcs); |
||
100 | 1 | } |
|
101 | 1 | ||
102 | public function callRpc(string $target, Payload $payload): PromiseInterface |
||
103 | { |
||
104 | try { |
||
105 | return $this->rpcs[$target]($payload, $this); |
||
106 | |||
107 | 1 | /** @phpstan-ignore-next-line */ |
|
108 | } catch (Throwable $exception) { |
||
109 | 1 | return reject($exception); |
|
110 | } |
||
111 | } |
||
112 | |||
113 | public function message(Message $message): void |
||
114 | { |
||
115 | $this->write($this->createLine($message)); |
||
116 | } |
||
117 | 1 | ||
118 | public function error(Error $error): void |
||
119 | { |
||
120 | 1 | $this->write($this->createLine($error)); |
|
121 | 1 | } |
|
122 | |||
123 | public function getOutstandingCall(string $uniqid): OutstandingCallInterface |
||
124 | { |
||
125 | 1 | return $this->outstandingRpcCalls->getCall($uniqid); |
|
126 | 1 | } |
|
127 | 1 | ||
128 | public function rpc(Rpc $rpc): PromiseInterface |
||
129 | { |
||
130 | $callReference = $this->outstandingRpcCalls->newCall(function (): void {}); // phpcs:disabled |
||
131 | |||
132 | $this->write($this->createLine($rpc->setUniqid($callReference->getUniqid()))); |
||
133 | |||
134 | return $callReference->getDeferred()->promise(); |
||
135 | } |
||
136 | 1 | ||
137 | public function createLine(ActionableMessageInterface $line): string |
||
138 | 1 | { |
|
139 | 1 | $lineCLass = $this->options['lineClass']; |
|
140 | |||
141 | /** |
||
142 | * @psalm-suppress InvalidCast |
||
143 | */ |
||
144 | 1 | return (string) new $lineCLass($line, $this->options['lineOptions']); |
|
145 | } |
||
146 | 1 | ||
147 | 1 | public function softTerminate(): PromiseInterface |
|
148 | { |
||
149 | return $this->rpc(MessagesFactory::rpc(self::TERMINATE_RPC)); |
||
150 | } |
||
151 | |||
152 | public function write(string $line): void |
||
153 | 1 | { |
|
154 | $this->connection->write($line); |
||
155 | 1 | } |
|
156 | |||
157 | /** |
||
158 | * @internal |
||
159 | */ |
||
160 | public function crashed(int $exitCode): void |
||
161 | { |
||
162 | $this->emit('error', [new ProcessUnexpectedEndException($exitCode), $this]); |
||
163 | } |
||
164 | 2 | ||
165 | 2 | private function handleData(): void |
|
166 | { |
||
167 | 2 | if (strpos($this->buffer, LineInterface::EOL) === false) { |
|
168 | return; |
||
169 | 2 | } |
|
170 | |||
171 | $messages = explode(LineInterface::EOL, $this->buffer); |
||
172 | $this->buffer = array_pop($messages); |
||
173 | $this->iterateMessages($messages); |
||
174 | } |
||
175 | |||
176 | 4 | /** |
|
177 | * @param array<string> $messages |
||
178 | 4 | */ |
|
179 | private function iterateMessages(array $messages): void |
||
180 | 4 | { |
|
181 | foreach ($messages as $message) { |
||
182 | try { |
||
183 | MessagesFactory::fromLine($message, [])->handle($this, 'source'); |
||
184 | /** @phpstan-ignore-next-line */ |
||
185 | } catch (Throwable $exception) { |
||
186 | 1 | $this->emit('error', [$exception, $this]); |
|
187 | } |
||
188 | 1 | } |
|
189 | } |
||
190 | } |
||
191 |