Completed
Push — master ( 4c10a8...999ca2 )
by thomas
8s
created

Peer::inboundHandshake()   B

Complexity

Conditions 2
Paths 1

Size

Total Lines 26
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 2

Importance

Changes 6
Bugs 0 Features 1
Metric Value
c 6
b 0
f 1
dl 0
loc 26
ccs 17
cts 17
cp 1
rs 8.8571
cc 2
eloc 16
nc 1
nop 2
crap 2
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Message;
10
use BitWasp\Bitcoin\Networking\Messages\Version;
11
use BitWasp\Bitcoin\Networking\Messages\Ping;
12
use BitWasp\Bitcoin\Networking\NetworkMessage;
13
use BitWasp\Bitcoin\Networking\NetworkSerializable;
14
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
15
use BitWasp\Bitcoin\Networking\Structure\Inventory;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddress;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
18
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
19
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
20
use BitWasp\Bitcoin\Transaction\TransactionInterface;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Stream\Stream;
28
29
class Peer extends EventEmitter
30
{
31
    /**
32
     * @var string
33
     */
34
    private $buffer = '';
35
36
    /**
37
     * @var LoopInterface
38
     */
39
    private $loop;
40
41
    /**
42
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
43
     */
44
    private $msgs;
45
46
    /**
47
     * @var Stream
48
     */
49
    private $stream;
50
51
    /**
52
     * @var Version
53
     */
54
    private $localVersion;
55
56
    /**
57
     * @var Version
58
     */
59
    private $remoteVersion;
60
61
    /**
62
     * @var NetworkAddressInterface
63
     */
64
    private $peerAddress;
65
66
    /**
67
     * @var ConnectionParams
68
     */
69
    private $connectionParams;
70
71
    /**
72
     * @var bool
73
     */
74
    private $exchangedVersion = false;
75
76
    /**
77
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
78
     * @param LoopInterface $loop
79
     */
80 12
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
81
    {
82 12
        $this->msgs = $msgs;
83 12
        $this->loop = $loop;
84 12
    }
85
86
    /**
87
     * @return Version
88
     */
89 3
    public function getLocalVersion()
90
    {
91 3
        return $this->localVersion;
92
    }
93
94
    /**
95
     * @return Version
96
     */
97 3
    public function getRemoteVersion()
98
    {
99 3
        return $this->remoteVersion;
100
    }
101
102
    /**
103
     * Reliably returns the remote peers NetAddr when known through
104
     * the connection process. Often better than the data contained
105
     * in a Version message.
106
     *
107
     * @return NetworkAddressInterface
108
     */
109 3
    public function getRemoteAddress()
110
    {
111 3
        return $this->peerAddress;
112
    }
113
114
    /**
115
     * @return ConnectionParams
116
     */
117 3
    public function getConnectionParams()
118
    {
119 3
        return $this->connectionParams;
120
    }
121
122
    /**
123
     * @param NetworkSerializable $msg
124
     */
125 12
    public function send(NetworkSerializable $msg)
126
    {
127 12
        $net = $msg->getNetworkMessage();
128 12
        $this->stream->write($net->getBinary());
129 12
        $this->emit('send', [$net]);
130 12
    }
131
132
    /**
133
     * Handler for incoming data. Buffers possibly fragmented packets since they arrive sequentially.
134
     * Before finishing the version exchange, this will only emit Version and VerAck messages.
135
     */
136 12
    private function onData()
137
    {
138 12
        $tmp = $this->buffer;
139 12
        $parser = new Parser(new Buffer($tmp));
140
141
        try {
142 12
            while ($message = $this->msgs->parse($parser)) {
143 12
                $tmp = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
144 12
                $this->emit('msg', [$this, $message]);
145 12
            }
146 12
        } catch (\Exception $e) {
147 12
            $this->buffer = $tmp;
148
            // Do nothing - it was probably a fragmented message
149
        }
150 12
    }
151
152
    /**
153
     * @param Stream $stream
154
     * @return $this
155
     */
156 12
    public function setupStream(Stream $stream)
157
    {
158 12
        $this->stream = $stream;
159
        $this->stream->on('data', function ($data) {
160 12
            $this->buffer .= $data;
161 12
            $this->emit('data');
162 12
        });
163
164
        $this->stream->on('close', function () {
165 9
            $this->close();
166 12
        });
167
168
        $this->on('data', function () {
169 12
            $this->onData();
170 12
        });
171
172
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
173 12
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
174 12
        });
