GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Messenger::__construct()   A
last analyzed

Complexity

Conditions 3
Paths 1

Size

Total Lines 28
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.1707

Importance

Changes 0
Metric Value
eloc 15
c 0
b 0
f 0
dl 0
loc 28
rs 9.7666
ccs 11
cts 15
cp 0.7332
cc 3
nc 1
nop 2
crap 3.1707
1
<?php
2
3
declare(strict_types=1);
4
5
namespace WyriHaximus\React\ChildProcess\Messenger;
6
7
use Evenement\EventEmitterInterface;
8
use Evenement\EventEmitterTrait;
9
use React\Promise\PromiseInterface;
10
use React\Socket\ConnectionInterface;
11
use Throwable;
12
use WyriHaximus\React\ChildProcess\Messenger\ChildProcess\Options;
13
use WyriHaximus\React\ChildProcess\Messenger\Messages\ActionableMessageInterface;
14
use WyriHaximus\React\ChildProcess\Messenger\Messages\Error;
15
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessagesFactory;
16
use WyriHaximus\React\ChildProcess\Messenger\Messages\Line;
17
use WyriHaximus\React\ChildProcess\Messenger\Messages\LineInterface;
18
use WyriHaximus\React\ChildProcess\Messenger\Messages\Message;
19
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
20
use WyriHaximus\React\ChildProcess\Messenger\Messages\Rpc;
21
22
use function array_key_exists;
23
use function array_pop;
24
use function count;
25
use function explode;
26
use function React\Promise\reject;
27
use function strpos;
28
29
/**
 * Line-based messenger on top of a socket connection.
 *
 * Incoming data is appended to a buffer and split on LineInterface::EOL; every
 * complete line is turned into a message by the messages factory and handled.
 * Outgoing messages, errors and RPC calls are serialised with the configured
 * line class and written to the connection.
 */
final class Messenger implements MessengerInterface, EventEmitterInterface
{
    use EventEmitterTrait;

    public const INTERVAL      = 0.1;
    public const TERMINATE_RPC = 'wyrihaximus.react.child-process.messenger.terminate';

    protected ConnectionInterface $connection;

    protected OutstandingCalls $outstandingRpcCalls;

    /** @var array<mixed> */
    protected array $rpcs = [];

    /** @var array<mixed> */
    protected array $options = [];

    protected string $buffer = '';

    /** @var array<string, class-string|array<mixed>> */
    protected array $defaultOptions = [
        'lineClass' => Line::class,
        'messageFactoryClass' => MessagesFactory::class,
        'lineOptions' => [],
    ];

    /**
     * Stores the connection, resolves the effective options and subscribes to the
     * connection's `data` and `close` events.
     *
     * @param ConnectionInterface $connection Connection that is read from and written to.
     * @param Options             $options    Caller options, completed with $defaultOptions.
     */
    public function __construct(
        ConnectionInterface $connection,
        Options $options
    ) {
        $this->connection = $connection;

        // The array union operator keeps the LEFT operand's value for duplicate keys,
        // so the caller's options must come first; otherwise the defaults would
        // always win and could never be overridden.
        /**
         * @psalm-suppress InvalidMethodCall
         */
        $this->options = $options->toArray() + $this->defaultOptions;

        $this->outstandingRpcCalls = new OutstandingCalls();

        $this->connection->on('data', function (string $data): void {
            $this->buffer .= $data;
            // Re-emit the raw chunk before attempting to split it into lines.
            $this->emit('data', [$data]);
            $this->handleData();
        });
        $this->connection->on('close', function (): void {
            $calls = $this->outstandingRpcCalls->getCalls();
            if (count($calls) === 0) {
                // Nothing was waiting for an answer, so the close is not reported as an error.
                return;
            }

            // Calls still pending at close time are all rejected with the same error instance.
            $error = new CommunicationWithProcessUnexpectedEndException();
            $this->emit('error', [$error, $this]);
            foreach ($calls as $call) {
                $call->reject($error);
            }
        });
    }

    /**
     * Registers (or replaces) the listener invoked for RPC calls to $target.
     */
    public function registerRpc(string $target, callable $listener): void
    {
        $this->rpcs[$target] = $listener;
    }

