GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( afc5ea...828b9a )
by Cees-Jan
23:31 queued 06:50
created

Messenger::message()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace WyriHaximus\React\ChildProcess\Messenger;
4
5
use Evenement\EventEmitter;
6
use React\Promise\PromiseInterface;
7
use React\Promise\RejectedPromise;
8
use React\Stream\Stream;
9
use WyriHaximus\React\ChildProcess\Messenger\Messages\ActionableMessageInterface;
10
use WyriHaximus\React\ChildProcess\Messenger\Messages\Error;
11
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessageFactory;
12
use WyriHaximus\React\ChildProcess\Messenger\Messages\LineInterface;
13
use WyriHaximus\React\ChildProcess\Messenger\Messages\Message;
14
use WyriHaximus\React\ChildProcess\Messenger\Messages\Rpc;
15
16
/**
 * Line-based messenger on top of three process streams.
 *
 * Outgoing messages are wrapped in the configured line class, cast to a
 * string and written to the stream named by the 'write' / 'write_err'
 * options. Incoming data from the streams named by the 'read' / 'read_err'
 * options is buffered, split on LineInterface::EOL and handed to the
 * configured message factory.
 *
 * Recognised option keys:
 *  - read, read_err:   name of the stream property ('stdin', 'stdout' or
 *                      'stderr') to listen on for incoming data.
 *  - write, write_err: name of the stream property messages / errors are
 *                      written to.
 *  - callForward:      callable invoked as ($name, $arguments) for method
 *                      calls this class does not define.
 *  - lineClass, messageFactoryClass, lineOptions: see $defaultOptions.
 *
 * Emitted events:
 *  - 'data'  with [$source, $data] for every chunk received.
 *  - 'error' with [$exception, $this] when handling a received line throws.
 */
class Messenger extends EventEmitter
{
    // NOTE(review): not referenced inside this class; presumably a timer
    // interval in seconds used by collaborators - confirm against callers.
    const INTERVAL = 0.1;

    /**
     * RPC target name sent by softTerminate().
     */
    const TERMINATE_RPC = 'wyrihaximus.react.child-process.messenger.terminate';

    /**
     * @var Stream
     */
    protected $stdin;

    /**
     * @var Stream
     */
    protected $stdout;

    /**
     * @var Stream
     */
    protected $stderr;

    /**
     * Registry of RPC calls that have been sent and await a response.
     *
     * @var OutstandingCalls
     */
    protected $outstandingRpcCalls;

    /**
     * Registered RPC listeners, keyed by target name.
     *
     * @var callable[]
     */
    protected $rpcs = [];

    /**
     * Effective options: caller-supplied options completed with $defaultOptions.
     *
     * @var array
     */
    protected $options = [];

    /**
     * Per-stream buffer holding received data that has not yet been
     * terminated by LineInterface::EOL.
     *
     * @var string[]
     */
    protected $buffers = [
        'stdin' => '',
        'stdout' => '',
        'stderr' => '',
    ];

    /**
     * Fallback values for options the caller did not supply.
     *
     *  - lineClass:           class instantiated by createLine().
     *  - messageFactoryClass: class whose static fromLine() parses received lines.
     *  - lineOptions:         passed to both of the above.
     *
     * @var array
     */
    protected $defaultOptions = [
        'lineClass' => 'WyriHaximus\React\ChildProcess\Messenger\Messages\Line',
        'messageFactoryClass' => 'WyriHaximus\React\ChildProcess\Messenger\Messages\Factory',
        'lineOptions' => [],
    ];

    /**
     * @param Stream $stdin
     * @param Stream $stdout
     * @param Stream $stderr
     * @param array  $options See the class docblock for the recognised keys.
     */
    public function __construct(Stream $stdin, Stream $stdout, Stream $stderr, array $options)
    {
        $this->stdin  = $stdin;
        $this->stdout = $stdout;
        $this->stderr = $stderr;

        // The array union operator keeps the LEFT operand's value on key
        // collisions, so the caller's options must come first; the defaults
        // then only fill in keys the caller left out.
        $this->options = $options + $this->defaultOptions;

        $this->outstandingRpcCalls = new OutstandingCalls();

        $this->attachMessenger();
    }

    /**
     * Forward any unknown calls when there is a call forward possible.
     *
     * @param string $name
     * @param array  $arguments
     *
     * @return mixed Result of the 'callForward' callable, or null when no
     *               such option is configured.
     */
    public function __call($name, array $arguments)
    {
        if (isset($this->options['callForward'])) {
            $call = $this->options['callForward'];

            return $call($name, $arguments);
        }
    }

    /**
     * Register (or replace) the listener for an RPC target.
     *
     * @param string   $target
     * @param callable $listener Invoked by callRpc() as ($payload, $messenger).
     */
    public function registerRpc($target, callable $listener)
    {
        $this->rpcs[$target] = $listener;
    }

    /**
     * Remove the listener for an RPC target; a no-op when none is registered.
     *
     * @param string $target
     */
    public function deregisterRpc($target)
    {
        unset($this->rpcs[$target]);
    }

    /**
     * @param  string $target
     * @return bool   True when a listener is registered for $target.
     */
    public function hasRpc($target)
    {
        return isset($this->rpcs[$target]);
    }