175
176 12
        return $this;
177
    }
178
179
    /**
180
     * @param Stream $connection
181
     * @param ConnectionParams $params
182
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
183
     */
184 9
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
185
    {
186 9
        $this->setupStream($connection);
187 9
        $this->connectionParams = $params;
188
189 9
        $deferred = new Deferred();
190
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
191
192 9
            $this->peerAddress = $version->getSenderAddress();
193 9
            $this->remoteVersion = $version;
194 9
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
195 9
            $this->send($localVersion);
196 9
        });
197
198
        $this->on(Message::VERACK, function () use ($deferred) {
199 9
            if (false === $this->exchangedVersion) {
200 9
                $this->exchangedVersion = true;
201 9
                $this->verack();
202 9
                $this->emit('ready', [$this]);
203 9
                $deferred->resolve($this);
204 9
            }
205 9
        });
206
207
208 9
        return $deferred->promise();
209
    }
210
211
    /**
212
     * @param NetworkAddressInterface $remotePeer
213
     * @param ConnectionParams $params
214
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
215
     */
216 12
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
217
    {
218 12
        $deferred = new Deferred();
219
220
        $this->on(Message::VERSION, function (Peer $peer, Version $version) {
221 12
            $this->remoteVersion = $version;
222 12
            $this->verack();
223 12
        });
224
225 12
        $this->on(Message::VERACK, function () use ($deferred) {
226 9
            if (false === $this->exchangedVersion) {
227 9
                $this->exchangedVersion = true;
228 9
                $this->emit('ready', [$this]);
229 9
                $deferred->resolve($this);
230 9
            }
231 12
        });
232
233 12
        $this->peerAddress = $remotePeer;
234 12
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
235 12
        $this->connectionParams = $params;
236
237 12
        $this->send($version);
238
239 12
        return $deferred->promise();
240
    }
241
242
    /**
243
     *
244
     */
245
    public function intentionalClose()
246
    {
247
        $this->emit('intentionaldisconnect', [$this]);
248
        $this->close();
249
    }
250
251
    /**
252
     *
253
     */
254 9
    public function close()
255
    {
256 9
        $this->emit('close', [$this]);
257 9
        $this->stream->end();
258 9
        $this->removeAllListeners();
259 9
    }
260
261
    /**
262
     * @param int $protocolVersion
263
     * @param int $services
264
     * @param int $timestamp
265
     * @param NetworkAddressInterface $remoteAddr
266
     * @param NetworkAddressInterface $localAddr
267
     * @param string $userAgent
268
     * @param int $blockHeight
269
     * @param bool $relayToUs
270
     */
271
    public function version(
272
        $protocolVersion,
273
        $services,
274
        $timestamp,
275
        NetworkAddressInterface $remoteAddr,
276
        NetworkAddressInterface $localAddr,
277
        $userAgent,
278
        $blockHeight,
279
        $relayToUs
280
    ) {
281
        $this->send($this->msgs->version(
282
            $protocolVersion,
283
            $services,
284
            $timestamp,
285
            $remoteAddr,
286
            $localAddr,
287
            new Buffer($userAgent),
288
            $blockHeight,
289
            $relayToUs
290
        ));
291
    }
292
293
    /**
294
     *
295
     */
296 12
    public function verack()
297
    {
298 12
        $this->send($this->msgs->verack());
299 12
    }
300
301
    /**
302
     *
303
     */
304
    public function sendheaders()
305
    {
306
        $this->send($this->msgs->sendheaders());
307
    }
308
309
    /**
310
     * @param Inventory[] $vInv
311
     */
312
    public function inv(array $vInv)
