Completed
Push — master ( d7f7e8...4d00a0 )
by thomas
33:43
created

Peer::inboundHandshake()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 23
ccs 11
cts 11
cp 1
rs 9.0856
c 0
b 0
f 0
cc 2
eloc 15
nc 1
nop 2
crap 2
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Message;
10
use BitWasp\Bitcoin\Networking\Messages\Version;
11
use BitWasp\Bitcoin\Networking\Messages\Ping;
12
use BitWasp\Bitcoin\Networking\NetworkMessage;
13
use BitWasp\Bitcoin\Networking\NetworkSerializable;
14
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
15
use BitWasp\Bitcoin\Networking\Structure\Header;
16
use BitWasp\Bitcoin\Networking\Structure\Inventory;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
18
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
19
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
20
use BitWasp\Bitcoin\Transaction\TransactionInterface;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Stream\Stream;
28
29
class Peer extends EventEmitter
30
{
31
    /**
32
     * @var string
33
     */
34
    private $buffer = '';
35
36
    /**
37
     * @var LoopInterface
38
     */
39
    private $loop;
40
41
    /**
42
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
43
     */
44
    private $msgs;
45
46
    /**
47
     * @var Stream
48
     */
49
    private $stream;
50
51
    /**
52
     * @var Version
53
     */
54
    private $localVersion;
55
56
    /**
57
     * @var Version
58
     */
59
    private $remoteVersion;
60
61
    /**
62
     * @var NetworkAddressInterface
63
     */
64
    private $peerAddress;
65
66
    /**
67
     * @var ConnectionParams
68
     */
69
    private $connectionParams;
70
71
    /**
72
     * @var bool
73
     */
74
    private $exchangedVersion = false;
75
76
    /**
77
     * @var Header
78
     */
79
    private $incomingMsgHeader;
80 12
81
    /**
82 12
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
83 12
     * @param LoopInterface $loop
84 12
     */
85
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
86
    {
87
        $this->msgs = $msgs;
88
        $this->loop = $loop;
89 3
    }
90
91 3
    /**
92
     * @return Version
93
     */
94
    public function getLocalVersion()
95
    {
96
        return $this->localVersion;
97 3
    }
98
99 3
    /**
100
     * @return Version
101
     */
102
    public function getRemoteVersion()
103
    {
104
        return $this->remoteVersion;
105
    }
106
107
    /**
108
     * Reliably returns the remote peers NetAddr when known through
109 3
     * the connection process. Often better than the data contained
110
     * in a Version message.
111 3
     *
112
     * @return NetworkAddressInterface
113
     */
114
    public function getRemoteAddress()
115
    {
116
        return $this->peerAddress;
117 3
    }
118
119 3
    /**
120
     * @return ConnectionParams
121
     */
122
    public function getConnectionParams()
123
    {
124
        return $this->connectionParams;
125 12
    }
126
127 12
    /**
128 12
     * @param NetworkSerializable $msg
129 12
     */
130 12
    public function send(NetworkSerializable $msg)
131
    {
132
        $netMsg = $msg->getNetworkMessage($this->msgs->getNetwork());
133
        $serialized = $this->msgs->getSerializer()->serialize($netMsg);
134
        $this->stream->write($serialized->getBinary());
135
        $this->emit('send', [$netMsg]);
136 12
    }
137
138 12
    /**
139 12
     * @param Stream $stream
140
     * @return $this
141
     */
142 12
    public function setupStream(Stream $stream)
143 12
    {
144 12
        $this->stream = $stream;
145 8
        $this->stream->on('data', function ($data) {
146 12
            $this->buffer .= $data;
147 12
148
            $data = new Buffer($this->buffer);
149
            $parser = new Parser($data);
150 12
151
            $pos = $parser->getPosition();
152
            $sz = $parser->getSize();
153
154
            try {
155
                while ($pos < $sz) {
156 12
                    if (null === $this->incomingMsgHeader) {
157
                        if ($sz - $pos < 24) {
158 12
                            break;
159
                        }
160 12
161 12
                        $this->incomingMsgHeader = $this->msgs->getSerializer()->parseHeader($parser);
162 12
                        $pos = $parser->getPosition();
163
                    }
164
165 6
                    if ($sz - $pos < $this->incomingMsgHeader->getLength()) {
166 12
                        break;
167
                    }
168
169 12
                    $message = $this->msgs->getSerializer()->parsePacket($this->incomingMsgHeader, $parser);
170 12
                    $this->incomingMsgHeader = null;
171
                    $this->emit('msg', [$this, $message]);
172
                    $pos = $parser->getPosition();
173 12
                }
174 12
            } catch (\Exception $e) {
175
                echo $e->getMessage();
176 12
            }
177
178
            $this->buffer = $parser->getBuffer()->slice($pos)->getBinary();
179
        });
180
181
        $this->stream->once('close', function () {
182
            $this->close();
183
        });
184 9
185
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
186 9
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
187
        });