    /**
     * Removes the listener registered for $target, if any.
     */
    public function deregisterRpc(string $target): void
    {
        unset($this->rpcs[$target]);
    }

    /**
     * Tells whether a listener is registered for $target.
     */
    public function hasRpc(string $target): bool
    {
        return array_key_exists($target, $this->rpcs);
    }

    /**
     * Invokes the listener registered for $target with the payload and this messenger.
     *
     * @return PromiseInterface The listener's result, or a rejected promise when no
     *                          listener is registered or the listener throws.
     */
    public function callRpc(string $target, Payload $payload): PromiseInterface
    {
        // Reject explicitly instead of indexing a missing key and calling null, which
        // would raise an "undefined array key" diagnostic and an opaque engine Error.
        if (! $this->hasRpc($target)) {
            return reject(new \InvalidArgumentException('No RPC registered for target "' . $target . '"'));
        }

        try {
            return $this->rpcs[$target]($payload, $this);

            /** @phpstan-ignore-next-line  */
        } catch (Throwable $exception) {
            return reject($exception);
        }
    }

    /**
     * Serialises the message into a line and writes it to the connection.
     */
    public function message(Message $message): void
    {
        $this->write($this->createLine($message));
    }

    /**
     * Serialises the error into a line and writes it to the connection.
     */
    public function error(Error $error): void
    {
        $this->write($this->createLine($error));
    }

    /**
     * Looks up the outstanding RPC call tracked under $uniqid.
     */
    public function getOutstandingCall(string $uniqid): OutstandingCallInterface
    {
        return $this->outstandingRpcCalls->getCall($uniqid);
    }

    /**
     * Sends an RPC call over the connection.
     *
     * A new outstanding call is created, its unique id is set on the RPC, and the
     * resulting line is written out.
     *
     * @return PromiseInterface Promise of the outstanding call's deferred.
     */
    public function rpc(Rpc $rpc): PromiseInterface
    {
        $callReference = $this->outstandingRpcCalls->newCall(function (): void {}); // phpcs:disabled

        $this->write($this->createLine($rpc->setUniqid($callReference->getUniqid())));

        return $callReference->getDeferred()->promise();
    }

    /**
     * Wraps the message in the configured `lineClass` (constructed with the
     * configured `lineOptions`) and returns its string form.
     */
    public function createLine(ActionableMessageInterface $line): string
    {
        $lineClass = $this->options['lineClass'];

        /**
         * @psalm-suppress InvalidCast
         */
        return (string) new $lineClass($line, $this->options['lineOptions']);
    }

    /**
     * Sends the TERMINATE_RPC call and returns its promise.
     */
    public function softTerminate(): PromiseInterface
    {
        return $this->rpc(MessagesFactory::rpc(self::TERMINATE_RPC));
    }

    /**
     * Writes an already serialised line to the connection.
     */
    public function write(string $line): void
    {
        $this->connection->write($line);
    }

    /**
     * Emits an `error` event carrying a ProcessUnexpectedEndException for $exitCode.
     *
     * @internal
     */
    public function crashed(int $exitCode): void
    {
        $this->emit('error', [new ProcessUnexpectedEndException($exitCode), $this]);
    }

    /**
     * Splits the buffer into complete lines and dispatches them.
     *
     * Does nothing until the buffer contains at least one LineInterface::EOL.
     */
    private function handleData(): void
    {
        if (strpos($this->buffer, LineInterface::EOL) === false) {
            return;
        }

        $messages = explode(LineInterface::EOL, $this->buffer);
        // Whatever follows the last EOL (possibly an empty string) is an incomplete
        // line; it stays in the buffer until more data arrives.
        $this->buffer = array_pop($messages);
        $this->iterateMessages($messages);
    }

    /**
     * Builds a message from each line and lets it handle itself against this messenger.
     *
     * Anything thrown while building or handling a line is emitted as an `error`
     * event, and processing continues with the next line.
     *
     * @param array<string> $messages
     */
    private function iterateMessages(array $messages): void
    {
        foreach ($messages as $message) {
            try {
                MessagesFactory::fromLine($message, [])->handle($this, 'source');
                /** @phpstan-ignore-next-line  */
            } catch (Throwable $exception) {
                $this->emit('error', [$exception, $this]);
            }
        }
    }
}
191