Completed
Push — master ( f7f683...a725ae )
by thomas
34:30 queued 31:27
created

Peer::addr()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Messages\Version;
10
use BitWasp\Bitcoin\Networking\Messages\Ping;
11
use BitWasp\Bitcoin\Networking\NetworkMessage;
12
use BitWasp\Bitcoin\Networking\NetworkSerializable;
13
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
14
use BitWasp\Bitcoin\Networking\Structure\Inventory;
15
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
17
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
18
use BitWasp\Bitcoin\Transaction\TransactionInterface;
19
use BitWasp\Buffertools\Buffer;
20
use BitWasp\Buffertools\BufferInterface;
21
use BitWasp\Buffertools\Parser;
22
use Evenement\EventEmitter;
23
use React\EventLoop\LoopInterface;
24
use React\Promise\Deferred;
25
use React\Stream\Stream;
26
27
class Peer extends EventEmitter
28
{
29
    /**
30
     * @var string
31
     */
32
    private $buffer = '';
33
34
    /**
35
     * @var LoopInterface
36
     */
37
    private $loop;
38
39
    /**
40
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
41
     */
42
    private $msgs;
43
44
    /**
45
     * @var Stream
46
     */
47
    private $stream;
48
49
    /**
50
     * @var Version
51
     */
52
    private $localVersion;
53
54
    /**
55
     * @var Version
56
     */
57
    private $remoteVersion;
58
59
    /**
60
     * @var ConnectionParams
61
     */
62
    private $connectionParams;
63
64
    /**
65
     * @var bool
66
     */
67
    private $exchangedVersion = false;
68
69
    /**
70
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
71
     * @param LoopInterface $loop
72
     */
73 12
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
74
    {
75 12
        $this->msgs = $msgs;
76 12
        $this->loop = $loop;
77 12
    }
78
79
    /**
80
     * @return Version
81
     */
82
    public function getLocalVersion()
83
    {
84
        return $this->localVersion;
85
    }
86
87
    /**
88
     * @return Version
89
     */
90
    public function getRemoteVersion()
91
    {
92
        return $this->remoteVersion;
93
    }
94
95
    /**
96
     * @return ConnectionParams
97
     */
98
    public function getConnectionParams()
99
    {
100
        return $this->connectionParams;
101
    }
102
103
    /**
104
     * @param NetworkSerializable $msg
105
     */
106 12
    public function send(NetworkSerializable $msg)
107
    {
108 12
        $net = $msg->getNetworkMessage();
109 12
        $this->stream->write($net->getBinary());
110 12
        $this->emit('send', [$net]);
111 12
    }
112
113
    /**
114
     * Handler for incoming data. Buffers possibly fragmented packets since they arrive sequentially.
115
     * Before finishing the version exchange, this will only emit Version and VerAck messages.
116
     */
117 12
    private function onData()
118
    {
119 12
        $tmp = $this->buffer;
120 12
        $parser = new Parser(new Buffer($tmp));
121
122
        try {
123 12
            while ($message = $this->msgs->parse($parser)) {
124 12
                $tmp = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
125
                //if ($this->exchangedVersion || ($command == 'version' || $command == 'verack')) {
126 12
                $this->emit('msg', [$this, $message]);
127
                //}
128 12
            }
129 12
        } catch (\Exception $e) {
130 12
            $this->buffer = $tmp;
131
            // Do nothing - it was probably a fragmented message
132
        }
133 12
    }
134
135
    /**
136
     * @param Stream $stream
137
     * @return $this
138
     */
139 12
    public function setupStream(Stream $stream)
140
    {
141 12
        $this->stream = $stream;
142
        $this->stream->on('data', function ($data) {
143 12
            $this->buffer .= $data;
144 12
            $this->emit('data');
145 12
        });
146
147
        $this->stream->on('close', function () {
148 9
            $this->close();
149 12
        });
150
151
        $this->on('data', function () {
152 12
            $this->onData();
153 12
        });
154
155
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
156 12
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
157 12
        });
158
159 12
        return $this;
160
    }
161
162
    /**
163
     * @param Stream $connection
164
     * @param ConnectionParams $params
165
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
166
     */
167 9
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
168
    {
169 9
        $this->setupStream($connection);
170 9
        $deferred = new Deferred();
171
172
        $this->on('version', function (Peer $peer, Version $version) use ($params) {
173 9
            $this->remoteVersion = $version;
174 9
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
175 9
            $this->send($localVersion);
176 9
        });
177
178
        $this->on('verack', function () use ($deferred) {
179 9
            if (false === $this->exchangedVersion) {
180 9
                $this->exchangedVersion = true;
181 9
                $this->verack();
182 9
                $this->emit('ready', [$this]);
183 9
                $deferred->resolve($this);
184 9
            }
185 9
        });
186
187 9
        return $deferred->promise();
188
    }
189
190
    /**
191
     * @param NetworkAddressInterface $remotePeer
192
     * @param ConnectionParams $params
193
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
194
     */
195 12
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
196
    {
197 12
        $deferred = new Deferred();
198
199
        $this->on('version', function (Peer $peer, Version $version) {
200 12
            $this->remoteVersion = $version;
201 12
            $this->verack();
202 12
        });
203
204 12
        $this->on('verack', function () use ($deferred) {
205 9
            if (false === $this->exchangedVersion) {
206 9
                $this->exchangedVersion = true;
207 9
                $this->emit('ready', [$this]);
208 9
                $deferred->resolve($this);
209 9
            }
210 12
        });
211
212 12
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
213 12
        $this->send($version);
214
215 12
        return $deferred->promise();
216
    }
217
218
    /**
219
     *
220
     */
221
    public function intentionalClose()