188 9
189
        return $this;
190
    }
191 9
192 9
    /**
193 9
     * @param Stream $connection
194 9
     * @param ConnectionParams $params
195 9
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
196
     */
197
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
0 ignored issues
show
Unused Code introduced by
The parameter $connection is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
198 9
    {
199 9
        $this->connectionParams = $params;
200 9
201 9
        $deferred = new Deferred();
202 9
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
203 6
            $this->peerAddress = $version->getSenderAddress();
204 9
            $this->remoteVersion = $version;
205
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
206
            $this->send($localVersion);
207 9
        });
208
209
        $this->on(Message::VERACK, function () use ($deferred) {
210
            if (false === $this->exchangedVersion) {
211
                $this->exchangedVersion = true;
212
                $this->verack();
213
                $this->emit('ready', [$this]);
214
                $deferred->resolve($this);
215 12
            }
216
        });
217 12
218
        return $deferred->promise();
219 12
    }
220
221 6
    /**
222 6
     * @param NetworkAddressInterface $remotePeer
223 6
     * @param ConnectionParams $params
224 4
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
225 12
     */
226
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
227
    {
228 12
        $deferred = new Deferred();
229 12
        
230 12
        $awaitVersion = true;
231
        $this->stream->once('close', function () use (&$awaitVersion, $deferred) {
232 12
            if ($awaitVersion) {
233 9
                $awaitVersion = false;
234 9
                $deferred->reject(new \Exception('peer disconnected'));
235 9
            }
236 9
        });
237 6
238 12
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
239
            $this->remoteVersion = $version;
240 12
            $this->verack();
241 12
        });
242 12
243
        $this->on(Message::VERACK, function () use ($deferred) {
244 12
            if (false === $this->exchangedVersion) {
245
                $this->exchangedVersion = true;
246 12
                $this->emit('ready', [$this]);
247
                $deferred->resolve($this);
248
            }
249
        });
250
251
        $this->peerAddress = $remotePeer;
252
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
253
        $this->connectionParams = $params;
254
255
        $this->send($version);
256
257
        return $deferred->promise();
258
    }
259
260
    /**
261 9
     *
262
     */
263 9
    public function intentionalClose()
264 9
    {
265 9
        $this->emit('intentionaldisconnect', [$this]);
266 9
        $this->close();
267
    }
268
269
    /**
270
     *
271
     */
272
    public function close()
273
    {
274
        $this->emit('close', [$this]);
275
        $this->stream->end();
276
        $this->removeAllListeners();
277
    }
278
279
    /**
280
     * @param int $protocolVersion
281
     * @param int $services
282
     * @param int $timestamp
283
     * @param NetworkAddressInterface $remoteAddr
284
     * @param NetworkAddressInterface $localAddr
285
     * @param string $userAgent
286
     * @param int $blockHeight
287
     * @param bool $relayToUs
288
     */
289
    public function version(
290
        $protocolVersion,
291
        $services,
292
        $timestamp,
293
        NetworkAddressInterface $remoteAddr,
294
        NetworkAddressInterface $localAddr,
295
        $userAgent,
296
        $blockHeight,
297
        $relayToUs
298
    ) {
299
        $this->send($this->msgs->version(
300
            $protocolVersion,
301
            $services,
302
            $timestamp,
303 12
            $remoteAddr,
304
            $localAddr,
305 12
            new Buffer($userAgent),
306 12
            $blockHeight,
307
            $relayToUs
308
        ));
309
    }
310
311
    /**
312
     *
313
     */
314
    public function verack()
315
    {
316
        $this->send($this->msgs->verack());
317
    }
