Passed
Push — master ( 22dc46...8f2d00 )
by thomas
32:41 queued 17:41
created

Peer::setupStream()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 45
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 5

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 45
ccs 21
cts 21
cp 1
rs 9.1928
c 0
b 0
f 0
cc 5
nc 1
nop 1
crap 5
1
<?php
2
3
declare(strict_types=1);
4
5
namespace BitWasp\Bitcoin\Networking\Peer;
6
7
use BitWasp\Bitcoin\Block\FilteredBlock;
8
use BitWasp\Bitcoin\Bloom\BloomFilter;
9
use BitWasp\Bitcoin\Chain\BlockLocator;
10
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
11
use BitWasp\Bitcoin\Networking\Message;
12
use BitWasp\Bitcoin\Networking\Messages\Ping;
13
use BitWasp\Bitcoin\Networking\Messages\Version;
14
use BitWasp\Bitcoin\Networking\NetworkMessage;
15
use BitWasp\Bitcoin\Networking\NetworkSerializable;
16
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
17
use BitWasp\Bitcoin\Networking\Structure\Header;
18
use BitWasp\Bitcoin\Networking\Structure\Inventory;
19
use BitWasp\Bitcoin\Networking\Structure\NetworkAddress;
20
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Socket\ConnectionInterface;
28
29
class Peer extends EventEmitter
{
    /**
     * Bytes received from the stream that have not yet been consumed
     * as part of a complete message.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * Event loop used to defer emission of parsed messages.
     *
     * @var LoopInterface
     */
    private $loop;

    /**
     * Factory used to build outgoing messages and to obtain the
     * network and serializer.
     *
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
     */
    private $msgs;

    /**
     * The underlying connection; set by setupStream().
     *
     * @var ConnectionInterface
     */
    private $stream;

    /**
     * The version message we sent; set during a handshake.
     *
     * @var Version
     */
    private $localVersion;

    /**
     * The version message the remote peer sent; set during a handshake.
     *
     * @var Version
     */
    private $remoteVersion;

    /**
     * @var NetworkAddress
     */
    private $peerAddress;

    /**
     * @var ConnectionParams
     */
    private $connectionParams;

    /**
     * True once a verack was received and 'ready' was emitted.
     *
     * @var bool
     */
    private $exchangedVersion = false;

    /**
     * Header of the message currently being received, kept between
     * 'data' events until its full payload has arrived.
     *
     * @var Header|null
     */
    private $incomingMsgHeader;

    /**
     * Guards close() against re-entrant / repeated invocation.
     *
     * @var bool
     */
    private $closed = false;

    /**
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
     * @param LoopInterface $loop
     */
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
    {
        $this->msgs = $msgs;
        $this->loop = $loop;
    }

    /**
     * Returns the version message sent to the remote peer.
     * Only set once a handshake method has produced it.
     *
     * @return Version
     */
    public function getLocalVersion(): Version
    {
        return $this->localVersion;
    }

    /**
     * Returns the version message received from the remote peer.
     * Only set once the remote's version message was handled.
     *
     * @return Version
     */
    public function getRemoteVersion(): Version
    {
        return $this->remoteVersion;
    }

    /**
     * Reliably returns the remote peers NetAddr when known through
     * the connection process. Often better than the data contained
     * in a Version message.
     *
     * @return NetworkAddress
     */
    public function getRemoteAddress(): NetworkAddress
    {
        return $this->peerAddress;
    }

    /**
     * Returns the parameters supplied to the handshake method.
     *
     * @return ConnectionParams
     */
    public function getConnectionParams(): ConnectionParams
    {
        return $this->connectionParams;
    }

    /**
     * Serializes the message for the factory's network, writes it to the
     * stream, and emits 'send' with the resulting network message.
     *
     * @param NetworkSerializable $msg
     */
    public function send(NetworkSerializable $msg)
    {
        $netMsg = $msg->getNetworkMessage($this->msgs->getNetwork());
        $serialized = $this->msgs->getSerializer()->serialize($netMsg);
        $this->stream->write($serialized->getBinary());
        $this->emit('send', [$netMsg]);
    }

