Completed
Pull Request — master (#47)
by thomas
08:52
created

Peer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 2
crap 1
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Messages\Version;
10
use BitWasp\Bitcoin\Networking\Messages\Ping;
11
use BitWasp\Bitcoin\Networking\NetworkMessage;
12
use BitWasp\Bitcoin\Networking\NetworkSerializable;
13
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
14
use BitWasp\Bitcoin\Networking\Structure\Inventory;
15
use BitWasp\Bitcoin\Networking\Structure\NetworkAddress;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
18
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
19
use BitWasp\Bitcoin\Transaction\TransactionInterface;
20
use BitWasp\Buffertools\Buffer;
21
use BitWasp\Buffertools\BufferInterface;
22
use BitWasp\Buffertools\Parser;
23
use Evenement\EventEmitter;
24
use React\EventLoop\LoopInterface;
25
use React\Promise\Deferred;
26
use React\Stream\Stream;
27
28
class Peer extends EventEmitter
29
{
30
    /**
31
     * @var string
32
     */
33
    private $buffer = '';
34
35
    /**
36
     * @var LoopInterface
37
     */
38
    private $loop;
39
40
    /**
41
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
42
     */
43
    private $msgs;
44
45
    /**
46
     * @var Stream
47
     */
48
    private $stream;
49
50
    /**
51
     * @var Version
52
     */
53
    private $localVersion;
54
55
    /**
56
     * @var Version
57
     */
58
    private $remoteVersion;
59
60
    /**
61
     * @var NetworkAddressInterface
62
     */
63
    private $peerAddress;
64
65
    /**
66
     * @var ConnectionParams
67
     */
68
    private $connectionParams;
69
70
    /**
71
     * @var bool
72
     */
73
    private $exchangedVersion = false;
74
75
    /**
76
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
77
     * @param LoopInterface $loop
78
     */
79 12
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
80
    {
81 12
        $this->msgs = $msgs;
82 12
        $this->loop = $loop;
83 12
    }
84
85
    /**
86
     * @return Version
87
     */
88
    public function getLocalVersion()
89
    {
90
        return $this->localVersion;
91
    }
92
93
    /**
94
     * @return Version
95
     */
96
    public function getRemoteVersion()
97
    {
98
        return $this->remoteVersion;
99
    }
100
101
    /**
102
     * Reliably returns the remote peers NetAddr when known through
103
     * the connection process. Often better than the data contained
104
     * in a Version message.
105
     *
106
     * @return NetworkAddressInterface
107
     */
108
    public function getRemoteAddress()
109
    {
110
        return $this->peerAddress;
111
    }
112
113
    /**
114
     * @return ConnectionParams
115
     */
116
    public function getConnectionParams()
117
    {
118
        return $this->connectionParams;
119
    }
120
121
    /**
122
     * @param NetworkSerializable $msg
123
     */
124 12
    public function send(NetworkSerializable $msg)
125
    {
126 12
        $net = $msg->getNetworkMessage();
127 12
        $this->stream->write($net->getBinary());
128 12
        $this->emit('send', [$net]);
129 12
    }
130
131
    /**
132
     * Handler for incoming data. Buffers possibly fragmented packets since they arrive sequentially.
133
     * Before finishing the version exchange, this will only emit Version and VerAck messages.
134
     */
135 12
    private function onData()
136
    {
137 12
        $tmp = $this->buffer;
138 12
        $parser = new Parser(new Buffer($tmp));
139
140
        try {
141 12
            while ($message = $this->msgs->parse($parser)) {
142 12
                $tmp = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
143 12
                $this->emit('msg', [$this, $message]);
144 12
            }
145 12
        } catch (\Exception $e) {
146 12
            $this->buffer = $tmp;
147
            // Do nothing - it was probably a fragmented message
148
        }
149 12
    }
150
151
    /**
152
     * @param Stream $stream
153
     * @return $this
154
     */
155 12
    public function setupStream(Stream $stream)
156
    {
157 12
        $this->stream = $stream;
158
        $this->stream->on('data', function ($data) {
159 12
            $this->buffer .= $data;
160 12
            $this->emit('data');
161 12
        });
162
163
        $this->stream->on('close', function () {
164 9
            $this->close();
165 12
        });
166
167
        $this->on('data', function () {
168 12
            $this->onData();
169 12
        });
170
171
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
172 12
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
173 12
        });
174
175 12
        return $this;
176
    }
