Completed
Pull Request — master (#85)
by thomas
02:06
created

Peer::outboundHandshake()   B

Complexity

Conditions 3
Paths 1

Size

Total Lines 33
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 33
ccs 23
cts 23
cp 1
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 20
nc 1
nop 2
crap 3
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Message;
10
use BitWasp\Bitcoin\Networking\Messages\Version;
11
use BitWasp\Bitcoin\Networking\Messages\Ping;
12
use BitWasp\Bitcoin\Networking\NetworkMessage;
13
use BitWasp\Bitcoin\Networking\NetworkSerializable;
14
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
15
use BitWasp\Bitcoin\Networking\Structure\Header;
16
use BitWasp\Bitcoin\Networking\Structure\Inventory;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
18
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
19
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
20
use BitWasp\Bitcoin\Transaction\TransactionInterface;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Socket\ConnectionInterface;
28
use React\Stream\Stream;
29
30
class Peer extends EventEmitter
31
{
32
    /**
33
     * @var string
34
     */
35
    private $buffer = '';
36
37
    /**
38
     * @var LoopInterface
39
     */
40
    private $loop;
41
42
    /**
43
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
44
     */
45
    private $msgs;
46
47
    /**
48
     * @var Stream
49
     */
50
    private $stream;
51
52
    /**
53
     * @var Version
54
     */
55
    private $localVersion;
56
57
    /**
58
     * @var Version
59
     */
60
    private $remoteVersion;
61
62
    /**
63
     * @var NetworkAddressInterface
64
     */
65
    private $peerAddress;
66
67
    /**
68
     * @var ConnectionParams
69
     */
70
    private $connectionParams;
71
72
    /**
73
     * @var bool
74
     */
75
    private $exchangedVersion = false;
76
77
    /**
78
     * @var Header
79
     */
80
    private $incomingMsgHeader;
81
82
    /**
83
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
84
     * @param LoopInterface $loop
85
     */
86 12
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
87
    {
88 12
        $this->msgs = $msgs;
89 12
        $this->loop = $loop;
90 12
    }
91
92
    /**
93
     * @return Version
94
     */
95 3
    public function getLocalVersion()
96
    {
97 3
        return $this->localVersion;
98
    }
99
100
    /**
101
     * @return Version
102
     */
103 3
    public function getRemoteVersion()
104
    {
105 3
        return $this->remoteVersion;
106
    }
107
108
    /**
109
     * Reliably returns the remote peers NetAddr when known through
110
     * the connection process. Often better than the data contained
111
     * in a Version message.
112
     *
113
     * @return NetworkAddressInterface
114
     */
115 3
    public function getRemoteAddress()
116
    {
117 3
        return $this->peerAddress;
118
    }
119
120
    /**
121
     * @return ConnectionParams
122
     */
123 3
    public function getConnectionParams()
124
    {
125 3
        return $this->connectionParams;
126
    }
127
128
    /**
129
     * @param NetworkSerializable $msg
130
     */
131 12
    public function send(NetworkSerializable $msg)
132
    {
133 12
        $netMsg = $msg->getNetworkMessage($this->msgs->getNetwork());
134 12
        $serialized = $this->msgs->getSerializer()->serialize($netMsg);
135 12
        $this->stream->write($serialized->getBinary());
136 12
        $this->emit('send', [$netMsg]);
137 12
    }
138
139
    /**
140
     * @param Stream $stream
141
     * @return $this
142
     */
143 12
    public function setupStream(ConnectionInterface $stream)
144
    {
145 12
        $this->stream = $stream;
0 ignored issues
show
Documentation Bug introduced by
It seems like $stream of type object<React\Socket\ConnectionInterface> is incompatible with the declared type object<React\Stream\Stream> of property $stream.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
146
        $this->stream->on('data', function ($data) {
147 12
            $this->buffer .= $data;
148
149 12
            $data = new Buffer($this->buffer);
150 12
            $parser = new Parser($data);
151
152 12
            $pos = $parser->getPosition();
153 12
            $sz = $parser->getSize();
154
155
            try {
156 12
                while ($pos < $sz) {
157 12
                    if (null === $this->incomingMsgHeader) {
158 12
                        if ($sz - $pos < 24) {
159
                            break;
160
                        }
161
162 12
                        $this->incomingMsgHeader = $this->msgs->getSerializer()->parseHeader($parser);
163 12
                        $pos = $parser->getPosition();
164 4
                    }
165
166 12
                    if ($sz - $pos < $this->incomingMsgHeader->getLength()) {
167
                        break;
168
                    }
169
170 12
                    $message = $this->msgs->getSerializer()->parsePacket($this->incomingMsgHeader, $parser);
171 12
                    $this->incomingMsgHeader = null;
172 12
                    $this->emit('msg', [$this, $message]);
173 12
                    $pos = $parser->getPosition();
174 4
                }
175 4
            } catch (\Exception $e) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
176
            }
177
178 12
            $this->buffer = $parser->getBuffer()->slice($pos)->getBinary();
179 12
        });
180
181
        $this->stream->once('close', function () {
182 6
            $this->close();
183 12
        });
184
185
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
186 12
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
187 12
        });
