GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 93608b...d96f4b )
by Cees-Jan
03:32
created

Messenger::onData()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 12
ccs 9
cts 9
cp 1
rs 9.4286
cc 2
eloc 7
nc 2
nop 2
crap 2
1
<?php
2
3
namespace WyriHaximus\React\ChildProcess\Messenger;
4
5
use Evenement\EventEmitter;
6
use React\Promise\PromiseInterface;
7
use React\Promise\RejectedPromise;
8
use React\Stream\Stream;
9
use WyriHaximus\React\ChildProcess\Messenger\Messages\ActionableMessageInterface;
10
use WyriHaximus\React\ChildProcess\Messenger\Messages\Error;
11
use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory as MessageFactory;
12
use WyriHaximus\React\ChildProcess\Messenger\Messages\LineInterface;
13
use WyriHaximus\React\ChildProcess\Messenger\Messages\Message;
14
use WyriHaximus\React\ChildProcess\Messenger\Messages\Rpc;
15
16
class Messenger extends EventEmitter
17
{
18
    const INTERVAL = 0.1;
19
    const TERMINATE_RPC = 'wyrihaximus.react.child-process.messenger.terminate';
20
21
    /**
22
     * @var Stream
23
     */
24
    protected $stdin;
25
26
    /**
27
     * @var Stream
28
     */
29
    protected $stdout;
30
31
    /**
32
     * @var Stream
33
     */
34
    protected $stderr;
35
36
    /**
37
     * @var OutstandingCalls
38
     */
39
    protected $outstandingRpcCalls;
40
41
    /**
42
     * @var array
43
     */
44
    protected $rpcs = [];
45
46
    /**
47
     * @var array
48
     */
49
    protected $options = [];
50
51
    /**
52
     * @var string[]
53
     */
54
    protected $buffers = [
55
        'stdin' => '',
56
        'stdout' => '',
57
        'stderr' => '',
58
    ];
59
60
    protected $defaultOptions = [
61
        'lineClass' => 'WyriHaximus\React\ChildProcess\Messenger\Messages\Line',
62
        'messageFactoryClass' => 'WyriHaximus\React\ChildProcess\Messenger\Messages\Factory',
63
        'lineOptions' => [],
64
    ];
65
66
    /**
     * Stores the three streams, merges the caller's options with the defaults,
     * creates the outstanding-call registry and attaches the stream listeners.
     *
     * @param Stream $stdin
     * @param Stream $stdout
     * @param Stream $stderr
     * @param array $options Caller-supplied options; a key given here overrides the
     *                       same key in $defaultOptions.
     */
    public function __construct(Stream $stdin, Stream $stdout, Stream $stderr, array $options)
    {
        $this->stdin  = $stdin;
        $this->stdout = $stdout;
        $this->stderr = $stderr;

        // The array union operator keeps the LEFT operand's value when a key exists on
        // both sides, so the caller's options must come first for overrides to work.
        $this->options = $options + $this->defaultOptions;

        $this->outstandingRpcCalls = new OutstandingCalls();

        $this->attachMessenger();
    }
83
84
    /**
     * Registers (or replaces) the handler that callRpc() invokes for a target.
     *
     * @param string $target Name under which the handler is stored.
     * @param callable $listener Handler; callRpc() calls it with the payload and this messenger.
     */
    public function registerRpc($target, callable $listener)
    {
        $this->rpcs[$target] = $listener;
    }
92
93
    /**
     * Removes the handler stored for the given target, if there is one.
     *
     * @param string $target Name the handler was registered under.
     */
    public function deregisterRpc($target)
    {
        unset($this->rpcs[$target]);
    }
100
101
    /**
     * Tells whether a handler is currently stored for the given target.
     *
     * @param string $target Name to look up.
     * @return bool True when a handler is registered under $target.
     */
    public function hasRpc($target)
    {
        return isset($this->rpcs[$target]);
    }
109
110
    /**
     * Invokes the handler registered for $target and returns its promise.
     *
     * The handler is called with the payload and this messenger. Failures are
     * reported as a rejected promise instead of being thrown: an unknown target,
     * an exception thrown by the handler, or a handler that does not return a
     * promise.
     *
     * @param string $target Name the handler was registered under.
     * @param mixed $payload Value passed to the handler as its first argument.
     * @return PromiseInterface
     */
    public function callRpc($target, $payload)
    {
        // Without this guard an unknown target raises an undefined-index notice and then
        // tries to call null, which is not an \Exception and escapes the catch below.
        if (!isset($this->rpcs[$target])) {
            return new RejectedPromise(new \Exception('Unknown RPC target: ' . $target));
        }

        try {
            $promise = $this->rpcs[$target]($payload, $this);
            if ($promise instanceof PromiseInterface) {
                return $promise;
            }

            throw new \Exception('RPC must return promise');
        } catch (\Exception $exception) {
            return new RejectedPromise($exception);
        }
    }
128
129 10
    /**
     * Subscribes to the 'data' event of every stream configured for reading.
     *
     * The 'read_err' and 'read' options each hold the name of one of this
     * object's stream properties. Every chunk received on such a stream is
     * passed to onData() together with that name. Options that are not set
     * are skipped.
     */
    protected function attachMessenger()
    {
        // 'read_err' is attached before 'read'; the order is only observable when
        // both options name the same stream.
        foreach (['read_err', 'read'] as $option) {
            if (!isset($this->options[$option])) {
                continue;
            }

            $streamName = $this->options[$option];
            $this->$streamName->on('data', function ($data) use ($streamName) {
                $this->onData($data, $streamName);
            });
        }
    }
150
151
    /**
     * Writes a line to the stream named by the 'write' option.
     *
     * Does nothing when the 'write' option is not set.
     *
     * @param string $line Text handed to the stream's write() method.
     */
    protected function write($line)
    {
        if (!isset($this->options['write'])) {
            return;
        }

        // The option holds the name of one of this object's stream properties.
        $target = $this->options['write'];
        $this->$target->write($line);
    }
162
163
    /**
     * Writes a line to the stream named by the 'write_err' option.
     *
     * Does nothing when the 'write_err' option is not set.
     *
     * @param string $line Text handed to the stream's write() method.
     */
    protected function writeErr($line)
    {
        if (!isset($this->options['write_err'])) {
            return;
        }

        // The option holds the name of one of this object's stream properties.
        $target = $this->options['write_err'];
        $this->$target->write($line);
    }
174
175
    /**
     * Converts a message to a line and sends it through write().
     *
     * @param Message $message
     */
    public function message(Message $message)
    {
        $line = $this->createLine($message);
        $this->write($line);
    }
182
183
    /**
     * Converts an error message to a line and sends it through writeErr().
     *
     * @param Error $error
     */
    public function error(Error $error)
    {
        $line = $this->createLine($error);
        $this->writeErr($line);
    }
190
191
    /**
     * Looks up an outstanding RPC call by its unique id.
     *
     * @param string $uniqid Id to pass to the outstanding-call registry.
     * @return OutstandingCall
     */
    public function getOutstandingCall($uniqid)
    {
        $registry = $this->outstandingRpcCalls;

        return $registry->getCall($uniqid);
    }
199
200
    /**
     * Sends an RPC message and returns the promise of its outstanding call.
     *
     * A new outstanding call is created, its unique id is set on the RPC
     * message, and the resulting line is written via write().
     *
     * @param Rpc $rpc
     * @return \React\Promise\Promise
     */
    public function rpc(Rpc $rpc)
    {
        // NOTE(review): an empty callback is passed to newCall(); its purpose is
        // defined by OutstandingCalls - confirm there.
        $call = $this->outstandingRpcCalls->newCall(function () {
        });

        $line = $this->createLine($rpc->setUniqid($call->getUniqid()));
        $this->write($line);

        return $call->getDeferred()->promise();
    }
214
215
    /**
     * Handles a chunk received from a stream.
     *
     * Emits a 'data' event, appends the chunk to the buffer kept for $source,
     * and, once the buffer contains at least one EOL marker, hands every
     * complete line to iterateMessages(). The text after the last EOL stays
     * in the buffer.
     *
     * @param string $data Chunk that was just received.
     * @param string $source Buffer key / stream name the chunk came from.
     */
    protected function onData($data, $source)
    {
        $this->emit('data', [$source, $data]);

        $this->buffers[$source] .= $data;

        if (strpos($this->buffers[$source], LineInterface::EOL) === false) {
            // No complete line yet; keep buffering.
            return;
        }

        $lines = explode(LineInterface::EOL, $this->buffers[$source]);
        // The last element is whatever followed the final EOL (possibly an empty string).
        $this->buffers[$source] = array_pop($lines);
        $this->iterateMessages($lines, $source);
    }
231
232
    /**
     * Parses each line with the configured message factory and lets the
     * resulting message handle itself.
     *
     * An exception raised while parsing or handling one line is emitted as an
     * 'error' event; the remaining lines are still processed.
     *
     * @param array $messages Lines to parse.
     * @param string $source Name of the stream the lines came from.
     */
    protected function iterateMessages(array $messages, $source)
    {
        $factory = $this->options['messageFactoryClass'];

        foreach ($messages as $line) {
            try {
                $parsed = $factory::fromLine($line, $this->options['lineOptions']);
                $parsed->handle($this, $source);
            } catch (\Exception $failure) {
                $this->emit('error', [$failure, $this]);
            }
        }
    }
247
248
    /**
     * Converts a message into the string that is written to a stream.
     *
     * Instantiates the class configured under the 'lineClass' option with the
     * message and the 'lineOptions' option, then casts that object to a string.
     *
     * @param ActionableMessageInterface $line Message to convert.
     * @return string
     */
    public function createLine(ActionableMessageInterface $line)
    {
        $lineClass = $this->options['lineClass'];

        return (string) new $lineClass($line, $this->options['lineOptions']);
    }
257
258
    /**
     * Sends the TERMINATE_RPC call through rpc() and returns its promise.
     *
     * @return \React\Promise\Promise
     */
    public function softTerminate()
    {
        $terminate = MessageFactory::rpc(static::TERMINATE_RPC);

        return $this->rpc($terminate);
    }
265
266
    /**
     * Returns the stdin stream given to the constructor.
     *
     * @return Stream
     */
    public function getStdin()
    {
        return $this->stdin;
    }
273
274
    /**
     * Returns the stdout stream given to the constructor.
     *
     * @return Stream
     */
    public function getStdout()
    {
        return $this->stdout;
    }
281
282
    /**
     * Returns the stderr stream given to the constructor.
     *
     * @return Stream
     */
    public function getStderr()
    {
        return $this->stderr;
    }
289
290
    /**
     * Forwards calls to undefined methods to the 'callForward' option.
     *
     * When the option is set, it is invoked with the method name and the
     * argument list, and its result is returned. When it is not set, the call
     * is ignored and null is returned.
     *
     * @param string $name Name of the method that was called.
     * @param array $arguments Arguments the method was called with.
     *
     * @return mixed
     */
    public function __call($name, array $arguments)
    {
        if (!isset($this->options['callForward'])) {
            return null;
        }

        $forward = $this->options['callForward'];

        return $forward($name, $arguments);
    }
305
}
306