177
178
    /**
179
     * @param Stream $connection
180
     * @param ConnectionParams $params
181
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
182
     */
183 9
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
184
    {
185 9
        $this->setupStream($connection);
186 9
        $deferred = new Deferred();
187
188
        $this->on('version', function (Peer $peer, Version $version) use ($params) {
189 9
            $split = str_split(stream_socket_get_name($this->stream->stream, true));
190 9
            if (count($split) === 2) {
191
                list ($ip, $port) = $split;
192
                $this->peerAddress = new NetworkAddress($version->getServices(), $ip, $port);
193
            } else {
194 9
                $this->peerAddress = $version->getSenderAddress();
195
            }
196
197 9
            $this->remoteVersion = $version;
198 9
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
199 9
            $this->send($localVersion);
200 9
        });
201
202
        $this->on('verack', function () use ($deferred) {
203 9
            if (false === $this->exchangedVersion) {
204 9
                $this->exchangedVersion = true;
205 9
                $this->verack();
206 9
                $this->emit('ready', [$this]);
207 9
                $deferred->resolve($this);
208 9
            }
209 9
        });
210
211
212 9
        return $deferred->promise();
213
    }
214
215
    /**
216
     * @param NetworkAddressInterface $remotePeer
217
     * @param ConnectionParams $params
218
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
219
     */
220 12
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
221
    {
222 12
        $deferred = new Deferred();
223
224
        $this->on('version', function (Peer $peer, Version $version) {
225 12
            $this->remoteVersion = $version;
226 12
            $this->verack();
227 12
        });
228
229 12
        $this->on('verack', function () use ($deferred) {
230 9
            if (false === $this->exchangedVersion) {
231 9
                $this->exchangedVersion = true;
232 9
                $this->emit('ready', [$this]);
233 9
                $deferred->resolve($this);
234 9
            }
235 12
        });
236
237 12
        $this->peerAddress = $remotePeer;
238 12
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
239 12
        $this->send($version);
240
241 12
        return $deferred->promise();
242
    }
243
244
    /**
245
     *
246
     */
247
    public function intentionalClose()
248
    {
249
        $this->emit('intentionaldisconnect', [$this]);
250
        $this->close();
251
    }
252
253
    /**
254
     *
255
     */
256 9
    public function close()
257
    {
258 9
        $this->emit('close', [$this]);
259 9
        $this->stream->end();
260 9
        $this->removeAllListeners();
261 9
    }
262
263
    /**
264
     * @param int $protocolVersion
265
     * @param BufferInterface $services
266
     * @param int $timestamp
267
     * @param NetworkAddressInterface $remoteAddr
268
     * @param NetworkAddressInterface $localAddr
269
     * @param string $userAgent
270
     * @param int $blockHeight
271
     * @param bool $relayToUs
272
     */
273
    public function version(
274
        $protocolVersion,
275
        BufferInterface $services,
276
        $timestamp,
277
        NetworkAddressInterface $remoteAddr,
278
        NetworkAddressInterface $localAddr,
279
        $userAgent,
280
        $blockHeight,
281
        $relayToUs
282
    ) {
283
        $this->send($this->msgs->version(
284
            $protocolVersion,
285
            $services,
286
            $timestamp,
287
            $remoteAddr,
288
            $localAddr,
289
            new Buffer($userAgent),
290
            $blockHeight,
291
            $relayToUs
292
        ));
293
    }
294
295
    /**
296
     *
297
     */
298 12
    public function verack()
299
    {
300 12
        $this->send($this->msgs->verack());
301 12
    }
302
303
    /**
304
     *
305
     */
306
    public function sendheaders()
307
    {
308
        $this->send($this->msgs->sendheaders());
309
    }
310
311
    /**
312
     * @param Inventory[] $vInv
313
     */