188
189 12
        return $this;
190
    }
191
192
    /**
193
     * @param ConnectionInterface $connection
194
     * @param ConnectionParams $params
195
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
196
     */
197 9
    public function inboundHandshake(ConnectionInterface $connection, ConnectionParams $params)
0 ignored issues
show
Unused Code introduced by
The parameter $connection is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
198
    {
199 9
        $this->connectionParams = $params;
200
201 9
        $deferred = new Deferred();
202
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
203 9
            $this->peerAddress = $version->getSenderAddress();
204 9
            $this->remoteVersion = $version;
205 9
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
206 9
            $this->send($localVersion);
207 9
        });
208
209
        $this->on(Message::VERACK, function () use ($deferred) {
210 9
            if (false === $this->exchangedVersion) {
211 9
                $this->exchangedVersion = true;
212 9
                $this->verack();
213 9
                $this->emit('ready', [$this]);
214 9
                $deferred->resolve($this);
215 3
            }
216 9
        });
217
218 9
        return $deferred->promise();
219
    }
220
221
    /**
222
     * @param NetworkAddressInterface $remotePeer
223
     * @param ConnectionParams $params
224
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
225
     */
226 12
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
227
    {
228 12
        $deferred = new Deferred();
229
        
230 12
        $awaitVersion = true;
231
        $this->stream->once('close', function () use (&$awaitVersion, $deferred) {
232 6
            if ($awaitVersion) {
233 6
                $awaitVersion = false;
234 6
                $deferred->reject(new \Exception('peer disconnected'));
235 2
            }
236 12
        });
237
238
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
239 12
            $this->remoteVersion = $version;
240 12
            $this->verack();
241 12
        });
242
243 12
        $this->on(Message::VERACK, function () use ($deferred) {
244 9
            if (false === $this->exchangedVersion) {
245 9
                $this->exchangedVersion = true;
246 9
                $this->emit('ready', [$this]);
247 9
                $deferred->resolve($this);
248 3
            }
249 12
        });
250
251 12
        $this->peerAddress = $remotePeer;
252 12
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
253 12
        $this->connectionParams = $params;
254
255 12
        $this->send($version);
256
257 12
        return $deferred->promise();
258
    }
259
260
    /**
261
     *
262
     */
263
    public function intentionalClose()
264
    {
265
        $this->emit('intentionaldisconnect', [$this]);
266
        $this->close();
267
    }
268
269
    /**
270
     *
271
     */
272 9
    public function close()
273
    {
274 9
        $this->emit('close', [$this]);
275 9
        $this->stream->end();
276 9
        $this->removeAllListeners();
277 9
    }
278
279
    /**
280
     * @param int $protocolVersion
281
     * @param int $services
282
     * @param int $timestamp
283
     * @param NetworkAddressInterface $remoteAddr
284
     * @param NetworkAddressInterface $localAddr
285
     * @param string $userAgent
286
     * @param int $blockHeight
287
     * @param bool $relayToUs
288
     */
289
    public function version(
290
        $protocolVersion,
291
        $services,
292
        $timestamp,
293
        NetworkAddressInterface $remoteAddr,
294
        NetworkAddressInterface $localAddr,
295
        $userAgent,
296
        $blockHeight,
297
        $relayToUs
298
    ) {
299
        $this->send($this->msgs->version(
300
            $protocolVersion,
301
            $services,
302
            $timestamp,
303
            $remoteAddr,
304
            $localAddr,
305
            new Buffer($userAgent),
306
            $blockHeight,
307
            $relayToUs
308
        ));
309
    }
310
311
    /**
312
     *
313
     */
314 12
    public function verack()
