Completed
Push — master ( fc4613...2e6493 )
by thomas
30:51
created

Peer::outboundHandshake()   B

Complexity

Conditions 3
Paths 1

Size

Total Lines 33
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 33
ccs 23
cts 23
cp 1
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 20
nc 1
nop 2
crap 3
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Message;
10
use BitWasp\Bitcoin\Networking\Messages\Version;
11
use BitWasp\Bitcoin\Networking\Messages\Ping;
12
use BitWasp\Bitcoin\Networking\NetworkMessage;
13
use BitWasp\Bitcoin\Networking\NetworkSerializable;
14
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
15
use BitWasp\Bitcoin\Networking\Structure\Inventory;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddress;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
18
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
19
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
20
use BitWasp\Bitcoin\Transaction\TransactionInterface;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Stream\Stream;
28
29
/**
 * Represents one connection to a remote node.
 *
 * The peer owns a stream, buffers and parses incoming bytes into network
 * messages, re-emits each message under its command name, and exposes one
 * convenience method per outgoing message type.
 *
 * Events emitted by this class:
 *  - 'data'                  after raw bytes were appended to the internal buffer
 *  - 'msg'                   [Peer, NetworkMessage] for every parsed message
 *  - <command name>          [Peer, payload] for every parsed message
 *  - 'send'                  [NetworkMessage] after a message was written
 *  - 'ready'                 [Peer] once the version handshake completed
 *  - 'intentionaldisconnect' [Peer] when intentionalClose() is called
 *  - 'close'                 [Peer] when the peer is closed
 */
class Peer extends EventEmitter
{
    /**
     * Bytes received from the stream that have not yet been parsed into a complete message.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * Event loop handed to the constructor. Not used by any method visible in this class.
     *
     * @var LoopInterface
     */
    private $loop;

    /**
     * Factory used both to build outgoing messages and to parse incoming ones.
     *
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
     */
    private $msgs;

    /**
     * The stream attached through setupStream().
     *
     * @var Stream
     */
    private $stream;

    /**
     * The version message this side sent; set by the handshake methods.
     *
     * @var Version
     */
    private $localVersion;

    /**
     * The version message received from the remote side; set by the handshake methods.
     *
     * @var Version
     */
    private $remoteVersion;

    /**
     * @var NetworkAddressInterface
     */
    private $peerAddress;

    /**
     * @var ConnectionParams
     */
    private $connectionParams;

    /**
     * True once the first VERACK was processed by a handshake method.
     *
     * @var bool
     */
    private $exchangedVersion = false;

    /**
     * True once close() has started. Guards against re-entrant close() calls,
     * because the stream's 'close' listener calls close() too.
     *
     * @var bool
     */
    private $closed = false;

    /**
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
     * @param LoopInterface $loop
     */
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
    {
        $this->msgs = $msgs;
        $this->loop = $loop;
    }

    /**
     * Returns the version message this side sent, or null before a handshake produced one.
     *
     * @return Version
     */
    public function getLocalVersion()
    {
        return $this->localVersion;
    }

    /**
     * Returns the version message received from the remote side, or null before one arrived.
     *
     * @return Version
     */
    public function getRemoteVersion()
    {
        return $this->remoteVersion;
    }

    /**
     * Reliably returns the remote peers NetAddr when known through
     * the connection process. Often better than the data contained
     * in a Version message.
     *
     * @return NetworkAddressInterface
     */
    public function getRemoteAddress()
    {
        return $this->peerAddress;
    }

    /**
     * Returns the connection parameters given to the handshake method.
     *
     * @return ConnectionParams
     */
    public function getConnectionParams()
    {
        return $this->connectionParams;
    }

    /**
     * Serializes a message for the factory's network, writes it to the stream,
     * then emits 'send' with the resulting NetworkMessage.
     *
     * @param NetworkSerializable $msg
     */
    public function send(NetworkSerializable $msg)
    {
        $net = $msg->getNetworkMessage($this->msgs->getNetwork());
        $this->stream->write($net->getBinary());
        $this->emit('send', [$net]);
    }

    /**
     * Handler for incoming data. Parses as many complete messages as the buffer
     * holds, emits 'msg' for each, and keeps the unparsed tail for the next call.
     *
     * NOTE(review): the previous comment said only Version and VerAck messages
     * are emitted before the version exchange finishes; no such filtering is
     * visible in this method - confirm whether it is enforced elsewhere.
     */
    private function onData()
    {
        $remaining = $this->buffer;
        $parser = new Parser(new Buffer($remaining));

        try {
            while ($message = $this->msgs->parse($parser)) {
                // Record the unparsed tail before notifying listeners, so the
                // consumed message is dropped even if a listener throws.
                $remaining = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
                $this->emit('msg', [$this, $message]);
            }
        } catch (\Exception $e) {
            // Do nothing - it was probably a fragmented message.
            // NOTE(review): this also swallows exceptions thrown by 'msg' listeners.
        }

        // Always store the tail. Previously this only happened in the catch
        // branch, so a loop that ended without throwing left already-emitted
        // messages in the buffer to be parsed and emitted again.
        $this->buffer = $remaining;
    }

