Completed
Push — master ( d2dd08...01649b )
by thomas
13:53 queued 11:41
created

Peer::getRemoteAddress()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Message;
10
use BitWasp\Bitcoin\Networking\Messages\Version;
11
use BitWasp\Bitcoin\Networking\Messages\Ping;
12
use BitWasp\Bitcoin\Networking\NetworkMessage;
13
use BitWasp\Bitcoin\Networking\NetworkSerializable;
14
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
15
use BitWasp\Bitcoin\Networking\Structure\Inventory;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddress;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
18
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
19
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
20
use BitWasp\Bitcoin\Transaction\TransactionInterface;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Stream\Stream;
28
29
/**
 * Represents one connection to a remote peer.
 *
 * The peer wraps a React stream, buffers and parses incoming bytes into
 * network messages, re-emits every parsed message as an event named after
 * its command, and exposes one convenience method per outgoing message type.
 *
 * Events emitted by this class (all visible in the code below):
 *  - 'data'                  after raw bytes were appended to the internal buffer
 *  - 'msg'                   [Peer, NetworkMessage] for every parsed message
 *  - <command>               [Peer, payload] for every parsed message
 *  - 'send'                  [NetworkMessage] after a message was written
 *  - 'ready'                 [Peer] once the version handshake completed
 *  - 'intentionaldisconnect' [Peer] when intentionalClose() is called
 *  - 'close'                 [Peer] when the peer is closed
 */
class Peer extends EventEmitter
{
    /**
     * Raw bytes received from the stream that have not been parsed into a
     * complete message yet.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * @var LoopInterface
     */
    private $loop;

    /**
     * Factory used both to parse incoming messages and to build outgoing ones.
     *
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
     */
    private $msgs;

    /**
     * Underlying connection; null until setupStream() was called.
     *
     * @var Stream|null
     */
    private $stream;

    /**
     * The Version message we sent (or will send) to the remote peer.
     *
     * @var Version
     */
    private $localVersion;

    /**
     * The Version message received from the remote peer.
     *
     * @var Version
     */
    private $remoteVersion;

    /**
     * @var NetworkAddressInterface
     */
    private $peerAddress;

    /**
     * @var ConnectionParams
     */
    private $connectionParams;

    /**
     * True once the first VerAck was processed; guards against emitting
     * 'ready' / resolving the handshake promise more than once.
     *
     * @var bool
     */
    private $exchangedVersion = false;

    /**
     * True once close() has run; makes close() idempotent and re-entrant.
     *
     * @var bool
     */
    private $closed = false;

    /**
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
     * @param LoopInterface $loop
     */
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
    {
        $this->msgs = $msgs;
        $this->loop = $loop;
    }

    /**
     * Returns the Version message produced locally during the handshake,
     * or null if no handshake was started yet.
     *
     * @return Version
     */
    public function getLocalVersion()
    {
        return $this->localVersion;
    }

    /**
     * Returns the Version message received from the remote peer, or null
     * if none was received yet.
     *
     * @return Version
     */
    public function getRemoteVersion()
    {
        return $this->remoteVersion;
    }

    /**
     * Reliably returns the remote peers NetAddr when known through
     * the connection process. Often better than the data contained
     * in a Version message.
     *
     * @return NetworkAddressInterface
     */
    public function getRemoteAddress()
    {
        return $this->peerAddress;
    }

    /**
     * Returns the parameters supplied to inboundHandshake()/outboundHandshake().
     *
     * @return ConnectionParams
     */
    public function getConnectionParams()
    {
        return $this->connectionParams;
    }

    /**
     * Serializes the message, writes it to the stream and emits 'send'
     * with the resulting NetworkMessage.
     *
     * @param NetworkSerializable $msg
     */
    public function send(NetworkSerializable $msg)
    {
        $net = $msg->getNetworkMessage();
        $this->stream->write($net->getBinary());
        $this->emit('send', [$net]);
    }

    /**
     * Handler for incoming data. Buffers possibly fragmented packets since they arrive sequentially.
     *
     * Every message that can be parsed from the buffer is emitted as 'msg';
     * whatever bytes remain afterwards are kept for the next invocation.
     */
    private function onData()
    {
        $remaining = $this->buffer;
        $parser = new Parser(new Buffer($remaining));

        try {
            while ($message = $this->msgs->parse($parser)) {
                // Record the unparsed tail *before* emitting, so a message is
                // never re-parsed even if a listener throws.
                $remaining = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
                $this->emit('msg', [$this, $message]);
            }
        } catch (\Exception $e) {
            // Do nothing - it was probably a fragmented message
        }

        // Commit the remainder on every exit path. Previously this only
        // happened inside the catch, so a parse() that ended the loop by
        // returning a falsy value would leave already-emitted messages in
        // the buffer and they would be emitted again on the next chunk.
        $this->buffer = $remaining;
    }