315
    {
316 12
        $this->send($this->msgs->verack());
317 12
    }
318
319
    /**
320
     *
321
     */
322
    public function sendheaders()
323
    {
324
        $this->send($this->msgs->sendheaders());
325
    }
326
327
    /**
328
     * @param Inventory[] $vInv
329
     */
330
    public function inv(array $vInv)
331
    {
332
        $this->send($this->msgs->inv($vInv));
333
    }
334
335
    /**
336
     * @param Inventory[] $vInv
337
     */
338
    public function getdata(array $vInv)
339
    {
340
        $this->send($this->msgs->getdata($vInv));
341
    }
342
343
    /**
344
     * @param array $vInv
345
     */
346
    public function notfound(array $vInv)
347
    {
348
        $this->send($this->msgs->notfound($vInv));
349
    }
350
351
    /**
352
     * @param NetworkAddressTimestamp[] $vNetAddr
353
     */
354
    public function addr(array $vNetAddr)
355
    {
356
        $this->send($this->msgs->addr($vNetAddr));
357
    }
358
359
    /**
360
     *
361
     */
362
    public function getaddr()
363
    {
364
        $this->send($this->msgs->getaddr());
365
    }
366
367
    /**
368
     *
369
     */
370
    public function ping()
371
    {
372
        $this->send($this->msgs->ping());
373
    }
374
375
    /**
376
     * @param Ping $ping
377
     */
378
    public function pong(Ping $ping)
379
    {
380
        $this->send($this->msgs->pong($ping));
381
    }
382
383
    /**
384
     * @param TransactionInterface $tx
385
     */
386
    public function tx(TransactionInterface $tx)
387
    {
388
        $this->send($this->msgs->tx($tx));
389
    }
390
391
    /**
392
     * @param BlockLocator $locator
393
     */
394
    public function getblocks(BlockLocator $locator)
395
    {
396
        $this->send($this->msgs->getblocks(
397
            $this->localVersion->getVersion(),
398
            $locator
399
        ));
400
    }
401
402
    /**
403
     * @param BlockLocator $locator
404
     */
405
    public function getheaders(BlockLocator $locator)
406
    {
407
        $this->send($this->msgs->getheaders(
408
            $this->localVersion->getVersion(),
409
            $locator
410
        ));
411
    }
412
413
    /**
414
     * @param BlockInterface $block
415
     */
416
    public function block(BlockInterface $block)
417
    {
418
        $this->send($this->msgs->block($block));
419
    }
420
421
    /**
422
     * @param array $vHeaders
423
     */
424
    public function headers(array $vHeaders)
425
    {
426
        $this->send($this->msgs->headers($vHeaders));
427
    }
428
429
    /**
430
     * @param AlertDetail $detail
431
     * @param SignatureInterface $signature
432
     */
433
    public function alert(AlertDetail $detail, SignatureInterface $signature)
434
    {
435
        $this->send($this->msgs->alert($detail, $signature));
436
    }
437
438
    /**
439
     * @param int $feeRate
440
     */
441
    public function feefilter($feeRate)
442
    {
443
        $this->send($this->msgs->feefilter($feeRate));
444
    }
445
446
    /**
447
     * @param BufferInterface $data
448
     */
449
    public function filteradd(BufferInterface $data)
450
    {
451
        $this->send($this->msgs->filteradd($data));
452
    }
453
454
    /**
455
     * @param BloomFilter $filter
456
     */
457
    public function filterload(BloomFilter $filter)
458
    {
459
        $this->send($this->msgs->filterload($filter));
460
    }
461
462
    /**
463
     *
464
     */
465
    public function filterclear()
466
    {
467
        $this->send($this->msgs->filterclear());
468
    }
469
470
    /**
471
     * @param FilteredBlock $filtered
472
     */
473
    public function merkleblock(FilteredBlock $filtered)
474
    {
475
        $this->send($this->msgs->merkleblock($filtered));
476
    }
477
478
    /**
479
     *
480
     */
481
    public function mempool()
482
    {
483
        $this->send($this->msgs->mempool());
484
    }
485
486
    /**
487
     * Issue a Reject message, with a required $msg, $code, and $reason
488
     *
489
     * @param BufferInterface $msg
490
     * @param int $code
491
     * @param BufferInterface $reason
492
     * @param BufferInterface $data
493
     */
494
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
495
    {
496
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
497
    }
498
}
499