222
    {
223
        $this->emit('intentionaldisconnect', [$this]);
224
        $this->close();
225
    }
226
227
    /**
228
     *
229
     */
230 9
    public function close()
231
    {
232 9
        $this->emit('close', [$this]);
233 9
        $this->stream->end();
234 9
        $this->removeAllListeners();
235 9
    }
236
237
    /**
238
     * @param int $protocolVersion
239
     * @param BufferInterface $services
240
     * @param int $timestamp
241
     * @param NetworkAddressInterface $remoteAddr
242
     * @param NetworkAddressInterface $localAddr
243
     * @param string $userAgent
244
     * @param int $blockHeight
245
     * @param bool $relayToUs
246
     */
247
    public function version(
248
        $protocolVersion,
249
        BufferInterface $services,
250
        $timestamp,
251
        NetworkAddressInterface $remoteAddr,
252
        NetworkAddressInterface $localAddr,
253
        $userAgent,
254
        $blockHeight,
255
        $relayToUs
256
    ) {
257
        $this->send($this->msgs->version(
258
            $protocolVersion,
259
            $services,
260
            $timestamp,
261
            $remoteAddr,
262
            $localAddr,
263
            new Buffer($userAgent),
264
            $blockHeight,
265
            $relayToUs
266
        ));
267
    }
268
269
    /**
270
     *
271
     */
272 12
    public function verack()
273
    {
274 12
        $this->send($this->msgs->verack());
275 12
    }
276
277
    /**
278
     *
279
     */
280
    public function sendheaders()
281
    {
282
        $this->send($this->msgs->sendheaders());
283
    }
284
285
    /**
286
     * @param Inventory[] $vInv
287
     */
288
    public function inv(array $vInv)
289
    {
290
        $this->send($this->msgs->inv($vInv));
291
    }
292
293
    /**
294
     * @param Inventory[] $vInv
295
     */
296
    public function getdata(array $vInv)
297
    {
298
        $this->send($this->msgs->getdata($vInv));
299
    }
300
301
    /**
302
     * @param array $vInv
303
     */
304
    public function notfound(array $vInv)
305
    {
306
        $this->send($this->msgs->notfound($vInv));
307
    }
308
309
    /**
310
     * @param NetworkAddressTimestamp[] $vNetAddr
311
     */
312
    public function addr(array $vNetAddr)
313
    {
314
        $this->send($this->msgs->addr($vNetAddr));
315
    }
316
317
    /**
318
     *
319
     */
320
    public function getaddr()
321
    {
322
        $this->send($this->msgs->getaddr());
323
    }
324
325
    /**
326
     *
327
     */
328
    public function ping()
329
    {
330
        $this->send($this->msgs->ping());
331
    }
332
333
    /**
334
     * @param Ping $ping
335
     */
336
    public function pong(Ping $ping)
337
    {
338
        $this->send($this->msgs->pong($ping));
339
    }
340
341
    /**
342
     * @param TransactionInterface $tx
343
     */
344
    public function tx(TransactionInterface $tx)
345
    {
346
        $this->send($this->msgs->tx($tx));
347
    }
348
349
    /**
350
     * @param BlockLocator $locator
351
     */
352
    public function getblocks(BlockLocator $locator)
353
    {
354
        $this->send($this->msgs->getblocks(
355
            $this->localVersion->getVersion(),
356
            $locator
357
        ));
358
    }
359
360
    /**
361
     * @param BlockLocator $locator
362
     */
363
    public function getheaders(BlockLocator $locator)
364
    {
365
        $this->send($this->msgs->getheaders(
366
            $this->localVersion->getVersion(),
367
            $locator
368
        ));
369
    }
370
371
    /**
372
     * @param BlockInterface $block
373
     */
374
    public function block(BlockInterface $block)
375
    {
376
        $this->send($this->msgs->block($block));
377
    }
378
379
    /**
380
     * @param array $vHeaders
381
     */
382
    public function headers(array $vHeaders)
383
    {
384
        $this->send($this->msgs->headers($vHeaders));
385
    }
386
387
    /**
388
     * @param AlertDetail $detail
389
     * @param SignatureInterface $signature
390
     */
391
    public function alert(AlertDetail $detail, SignatureInterface $signature)
392
    {
393
        $this->send($this->msgs->alert($detail, $signature));
394
    }
395
396
    /**
397
     * @param int $feeRate
398
     */
399
    public function feefilter($feeRate)
400
    {
401
        $this->send($this->msgs->feefilter($feeRate));
402
    }
403
404
    /**
405
     * @param BufferInterface $data
406
     */
407
    public function filteradd(BufferInterface $data)
408
    {
409
        $this->send($this->msgs->filteradd($data));
410
    }
411
412
    /**
413
     * @param BloomFilter $filter
414
     */
415
    public function filterload(BloomFilter $filter)
416
    {
417
        $this->send($this->msgs->filterload($filter));
418
    }
419
420
    /**
421
     *
422
     */
423
    public function filterclear()
424
    {
425
        $this->send($this->msgs->filterclear());
426
    }
427
428
    /**
429
     * @param FilteredBlock $filtered
430
     */
431
    public function merkleblock(FilteredBlock $filtered)
432
    {
433
        $this->send($this->msgs->merkleblock($filtered));
434
    }
435
436
    /**
437
     *
438
     */
439
    public function mempool()
440
    {
441
        $this->send($this->msgs->mempool());
442
    }
443
444
    /**
445
     * Issue a Reject message, with a required $msg, $code, and $reason
446
     *
447
     * @param BufferInterface $msg
448
     * @param int $code
449
     * @param BufferInterface $reason
450
     * @param BufferInterface $data
451
     */
452
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
453
    {
454
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
455
    }
456
}
457