318
319
    /**
320
     *
321
     */
322
    public function sendheaders()
323
    {
324
        $this->send($this->msgs->sendheaders());
325
    }
326
327
    /**
328
     * @param Inventory[] $vInv
329
     */
330
    public function inv(array $vInv)
331
    {
332
        $this->send($this->msgs->inv($vInv));
333
    }
334
335
    /**
336
     * @param Inventory[] $vInv
337
     */
338
    public function getdata(array $vInv)
339
    {
340
        $this->send($this->msgs->getdata($vInv));
341
    }
342
343
    /**
344
     * @param array $vInv
345
     */
346
    public function notfound(array $vInv)
347
    {
348
        $this->send($this->msgs->notfound($vInv));
349
    }
350
351
    /**
352
     * @param NetworkAddressTimestamp[] $vNetAddr
353
     */
354
    public function addr(array $vNetAddr)
355
    {
356
        $this->send($this->msgs->addr($vNetAddr));
357
    }
358
359
    /**
360
     *
361
     */
362
    public function getaddr()
363
    {
364
        $this->send($this->msgs->getaddr());
365
    }
366
367
    /**
368
     *
369
     */
370
    public function ping()
371
    {
372
        $this->send($this->msgs->ping());
373
    }
374
375
    /**
376
     * @param Ping $ping
377
     */
378
    public function pong(Ping $ping)
379
    {
380
        $this->send($this->msgs->pong($ping));
381
    }
382
383
    /**
384
     * @param TransactionInterface $tx
385
     */
386
    public function tx(TransactionInterface $tx)
387
    {
388
        $this->send($this->msgs->tx($tx));
389
    }
390
391
    /**
392
     * @param BlockLocator $locator
393
     */
394
    public function getblocks(BlockLocator $locator)
395
    {
396
        $this->send($this->msgs->getblocks(
397
            $this->localVersion->getVersion(),
398
            $locator
399
        ));
400
    }
401
402
    /**
403
     * @param BlockLocator $locator
404
     */
405
    public function getheaders(BlockLocator $locator)
406
    {
407
        $this->send($this->msgs->getheaders(
408
            $this->localVersion->getVersion(),
409
            $locator
410
        ));
411
    }
412
413
    /**
414
     * @param BlockInterface $block
415
     */
416
    public function block(BlockInterface $block)
417
    {
418
        $this->send($this->msgs->block($block));
419
    }
420
421
    /**
422
     * @param array $vHeaders
423
     */
424
    public function headers(array $vHeaders)
425
    {
426
        $this->send($this->msgs->headers($vHeaders));
427
    }
428
429
    /**
430
     * @param AlertDetail $detail
431
     * @param SignatureInterface $signature
432
     */
433
    public function alert(AlertDetail $detail, SignatureInterface $signature)
434
    {
435
        $this->send($this->msgs->alert($detail, $signature));
436
    }
437
438
    /**
439
     * @param int $feeRate
440
     */
441
    public function feefilter($feeRate)
442
    {
443
        $this->send($this->msgs->feefilter($feeRate));
444
    }
445
446
    /**
447
     * @param BufferInterface $data
448
     */
449
    public function filteradd(BufferInterface $data)
450
    {
451
        $this->send($this->msgs->filteradd($data));
452
    }
453
454
    /**
455
     * @param BloomFilter $filter
456
     */
457
    public function filterload(BloomFilter $filter)
458
    {
459
        $this->send($this->msgs->filterload($filter));
460
    }
461
462
    /**
463
     *
464
     */
465
    public function filterclear()
466
    {
467
        $this->send($this->msgs->filterclear());
468
    }
469
470
    /**
471
     * @param FilteredBlock $filtered
472
     */
473
    public function merkleblock(FilteredBlock $filtered)
474
    {
475
        $this->send($this->msgs->merkleblock($filtered));
476
    }
477
478
    /**
479
     *
480
     */
481
    public function mempool()
482
    {
483
        $this->send($this->msgs->mempool());
484
    }
485
486
    /**
487
     * Issue a Reject message, with a required $msg, $code, and $reason
488
     *
489
     * @param BufferInterface $msg
490
     * @param int $code
491
     * @param BufferInterface $reason
492
     * @param BufferInterface $data
493
     */
494
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
495
    {
496
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
497
    }
498
}
499