314
    public function inv(array $vInv)
315
    {
316
        $this->send($this->msgs->inv($vInv));
317
    }
318
319
    /**
320
     * @param Inventory[] $vInv
321
     */
322
    public function getdata(array $vInv)
323
    {
324
        $this->send($this->msgs->getdata($vInv));
325
    }
326
327
    /**
328
     * @param array $vInv
329
     */
330
    public function notfound(array $vInv)
331
    {
332
        $this->send($this->msgs->notfound($vInv));
333
    }
334
335
    /**
336
     * @param NetworkAddressTimestamp[] $vNetAddr
337
     */
338
    public function addr(array $vNetAddr)
339
    {
340
        $this->send($this->msgs->addr($vNetAddr));
341
    }
342
343
    /**
344
     *
345
     */
346
    public function getaddr()
347
    {
348
        $this->send($this->msgs->getaddr());
349
    }
350
351
    /**
352
     *
353
     */
354
    public function ping()
355
    {
356
        $this->send($this->msgs->ping());
357
    }
358
359
    /**
360
     * @param Ping $ping
361
     */
362
    public function pong(Ping $ping)
363
    {
364
        $this->send($this->msgs->pong($ping));
365
    }
366
367
    /**
368
     * @param TransactionInterface $tx
369
     */
370
    public function tx(TransactionInterface $tx)
371
    {
372
        $this->send($this->msgs->tx($tx));
373
    }
374
375
    /**
376
     * @param BlockLocator $locator
377
     */
378
    public function getblocks(BlockLocator $locator)
379
    {
380
        $this->send($this->msgs->getblocks(
381
            $this->localVersion->getVersion(),
382
            $locator
383
        ));
384
    }
385
386
    /**
387
     * @param BlockLocator $locator
388
     */
389
    public function getheaders(BlockLocator $locator)
390
    {
391
        $this->send($this->msgs->getheaders(
392
            $this->localVersion->getVersion(),
393
            $locator
394
        ));
395
    }
396
397
    /**
398
     * @param BlockInterface $block
399
     */
400
    public function block(BlockInterface $block)
401
    {
402
        $this->send($this->msgs->block($block));
403
    }
404
405
    /**
406
     * @param array $vHeaders
407
     */
408
    public function headers(array $vHeaders)
409
    {
410
        $this->send($this->msgs->headers($vHeaders));
411
    }
412
413
    /**
414
     * @param AlertDetail $detail
415
     * @param SignatureInterface $signature
416
     */
417
    public function alert(AlertDetail $detail, SignatureInterface $signature)
418
    {
419
        $this->send($this->msgs->alert($detail, $signature));
420
    }
421
422
    /**
423
     * @param int $feeRate
424
     */
425
    public function feefilter($feeRate)
426
    {
427
        $this->send($this->msgs->feefilter($feeRate));
428
    }
429
430
    /**
431
     * @param BufferInterface $data
432
     */
433
    public function filteradd(BufferInterface $data)
434
    {
435
        $this->send($this->msgs->filteradd($data));
436
    }
437
438
    /**
439
     * @param BloomFilter $filter
440
     */
441
    public function filterload(BloomFilter $filter)
442
    {
443
        $this->send($this->msgs->filterload($filter));
444
    }
445
446
    /**
447
     *
448
     */
449
    public function filterclear()
450
    {
451
        $this->send($this->msgs->filterclear());
452
    }
453
454
    /**
455
     * @param FilteredBlock $filtered
456
     */
457
    public function merkleblock(FilteredBlock $filtered)
458
    {
459
        $this->send($this->msgs->merkleblock($filtered));
460
    }
461
462
    /**
463
     *
464
     */
465
    public function mempool()
466
    {
467
        $this->send($this->msgs->mempool());
468
    }
469
470
    /**
471
     * Issue a Reject message, with a required $msg, $code, and $reason
472
     *
473
     * @param BufferInterface $msg
474
     * @param int $code
475
     * @param BufferInterface $reason
476
     * @param BufferInterface $data
477
     */
478
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
479
    {
480
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
481
    }
482
}
483