    /**
     * Invoke the listener registered for $target.
     *
     * Failures are reported as a rejected promise rather than thrown: an
     * unregistered target, a listener that throws an \Exception, and a
     * listener that does not return a promise all reject.
     *
     * @param  string $target
     * @param  mixed  $payload
     * @return PromiseInterface
     */
    public function callRpc($target, $payload)
    {
        // Calling a missing entry would be a fatal error / \Error that the
        // catch block below cannot intercept, so reject explicitly instead.
        if (!isset($this->rpcs[$target])) {
            return new RejectedPromise(
                new \Exception('No RPC registered for target "' . $target . '"')
            );
        }

        try {
            $promise = $this->rpcs[$target]($payload, $this);
            if ($promise instanceof PromiseInterface) {
                return $promise;
            }

            throw new \Exception('RPC must return promise');
        } catch (\Exception $exception) {
            return new RejectedPromise($exception);
        }
    }

    /**
     * Send a message over the stream named by the 'write' option.
     *
     * @param Message $message
     */
    public function message(Message $message)
    {
        $this->write($this->createLine($message));
    }

    /**
     * Send an error over the stream named by the 'write_err' option.
     *
     * @param Error $error
     */
    public function error(Error $error)
    {
        $this->writeErr($this->createLine($error));
    }

    /**
     * @param  string          $uniqid
     * @return OutstandingCall
     */
    public function getOutstandingCall($uniqid)
    {
        return $this->outstandingRpcCalls->getCall($uniqid);
    }

    /**
     * Send an RPC and return a promise for its outcome.
     *
     * A new outstanding call is registered, its unique id is stamped on the
     * RPC message, and the message is written to the 'write' stream.
     *
     * @param  Rpc                    $rpc
     * @return \React\Promise\Promise
     */
    public function rpc(Rpc $rpc)
    {
        // newCall() is given an empty callback; nothing is done with it here.
        $callReference = $this->outstandingRpcCalls->newCall(function () {
        });

        $this->write($this->createLine($rpc->setUniqid($callReference->getUniqid())));

        return $callReference->getDeferred()->promise();
    }

    /**
     * Wrap a message in the configured line class and cast it to a string.
     *
     * @param  ActionableMessageInterface $line
     * @return string The line object cast to string, as passed to write()/writeErr().
     */
    public function createLine(ActionableMessageInterface $line)
    {
        $lineClass = $this->options['lineClass'];

        return (string) new $lineClass($line, $this->options['lineOptions']);
    }

    /**
     * Send the TERMINATE_RPC call to the other side.
     *
     * @return \React\Promise\Promise
     */
    public function softTerminate()
    {
        return $this->rpc(MessageFactory::rpc(static::TERMINATE_RPC));
    }

    /**
     * @return Stream
     */
    public function getStdin()
    {
        return $this->stdin;
    }

    /**
     * @return Stream
     */
    public function getStdout()
    {
        return $this->stdout;
    }

    /**
     * @return Stream
     */
    public function getStderr()
    {
        return $this->stderr;
    }

    /**
     * Start listening on the streams named by the 'read_err' and 'read'
     * options (in that order); options that are not set are skipped.
     */
    protected function attachMessenger()
    {
        $this->attachReader('read_err');
        $this->attachReader('read');
    }

    /**
     * Subscribe onData() to the 'data' event of the stream named by an option.
     *
     * @param string $optionName Option key whose value is the name of the
     *                           stream property to listen on.
     */
    private function attachReader($optionName)
    {
        if (!isset($this->options[$optionName])) {
            return;
        }

        $streamName = $this->options[$optionName];
        $this->$streamName->on('data', function ($data) use ($streamName) {
            $this->onData($data, $streamName);
        });
    }

    /**
     * Write a line to the stream named by the 'write' option, if set.
     *
     * @param string $line
     */
    protected function write($line)
    {
        $this->writeToStream('write', $line);
    }

    /**
     * Write a line to the stream named by the 'write_err' option, if set.
     *
     * @param string $line
     */
    protected function writeErr($line)
    {
        $this->writeToStream('write_err', $line);
    }

    /**
     * Write a line to the stream named by the given option; does nothing
     * when that option is not set.
     *
     * @param string $optionName Option key whose value is the name of the
     *                           stream property to write to.
     * @param string $line
     */
    private function writeToStream($optionName, $line)
    {
        if (!isset($this->options[$optionName])) {
            return;
        }

        $streamName = $this->options[$optionName];
        $this->$streamName->write($line);
    }

    /**
     * Handle a chunk of incoming data from one stream.
     *
     * Emits 'data', appends the chunk to that stream's buffer and, once the
     * buffer contains at least one EOL, dispatches every complete line while
     * keeping the trailing incomplete remainder buffered.
     *
     * @param string $data
     * @param string $source Name of the stream the data came from.
     */
    protected function onData($data, $source)
    {
        $this->emit('data', [$source, $data]);

        $this->buffers[$source] .= $data;

        if (strpos($this->buffers[$source], LineInterface::EOL) !== false) {
            $messages = explode(LineInterface::EOL, $this->buffers[$source]);
            // The last piece is whatever followed the final EOL (possibly an
            // empty string); it stays in the buffer until more data arrives.
            $this->buffers[$source] = array_pop($messages);
            $this->iterateMessages($messages, $source);
        }
    }

    /**
     * Parse each received line with the configured message factory and let
     * the resulting message handle itself.
     *
     * An \Exception raised for one line is emitted as an 'error' event and
     * does not stop processing of the remaining lines.
     *
     * @param array  $messages Complete lines, without their EOL.
     * @param string $source   Name of the stream the lines came from.
     */
    protected function iterateMessages(array $messages, $source)
    {
        $messageFactory = $this->options['messageFactoryClass'];
        foreach ($messages as $message) {
            try {
                $messageFactory::fromLine($message, $this->options['lineOptions'])->handle($this, $source);
            } catch (\Exception $exception) {
                $this->emit('error', [$exception, $this]);
            }
        }
    }
}
307