    /**
     * Attaches the peer to a connection: incoming bytes are buffered and
     * parsed into messages, each of which is emitted as 'msg' on a future
     * loop tick and then re-emitted under its command name with the payload.
     * When the stream closes, the peer is closed as well.
     *
     * @param ConnectionInterface $stream
     * @return $this
     */
    public function setupStream(ConnectionInterface $stream)
    {
        $this->stream = $stream;
        $this->stream->on('data', function ($data) {
            $this->buffer .= $data;

            $buffered = new Buffer($this->buffer);
            $parser = new Parser($buffered);

            $pos = $parser->getPosition();
            $sz = $buffered->getSize();

            while ($pos < $sz) {
                if (null === $this->incomingMsgHeader) {
                    // A message header is 24 bytes; wait for more data
                    // if it has not fully arrived yet.
                    if ($sz - $pos < 24) {
                        break;
                    }
                    $this->incomingMsgHeader = $this->msgs->getSerializer()->parseHeader($parser);
                    $pos = $parser->getPosition();
                }

                // Header is known, but the payload is still incomplete:
                // keep the header and resume on the next 'data' event.
                if ($sz - $pos < $this->incomingMsgHeader->getLength()) {
                    break;
                }

                $message = $this->msgs->getSerializer()->parsePacket($this->incomingMsgHeader, $parser);
                $this->incomingMsgHeader = null;
                $this->loop->futureTick(function () use ($message) {
                    $this->emit('msg', [$this, $message]);
                });
                $pos = $parser->getPosition();
            }

            // Retain only the bytes that were not consumed.
            $this->buffer = $parser->getBuffer()->slice($pos)->getBinary();
        });

        $this->stream->once('close', function () {
            $this->close();
        });

        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
        });

        return $this;
    }

    /**
     * Handles the handshake for a connection initiated by the remote peer.
     *
     * On the remote's version message we record it and reply with our own
     * version; on the remote's verack we reply with a verack, emit 'ready'
     * and resolve the promise with this peer. If the connection closes
     * before the handshake completes, the promise is rejected, matching
     * outboundHandshake().
     *
     * @param ConnectionInterface $connection connection the peer arrived on
     * @param ConnectionParams $params
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function inboundHandshake(ConnectionInterface $connection, ConnectionParams $params)
    {
        $this->connectionParams = $params;

        $deferred = new Deferred();

        // Do not leave the promise pending forever when the remote
        // disconnects mid-handshake.
        $connection->once('close', function () use ($deferred) {
            if (false === $this->exchangedVersion) {
                $deferred->reject(new \Exception('peer disconnected'));
            }
        });

        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
            $this->peerAddress = $version->getSenderAddress();
            $this->remoteVersion = $version;
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
            $this->send($localVersion);
        });

        $this->on(Message::VERACK, function () use ($deferred) {
            if (false === $this->exchangedVersion) {
                $this->exchangedVersion = true;
                $this->verack();
                $this->emit('ready', [$this]);
                $deferred->resolve($this);
            }
        });

        return $deferred->promise();
    }

    /**
     * Initiates the handshake for a connection we opened: sends our version
     * message, answers the remote's version with a verack, and on the
     * remote's verack emits 'ready' and resolves the promise with this peer.
     * The promise is rejected if the stream closes before that happens.
     *
     * Requires setupStream() to have been called first.
     *
     * @param NetworkAddress $remotePeer
     * @param ConnectionParams $params
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function outboundHandshake(NetworkAddress $remotePeer, ConnectionParams $params)
    {
        $deferred = new Deferred();

        // True while the handshake is still outstanding.
        $awaitVersion = true;
        $this->stream->once('close', function () use (&$awaitVersion, $deferred) {
            if ($awaitVersion) {
                $awaitVersion = false;
                $deferred->reject(new \Exception('peer disconnected'));
            }
        });

        $this->on(Message::VERSION, function (Peer $peer, Version $version) {
            $this->remoteVersion = $version;
            $this->verack();
        });

        $this->on(Message::VERACK, function () use ($deferred, &$awaitVersion) {
            if (false === $this->exchangedVersion) {
                $this->exchangedVersion = true;
                // Handshake done: a later disconnect must not try to
                // reject the already-resolved promise.
                $awaitVersion = false;
                $this->emit('ready', [$this]);
                $deferred->resolve($this);
            }
        });

        $this->peerAddress = $remotePeer;
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
        $this->connectionParams = $params;

        $this->send($version);

        return $deferred->promise();
    }

    /**
     * Emits 'intentionaldisconnect' and then closes the peer.
     */
    public function intentionalClose()
    {
        $this->emit('intentionaldisconnect', [$this]);
        $this->close();
    }

    /**
     * Emits 'close', ends the stream and removes all listeners from the peer.
     *
     * Idempotent: ending the stream may synchronously fire the stream's
     * 'close' event, which setupStream() routes back here; without the guard
     * listeners would observe 'close' twice.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }
        $this->closed = true;

        $this->emit('close', [$this]);
        $this->stream->end();
        $this->removeAllListeners();
    }

    /**
     * Sends a version message built from the given fields.
     *
     * @param int $protocolVersion
     * @param int $services
     * @param int $timestamp
     * @param NetworkAddress $remoteAddr
     * @param NetworkAddress $localAddr
     * @param string $userAgent
     * @param int $blockHeight
     * @param bool $relayToUs
     */
    public function version(
        int $protocolVersion,
        int $services,
        int $timestamp,
        NetworkAddress $remoteAddr,
        NetworkAddress $localAddr,
        string $userAgent,
        int $blockHeight,
        bool $relayToUs
    ) {
        $this->send($this->msgs->version(
            $protocolVersion,
            $services,
            $timestamp,
            $remoteAddr,
            $localAddr,
            new Buffer($userAgent),
            $blockHeight,
            $relayToUs
        ));
    }

    /**
     * Sends a verack message.
     */
    public function verack()
    {
        $this->send($this->msgs->verack());
    }

    /**
     * Sends a sendheaders message.
     */
    public function sendheaders()
    {
        $this->send($this->msgs->sendheaders());
    }

    /**
     * Sends an inv message for the given inventory vectors.
     *
     * @param Inventory[] $vInv
     */
    public function inv(array $vInv)
    {
        $this->send($this->msgs->inv($vInv));
    }

    /**
     * Sends a getdata message for the given inventory vectors.
     *
     * @param Inventory[] $vInv
     */
    public function getdata(array $vInv)
    {
        $this->send($this->msgs->getdata($vInv));
    }

    /**
     * Sends a notfound message for the given inventory vectors.
     *
     * @param Inventory[] $vInv
     */
    public function notfound(array $vInv)
    {
        $this->send($this->msgs->notfound($vInv));
    }

    /**
     * Sends an addr message with the given addresses.
     *
     * @param NetworkAddressTimestamp[] $vNetAddr
     */
    public function addr(array $vNetAddr)
    {
        $this->send($this->msgs->addr($vNetAddr));
    }

    /**
     * Sends a getaddr message.
     */
    public function getaddr()
    {
        $this->send($this->msgs->getaddr());
    }

    /**
     * Sends a ping message.
     */
    public function ping()
    {
        $this->send($this->msgs->ping());
    }

    /**
     * Sends a pong message built from the given ping.
     *
     * @param Ping $ping
     */
    public function pong(Ping $ping)
    {
        $this->send($this->msgs->pong($ping));
    }

    /**
     * Sends a tx message carrying the given transaction data.
     *
     * @param BufferInterface $txData
     */
    public function tx(BufferInterface $txData)
    {
        $this->send($this->msgs->tx($txData));
    }

    /**
     * Sends a getblocks message using our local protocol version.
     * Requires the local version to have been set by a handshake.
     *
     * @param BlockLocator $locator
     */
    public function getblocks(BlockLocator $locator)
    {
        $this->send($this->msgs->getblocks(
            $this->localVersion->getVersion(),
            $locator
        ));
    }

    /**
     * Sends a getheaders message using our local protocol version.
     * Requires the local version to have been set by a handshake.
     *
     * @param BlockLocator $locator
     */
    public function getheaders(BlockLocator $locator)
    {
        $this->send($this->msgs->getheaders(
            $this->localVersion->getVersion(),
            $locator
        ));
    }

    /**
     * Sends a block message carrying the given block data.
     *
     * @param BufferInterface $blockData
     */
    public function block(BufferInterface $blockData)
    {
        $this->send($this->msgs->block($blockData));
    }

    /**
     * Sends a headers message carrying the given headers.
     *
     * @param BufferInterface ...$vHeaders
     */
    public function headers(BufferInterface ...$vHeaders)
    {
        $this->send($this->msgs->headers(...$vHeaders));
    }

    /**
     * Sends an alert message.
     *
     * @param AlertDetail $detail
     * @param SignatureInterface $signature
     */
    public function alert(AlertDetail $detail, SignatureInterface $signature)
    {
        $this->send($this->msgs->alert($detail, $signature));
    }

    /**
     * Sends a feefilter message.
     *
     * @param int $feeRate
     */
    public function feefilter($feeRate)
    {
        $this->send($this->msgs->feefilter($feeRate));
    }

    /**
     * Sends a filteradd message.
     *
     * @param BufferInterface $data
     */
    public function filteradd(BufferInterface $data)
    {
        $this->send($this->msgs->filteradd($data));
    }

    /**
     * Sends a filterload message.
     *
     * @param BloomFilter $filter
     */
    public function filterload(BloomFilter $filter)
    {
        $this->send($this->msgs->filterload($filter));
    }

    /**
     * Sends a filterclear message.
     */
    public function filterclear()
    {
        $this->send($this->msgs->filterclear());
    }

    /**
     * Sends a merkleblock message.
     *
     * @param FilteredBlock $filtered
     */
    public function merkleblock(FilteredBlock $filtered)
    {
        $this->send($this->msgs->merkleblock($filtered));
    }

    /**
     * Sends a mempool message.
     */
    public function mempool()
    {
        $this->send($this->msgs->mempool());
    }

    /**
     * Issue a Reject message, with a required $msg, $code, and $reason
     *
     * @param BufferInterface $msg
     * @param int $code
     * @param BufferInterface $reason
     * @param BufferInterface|null $data optional extra data; may be omitted
     */
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
    {
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
    }
}
496