    /**
     * Attaches the peer to a stream and wires up the event plumbing:
     * stream 'data' -> buffer + 'data' -> onData() -> 'msg' -> <command>.
     *
     * @param Stream $stream
     * @return $this
     */
    public function setupStream(Stream $stream)
    {
        $this->stream = $stream;
        // A freshly attached stream means the peer is usable (again).
        $this->closed = false;

        $this->stream->on('data', function ($data) {
            $this->buffer .= $data;
            $this->emit('data');
        });

        $this->stream->on('close', function () {
            $this->close();
        });

        $this->on('data', function () {
            $this->onData();
        });

        // Re-dispatch every parsed message under its command name so callers
        // can subscribe to specific message types.
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
        });

        return $this;
    }

    /**
     * Performs the handshake for a connection initiated by the remote side.
     *
     * On the remote Version message the peer address is determined, our own
     * Version is produced from $params and sent. On the first VerAck we reply
     * with a VerAck, emit 'ready' and resolve the returned promise with $this.
     *
     * @param Stream $connection
     * @param ConnectionParams $params
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
    {
        $this->setupStream($connection);
        $this->connectionParams = $params;

        $deferred = new Deferred();
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
            // Prefer the socket's own idea of the remote endpoint. Anything
            // that does not split into exactly "ip:port" (e.g. a failed
            // lookup, or an address containing further colons) falls back to
            // the address advertised in the Version message.
            $split = explode(":", stream_socket_get_name($this->stream->stream, true));
            if (count($split) === 2) {
                list ($ip, $port) = $split;
                $this->peerAddress = new NetworkAddress($version->getServices(), $ip, $port);
            } else {
                $this->peerAddress = $version->getSenderAddress();
            }

            $this->remoteVersion = $version;
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
            $this->send($localVersion);
        });

        $this->on(Message::VERACK, function () use ($deferred) {
            if (false === $this->exchangedVersion) {
                $this->exchangedVersion = true;
                $this->verack();
                $this->emit('ready', [$this]);
                $deferred->resolve($this);
            }
        });

        return $deferred->promise();
    }

    /**
     * Performs the handshake for a connection we initiated.
     *
     * Sends our Version immediately, answers the remote Version with a
     * VerAck, and on the first VerAck emits 'ready' and resolves the returned
     * promise with $this. setupStream() must have been called beforehand,
     * because this method writes to the stream.
     *
     * @param NetworkAddressInterface $remotePeer
     * @param ConnectionParams $params
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
     */
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
    {
        $deferred = new Deferred();

        $this->on(Message::VERSION, function (Peer $peer, Version $version) {
            $this->remoteVersion = $version;
            $this->verack();
        });

        $this->on(Message::VERACK, function () use ($deferred) {
            if (false === $this->exchangedVersion) {
                $this->exchangedVersion = true;
                $this->emit('ready', [$this]);
                $deferred->resolve($this);
            }
        });

        $this->peerAddress = $remotePeer;
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
        $this->connectionParams = $params;

        $this->send($version);

        return $deferred->promise();
    }

    /**
     * Closes the connection on our own initiative: emits
     * 'intentionaldisconnect' first so listeners can tell it apart from a
     * remote disconnect, then closes.
     */
    public function intentionalClose()
    {
        $this->emit('intentionaldisconnect', [$this]);
        $this->close();
    }

    /**
     * Emits 'close', ends the stream and drops all listeners.
     *
     * The method is idempotent: setupStream() routes the stream's own 'close'
     * event back into this method, so ending the stream below may re-enter
     * close(). Without the guard, listeners could observe 'close' twice.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }
        $this->closed = true;

        $this->emit('close', [$this]);
        // The stream is only set by setupStream(); tolerate close() before that.
        if (null !== $this->stream) {
            $this->stream->end();
        }
        $this->removeAllListeners();
    }

    /**
     * Builds and sends a Version message from the given fields.
     *
     * @param int $protocolVersion
     * @param int $services
     * @param int $timestamp
     * @param NetworkAddressInterface $remoteAddr
     * @param NetworkAddressInterface $localAddr
     * @param string $userAgent - wrapped in a Buffer before being passed on
     * @param int $blockHeight
     * @param bool $relayToUs
     */
    public function version(
        $protocolVersion,
        $services,
        $timestamp,
        NetworkAddressInterface $remoteAddr,
        NetworkAddressInterface $localAddr,
        $userAgent,
        $blockHeight,
        $relayToUs
    ) {
        $this->send($this->msgs->version(
            $protocolVersion,
            $services,
            $timestamp,
            $remoteAddr,
            $localAddr,
            new Buffer($userAgent),
            $blockHeight,
            $relayToUs
        ));
    }

    /**
     * Sends a VerAck message.
     */
    public function verack()
    {
        $this->send($this->msgs->verack());
    }

    /**
     * Sends a SendHeaders message.
     */
    public function sendheaders()
    {
        $this->send($this->msgs->sendheaders());
    }

    /**
     * Sends an Inv message announcing the given inventory vectors.
     *
     * @param Inventory[] $vInv
     */
    public function inv(array $vInv)
    {
        $this->send($this->msgs->inv($vInv));
    }

    /**
     * Sends a GetData message requesting the given inventory vectors.
     *
     * @param Inventory[] $vInv
     */
    public function getdata(array $vInv)
    {
        $this->send($this->msgs->getdata($vInv));
    }

    /**
     * Sends a NotFound message for the given inventory vectors.
     *
     * @param array $vInv
     */
    public function notfound(array $vInv)
    {
        $this->send($this->msgs->notfound($vInv));
    }

    /**
     * Sends an Addr message with the given network addresses.
     *
     * @param NetworkAddressTimestamp[] $vNetAddr
     */
    public function addr(array $vNetAddr)
    {
        $this->send($this->msgs->addr($vNetAddr));
    }

    /**
     * Sends a GetAddr message.
     */
    public function getaddr()
    {
        $this->send($this->msgs->getaddr());
    }

    /**
     * Sends a Ping message built by the message factory.
     */
    public function ping()
    {
        $this->send($this->msgs->ping());
    }

    /**
     * Sends a Pong message answering the given Ping.
     *
     * @param Ping $ping
     */
    public function pong(Ping $ping)
    {
        $this->send($this->msgs->pong($ping));
    }

    /**
     * Sends a Tx message carrying the given transaction.
     *
     * @param TransactionInterface $tx
     */
    public function tx(TransactionInterface $tx)
    {
        $this->send($this->msgs->tx($tx));
    }

    /**
     * Sends a GetBlocks message for the locator, using the protocol version
     * of our local Version message (so a handshake must have been started).
     *
     * @param BlockLocator $locator
     */
    public function getblocks(BlockLocator $locator)
    {
        $this->send($this->msgs->getblocks(
            $this->localVersion->getVersion(),
            $locator
        ));
    }

    /**
     * Sends a GetHeaders message for the locator, using the protocol version
     * of our local Version message (so a handshake must have been started).
     *
     * @param BlockLocator $locator
     */
    public function getheaders(BlockLocator $locator)
    {
        $this->send($this->msgs->getheaders(
            $this->localVersion->getVersion(),
            $locator
        ));
    }

    /**
     * Sends a Block message carrying the given block.
     *
     * @param BlockInterface $block
     */
    public function block(BlockInterface $block)
    {
        $this->send($this->msgs->block($block));
    }

    /**
     * Sends a Headers message with the given headers.
     *
     * @param array $vHeaders
     */
    public function headers(array $vHeaders)
    {
        $this->send($this->msgs->headers($vHeaders));
    }

    /**
     * Sends an Alert message.
     *
     * @param AlertDetail $detail
     * @param SignatureInterface $signature
     */
    public function alert(AlertDetail $detail, SignatureInterface $signature)
    {
        $this->send($this->msgs->alert($detail, $signature));
    }

    /**
     * Sends a FeeFilter message.
     *
     * @param int $feeRate
     */
    public function feefilter($feeRate)
    {
        $this->send($this->msgs->feefilter($feeRate));
    }

    /**
     * Sends a FilterAdd message with the given data element.
     *
     * @param BufferInterface $data
     */
    public function filteradd(BufferInterface $data)
    {
        $this->send($this->msgs->filteradd($data));
    }

    /**
     * Sends a FilterLoad message with the given bloom filter.
     *
     * @param BloomFilter $filter
     */
    public function filterload(BloomFilter $filter)
    {
        $this->send($this->msgs->filterload($filter));
    }

    /**
     * Sends a FilterClear message.
     */
    public function filterclear()
    {
        $this->send($this->msgs->filterclear());
    }

    /**
     * Sends a MerkleBlock message for the given filtered block.
     *
     * @param FilteredBlock $filtered
     */
    public function merkleblock(FilteredBlock $filtered)
    {
        $this->send($this->msgs->merkleblock($filtered));
    }

    /**
     * Sends a MemPool message.
     */
    public function mempool()
    {
        $this->send($this->msgs->mempool());
    }

    /**
     * Issue a Reject message, with a required $msg, $code, and $reason
     *
     * @param BufferInterface $msg
     * @param int $code
     * @param BufferInterface $reason
     * @param BufferInterface|null $data - optional extra data, passed through as-is
     */
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
    {
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
    }
}
487