313
    {
314
        $this->send($this->msgs->inv($vInv));
315
    }
316
317
    /**
318
     * @param Inventory[] $vInv
319
     */
320
    public function getdata(array $vInv)
321
    {
322
        $this->send($this->msgs->getdata($vInv));
323
    }
324
325
    /**
326
     * @param array $vInv
327
     */
328
    public function notfound(array $vInv)
329
    {
330
        $this->send($this->msgs->notfound($vInv));
331
    }
332
333
    /**
334
     * @param NetworkAddressTimestamp[] $vNetAddr
335
     */
336
    public function addr(array $vNetAddr)
337
    {
338
        $this->send($this->msgs->addr($vNetAddr));
339
    }
340
341
    /**
342
     *
343
     */
344
    public function getaddr()
345
    {
346
        $this->send($this->msgs->getaddr());
347
    }
348
349
    /**
350
     *
351
     */
352
    public function ping()
353
    {
354
        $this->send($this->msgs->ping());
355
    }
356
357
    /**
358
     * @param Ping $ping
359
     */
360
    public function pong(Ping $ping)
361
    {
362
        $this->send($this->msgs->pong($ping));
363
    }
364
365
    /**
366
     * @param TransactionInterface $tx
367
     */
368
    public function tx(TransactionInterface $tx)
369
    {
370
        $this->send($this->msgs->tx($tx));
371
    }
372
373
    /**
374
     * @param BlockLocator $locator
375
     */
376
    public function getblocks(BlockLocator $locator)
377
    {
378
        $this->send($this->msgs->getblocks(
379
            $this->localVersion->getVersion(),
380
            $locator
381
        ));
382
    }
383
384
    /**
385
     * @param BlockLocator $locator
386
     */
387
    public function getheaders(BlockLocator $locator)
388
    {
389
        $this->send($this->msgs->getheaders(
390
            $this->localVersion->getVersion(),
391
            $locator
392
        ));
393
    }
394
395
    /**
396
     * @param BlockInterface $block
397
     */
398
    public function block(BlockInterface $block)
399
    {
400
        $this->send($this->msgs->block($block));
401
    }
402
403
    /**
404
     * @param array $vHeaders
405
     */
406
    public function headers(array $vHeaders)
407
    {
408
        $this->send($this->msgs->headers($vHeaders));
409
    }
410
411
    /**
412
     * @param AlertDetail $detail
413
     * @param SignatureInterface $signature
414
     */
415
    public function alert(AlertDetail $detail, SignatureInterface $signature)
416
    {
417
        $this->send($this->msgs->alert($detail, $signature));
418
    }
419
420
    /**
421
     * @param int $feeRate
422
     */
423
    public function feefilter($feeRate)
424
    {
425
        $this->send($this->msgs->feefilter($feeRate));
426
    }
427
428
    /**
429
     * @param BufferInterface $data
430
     */
431
    public function filteradd(BufferInterface $data)
432
    {
433
        $this->send($this->msgs->filteradd($data));
434
    }
435
436
    /**
437
     * @param BloomFilter $filter
438
     */
439
    public function filterload(BloomFilter $filter)
440
    {
441
        $this->send($this->msgs->filterload($filter));
442
    }
443
444
    /**
445
     *
446
     */
447
    public function filterclear()
448
    {
449
        $this->send($this->msgs->filterclear());
450
    }
451
452
    /**
453
     * @param FilteredBlock $filtered
454
     */
455
    public function merkleblock(FilteredBlock $filtered)
456
    {
457
        $this->send($this->msgs->merkleblock($filtered));
458
    }
459
460
    /**
461
     *
462
     */
463
    public function mempool()
464
    {
465
        $this->send($this->msgs->mempool());
466
    }
467
468
    /**
469
     * Issue a Reject message, with a required $msg, $code, and $reason
470
     *
471
     * @param BufferInterface $msg
472
     * @param int $code
473
     * @param BufferInterface $reason
474
     * @param BufferInterface $data
475
     */
476
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
477
    {
478
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
479
    }
480
}
481