    /**
     * Attaches a stream to this peer and wires up the event plumbing:
     *  - stream 'data'  appends to the buffer and emits 'data' on the peer
     *  - stream 'close' closes the peer
     *  - peer 'data'    triggers parsing
     *  - peer 'msg'     re-emits the message under its command name
     *
     * @param Stream $stream
     * @return $this
     */
    public function setupStream(Stream $stream)
    {
        $this->stream = $stream;
        // A freshly attached stream can be closed again.
        $this->closed = false;

        $this->stream->on('data', function ($data) {
            $this->buffer .= $data;
            $this->emit('data');
        });

        $this->stream->on('close', function () {
            $this->close();
        });

        $this->on('data', function () {
            $this->onData();
        });

        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
        });

        return $this;
    }

    /**
     * Handshake for a connection the remote side initiated: waits for the
     * remote VERSION, answers with our own VERSION, and on the first VERACK
     * replies with a VERACK, emits 'ready' and resolves the promise with this peer.
     *
     * The promise is rejected with 'peer disconnected' if $connection closes
     * before the handshake completes, matching outboundHandshake().
     *
     * @param Stream $connection
     * @param ConnectionParams $params
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
    {
        $this->connectionParams = $params;

        $deferred = new Deferred();

        // Shared by reference between the two closures below, so that whichever
        // fires first (disconnect or VERACK) settles the promise exactly once.
        $awaitingHandshake = true;
        $connection->on('close', function () use (&$awaitingHandshake, $deferred) {
            if ($awaitingHandshake) {
                $awaitingHandshake = false;
                $deferred->reject('peer disconnected');
            }
        });

        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
            $this->peerAddress = $version->getSenderAddress();
            $this->remoteVersion = $version;
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
            $this->send($localVersion);
        });

        $this->on(Message::VERACK, function () use (&$awaitingHandshake, $deferred) {
            if (false === $this->exchangedVersion) {
                $this->exchangedVersion = true;
                $awaitingHandshake = false;
                $this->verack();
                $this->emit('ready', [$this]);
                $deferred->resolve($this);
            }
        });

        return $deferred->promise();
    }

    /**
     * Handshake for a connection this side initiated: sends our VERSION,
     * acknowledges the remote VERSION with a VERACK, and on the first VERACK
     * emits 'ready' and resolves the promise with this peer.
     *
     * The promise is rejected with 'peer disconnected' if the stream closes
     * before the handshake completes. setupStream() must have been called first,
     * because this method listens on the attached stream.
     *
     * @param NetworkAddressInterface $remotePeer
     * @param ConnectionParams $params
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
    {
        $deferred = new Deferred();

        // Shared by reference between the closures below, so that whichever
        // fires first (disconnect or VERACK) settles the promise exactly once.
        $awaitVersion = true;
        $this->stream->on('close', function () use (&$awaitVersion, $deferred) {
            if ($awaitVersion) {
                $awaitVersion = false;
                $deferred->reject('peer disconnected');
            }
        });

        $this->on(Message::VERSION, function (Peer $peer, Version $version) {
            $this->remoteVersion = $version;
            $this->verack();
        });

        $this->on(Message::VERACK, function () use (&$awaitVersion, $deferred) {
            if (false === $this->exchangedVersion) {
                $this->exchangedVersion = true;
                // Handshake done: a later disconnect must not attempt a reject.
                $awaitVersion = false;
                $this->emit('ready', [$this]);
                $deferred->resolve($this);
            }
        });

        $this->peerAddress = $remotePeer;
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
        $this->connectionParams = $params;

        $this->send($version);

        return $deferred->promise();
    }

    /**
     * Emits 'intentionaldisconnect' and then closes the peer.
     */
    public function intentionalClose()
    {
        $this->emit('intentionaldisconnect', [$this]);
        $this->close();
    }

    /**
     * Emits 'close', ends the stream and removes every listener from this peer.
     *
     * Calling it again after it has started is a no-op. Ending the stream can
     * fire the stream's 'close' event, whose listener calls this method again;
     * without the guard 'close' could be emitted twice.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }
        $this->closed = true;

        $this->emit('close', [$this]);
        $this->stream->end();
        $this->removeAllListeners();
    }

    /**
     * Builds and sends a version message. The user agent string is wrapped in a Buffer.
     *
     * @param int $protocolVersion
     * @param int $services
     * @param int $timestamp
     * @param NetworkAddressInterface $remoteAddr
     * @param NetworkAddressInterface $localAddr
     * @param string $userAgent
     * @param int $blockHeight
     * @param bool $relayToUs
     */
    public function version(
        $protocolVersion,
        $services,
        $timestamp,
        NetworkAddressInterface $remoteAddr,
        NetworkAddressInterface $localAddr,
        $userAgent,
        $blockHeight,
        $relayToUs
    ) {
        $this->send($this->msgs->version(
            $protocolVersion,
            $services,
            $timestamp,
            $remoteAddr,
            $localAddr,
            new Buffer($userAgent),
            $blockHeight,
            $relayToUs
        ));
    }

    /**
     * Sends a verack message.
     */
    public function verack()
    {
        $this->send($this->msgs->verack());
    }

    /**
     * Sends a sendheaders message.
     */
    public function sendheaders()
    {
        $this->send($this->msgs->sendheaders());
    }

    /**
     * Sends an inv message.
     *
     * @param Inventory[] $vInv
     */
    public function inv(array $vInv)
    {
        $this->send($this->msgs->inv($vInv));
    }

    /**
     * Sends a getdata message.
     *
     * @param Inventory[] $vInv
     */
    public function getdata(array $vInv)
    {
        $this->send($this->msgs->getdata($vInv));
    }

    /**
     * Sends a notfound message.
     *
     * @param Inventory[] $vInv presumably Inventory items, as for inv() and getdata() - confirm against the factory
     */
    public function notfound(array $vInv)
    {
        $this->send($this->msgs->notfound($vInv));
    }

    /**
     * Sends an addr message.
     *
     * @param NetworkAddressTimestamp[] $vNetAddr
     */
    public function addr(array $vNetAddr)
    {
        $this->send($this->msgs->addr($vNetAddr));
    }

    /**
     * Sends a getaddr message.
     */
    public function getaddr()
    {
        $this->send($this->msgs->getaddr());
    }

    /**
     * Sends a ping message.
     */
    public function ping()
    {
        $this->send($this->msgs->ping());
    }

    /**
     * Sends the pong message the factory builds for the given ping.
     *
     * @param Ping $ping
     */
    public function pong(Ping $ping)
    {
        $this->send($this->msgs->pong($ping));
    }

    /**
     * Sends a tx message carrying the given transaction.
     *
     * @param TransactionInterface $tx
     */
    public function tx(TransactionInterface $tx)
    {
        $this->send($this->msgs->tx($tx));
    }

    /**
     * Sends a getblocks message using the local version number.
     *
     * Reads the local version message, which is only set by the handshake
     * methods, so a handshake must have been started first.
     *
     * @param BlockLocator $locator
     */
    public function getblocks(BlockLocator $locator)
    {
        $this->send($this->msgs->getblocks(
            $this->localVersion->getVersion(),
            $locator
        ));
    }

    /**
     * Sends a getheaders message using the local version number.
     *
     * Reads the local version message, which is only set by the handshake
     * methods, so a handshake must have been started first.
     *
     * @param BlockLocator $locator
     */
    public function getheaders(BlockLocator $locator)
    {
        $this->send($this->msgs->getheaders(
            $this->localVersion->getVersion(),
            $locator
        ));
    }

    /**
     * Sends a block message carrying the given block.
     *
     * @param BlockInterface $block
     */
    public function block(BlockInterface $block)
    {
        $this->send($this->msgs->block($block));
    }

    /**
     * Sends a headers message.
     *
     * @param array $vHeaders
     */
    public function headers(array $vHeaders)
    {
        $this->send($this->msgs->headers($vHeaders));
    }

    /**
     * Sends an alert message.
     *
     * @param AlertDetail $detail
     * @param SignatureInterface $signature
     */
    public function alert(AlertDetail $detail, SignatureInterface $signature)
    {
        $this->send($this->msgs->alert($detail, $signature));
    }

    /**
     * Sends a feefilter message.
     *
     * @param int $feeRate
     */
    public function feefilter($feeRate)
    {
        $this->send($this->msgs->feefilter($feeRate));
    }

    /**
     * Sends a filteradd message.
     *
     * @param BufferInterface $data
     */
    public function filteradd(BufferInterface $data)
    {
        $this->send($this->msgs->filteradd($data));
    }

    /**
     * Sends a filterload message.
     *
     * @param BloomFilter $filter
     */
    public function filterload(BloomFilter $filter)
    {
        $this->send($this->msgs->filterload($filter));
    }

    /**
     * Sends a filterclear message.
     */
    public function filterclear()
    {
        $this->send($this->msgs->filterclear());
    }

    /**
     * Sends a merkleblock message.
     *
     * @param FilteredBlock $filtered
     */
    public function merkleblock(FilteredBlock $filtered)
    {
        $this->send($this->msgs->merkleblock($filtered));
    }

    /**
     * Sends a mempool message.
     */
    public function mempool()
    {
        $this->send($this->msgs->mempool());
    }

    /**
     * Issue a Reject message, with a required $msg, $code, and $reason
     *
     * @param BufferInterface $msg
     * @param int $code
     * @param BufferInterface $reason
     * @param BufferInterface|null $data optional; null is passed through to the factory when omitted
     */
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
    {
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
    }
}
488