Completed
Pull Request — master (#45)
by thomas
37:30
created

Peer::notfound()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 4
cts 4
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Messages\Version;
10
use BitWasp\Bitcoin\Networking\Messages\Ping;
11
use BitWasp\Bitcoin\Networking\NetworkMessage;
12
use BitWasp\Bitcoin\Networking\NetworkSerializable;
13
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
14
use BitWasp\Bitcoin\Networking\Structure\Inventory;
15
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
17
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
18
use BitWasp\Bitcoin\Transaction\TransactionInterface;
19
use BitWasp\Buffertools\Buffer;
20
use BitWasp\Buffertools\BufferInterface;
21
use BitWasp\Buffertools\Parser;
22
use Evenement\EventEmitter;
23
use React\EventLoop\LoopInterface;
24
use React\Promise\Deferred;
25
use React\Stream\Stream;
26
27
class Peer extends EventEmitter
28
{
29
    /**
30
     * @var string
31
     */
32
    private $buffer = '';
33
34
    /**
35
     * @var LoopInterface
36
     */
37
    private $loop;
38
39
    /**
40
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
41
     */
42
    private $msgs;
43
44
    /**
45
     * @var Stream
46
     */
47
    private $stream;
48
49
    /**
50
     * @var Version
51
     */
52
    private $localVersion;
53
54
    /**
55
     * @var Version
56
     */
57
    private $remoteVersion;
58
59
    /**
60
     * @var ConnectionParams
61
     */
62
    private $connectionParams;
63
64
    /**
65
     * @var bool
66
     */
67
    private $exchangedVersion = false;
68
69
    /**
70
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
71
     * @param LoopInterface $loop
72
     */
73
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
74
    {
75
        $this->msgs = $msgs;
76
        $this->loop = $loop;
77
    }
78
79
    /**
80
     * @return Version
81
     */
82
    public function getLocalVersion()
83
    {
84
        return $this->localVersion;
85
    }
86
87
    /**
88
     * @return Version
89
     */
90
    public function getRemoteVersion()
91
    {
92
        return $this->remoteVersion;
93
    }
94
95
    /**
96
     * @return ConnectionParams
97
     */
98
    public function getConnectionParams()
99
    {
100
        return $this->connectionParams;
101
    }
102
103 18
    /**
104
     * @param NetworkSerializable $msg
105
     */
106
    public function send(NetworkSerializable $msg)
107
    {
108 18
        $net = $msg->getNetworkMessage();
109 18
        $this->stream->write($net->getBinary());
110 18
        $this->emit('send', [$net]);
111 18
    }
112 18
113
    /**
114
     * Handler for incoming data. Buffers possibly fragmented packets since they arrive sequentially.
115
     * Before finishing the version exchange, this will only emit Version and VerAck messages.
116
     */
117
    private function onData()
118
    {
119
        $tmp = $this->buffer;
120
        $parser = new Parser(new Buffer($tmp));
121
122
        try {
123
            while ($message = $this->msgs->parse($parser)) {
124
                $tmp = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
125
                //if ($this->exchangedVersion || ($command == 'version' || $command == 'verack')) {
126
                $this->emit('msg', [$this, $message]);
127
                //}
128
            }
129
        } catch (\Exception $e) {
130
            $this->buffer = $tmp;
131
            // Do nothing - it was probably a fragmented message
132
        }
133
    }
134
135
    /**
136
     * @param Stream $stream
137
     * @return $this
138
     */
139
    public function setupStream(Stream $stream)
140
    {
141
        $this->stream = $stream;
142
        $this->stream->on('data', function ($data) {
143
            $this->buffer .= $data;
144
            $this->emit('data');
145
        });
146
147
        $this->stream->on('close', function () {
148
            $this->close();
149
        });
150
151
        $this->on('data', function () {
152
            $this->onData();
153
        });
154
155
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
156
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
157
        });
158
159
        return $this;
160
    }
161
162
    /**
163
     * @param Stream $connection
164
     * @param ConnectionParams $params
165
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
166
     */
167
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
168
    {
169
        $this->setupStream($connection);
170
        $deferred = new Deferred();
171 3
172
        $this->on('version', function (Peer $peer, Version $version) use ($params) {
173 3
            $this->remoteVersion = $version;
174 3
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
175
            $this->send($localVersion);
176
        });
177
178
        $this->on('verack', function () use ($deferred) {
179
            if (false === $this->exchangedVersion) {
180
                $this->exchangedVersion = true;
181
                $this->verack();
182
                $this->emit('ready', [$this]);
183
                $deferred->resolve($this);
184
            }
185
        });
186
187
        return $deferred->promise();
188
    }
189
190
    /**
191
     * @param NetworkAddressInterface $remotePeer
192
     * @param ConnectionParams $params
193
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
194
     */
195
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
196
    {
197
        $deferred = new Deferred();
198
199
        $this->on('version', function (Peer $peer, Version $version) {
200
            $this->remoteVersion = $version;
201
            $this->verack();
202
        });
203
204
        $this->on('verack', function () use ($deferred) {
205
            if (false === $this->exchangedVersion) {
206
                $this->exchangedVersion = true;
207
                $this->emit('ready', [$this]);
208
                $deferred->resolve($this);
209 15
            }
210
        });
211 15
212 15
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
213 15
        $this->send($version);
214 15
215
        return $deferred->promise();
216
    }
217
218
    /**
219
     *
220 15
     */
221
    public function intentionalClose()
222 15
    {
223 15
        $this->emit('intentionaldisconnect', [$this]);
224
        $this->close();
225
    }
226 15
227 15
    /**
228 15
     *
229 15
     */
230 15
    public function close()
231 15
    {
232 15
        $this->emit('close', [$this]);
233 15
        $this->stream->end();
234 15
        $this->removeAllListeners();
235
    }
236
237 15
    /**
238
     * @param int $protocolVersion
239
     * @param BufferInterface $services
240
     * @param int $timestamp
241
     * @param NetworkAddressInterface $remoteAddr
242
     * @param NetworkAddressInterface $localAddr
243
     * @param string $userAgent
244 15
     * @param int $blockHeight
245
     * @param bool $relayToUs
246
     */
247 15
    public function version(
248 15
        $protocolVersion,
249 15
        BufferInterface $services,
250
        $timestamp,
251
        NetworkAddressInterface $remoteAddr,
252 9
        NetworkAddressInterface $localAddr,
253 15
        $userAgent,
254
        $blockHeight,
255
        $relayToUs
256 15
    ) {
257 15
        $this->send($this->msgs->version(
258
            $protocolVersion,
259
            $services,
260 15
            $timestamp,
261 15
            $remoteAddr,
262
            $localAddr,
263 15
            new Buffer($userAgent),
264
            $blockHeight,
265
            $relayToUs
266
        ));
267
    }
268
269
    /**
270
     *
271
     */
272
    public function verack()
273
    {
274
        $this->send($this->msgs->verack());
275
    }
276
277
    /**
278
     *
279
     */
280
    public function sendheaders()
281
    {
282
        $this->send($this->msgs->sendheaders());
283
    }
284
285
    /**
286
     * @param Inventory[] $vInv
287
     */
288 9
    public function inv(array $vInv)
289
    {
290 9
        $this->send($this->msgs->inv($vInv));
291 9
    }
292 9
293 9
    /**
294
     * @param Inventory[] $vInv
295
     */
296 9
    public function getdata(array $vInv)
297 9
    {
298 9
        $this->send($this->msgs->getdata($vInv));
299
    }
300
301 9
    /**
302 9
     * @param array $vInv
303 9
     */
304 9
    public function notfound(array $vInv)
305 9
    {
306 9
        $this->send($this->msgs->notfound($vInv));
307 9
    }
308
309 9
    /**
310
     * @param NetworkAddressTimestamp[] $vNetAddr
311
     */
312
    public function addr(array $vNetAddr)
313
    {
314
        $this->send($this->msgs->addr($vNetAddr));
315
    }
316
317 15
    /**
318
     *
319 15
     */
320 15
    public function getaddr()
321 15
    {
322
        $this->send($this->msgs->getaddr());
323
    }
324 15
325
    /**
326 15
     *
327 15
     */
328
    public function ping()
329
    {
330 15
        $this->send($this->msgs->ping());
331 15
    }
332
333
    /**
334 12
     * @param Ping $ping
335 12
     */
336 12
    public function pong(Ping $ping)
337 12
    {
338 12
        $this->send($this->msgs->pong($ping));
339 15
    }
340
341 15
    /**
342
     * @param TransactionInterface $tx
343 15
     */
344 2
    public function tx(TransactionInterface $tx)
345 15
    {
346
        $this->send($this->msgs->tx($tx));
347
    }
348 15
349
    /**
350
     * @param BlockLocator $locator
351
     */
352
    public function getblocks(BlockLocator $locator)
353
    {
354
        $this->send($this->msgs->getblocks(
355
            $this->localVersion->getVersion(),
356
            $locator
357
        ));
358
    }
359
360
    /**
361
     * @param BlockLocator $locator
362
     */
363 10
    public function getheaders(BlockLocator $locator)
364
    {
365 10
        $this->send($this->msgs->getheaders(
366 10
            $this->localVersion->getVersion(),
367 10
            $locator
368 10
        ));
369
    }
370
371
    /**
372
     * @param BlockInterface $block
373 15
     */
374
    public function block(BlockInterface $block)
375 15
    {
376 15
        $this->send($this->msgs->block($block));
377 15
    }
378 15
379 15
    /**
380 15
     * @param array $vHeaders
381 15
     */
382 15
    public function headers(array $vHeaders)
383 15
    {
384 15
        $this->send($this->msgs->headers($vHeaders));
385 15
    }
386
387
    /**
388
     * @param AlertDetail $detail
389
     * @param SignatureInterface $signature
390 15
     */
391
    public function alert(AlertDetail $detail, SignatureInterface $signature)
392 15
    {
393 15
        $this->send($this->msgs->alert($detail, $signature));
394
    }
395
396
    /**
397
     * @param int $feeRate
398
     */
399
    public function feefilter($feeRate)
400
    {
401
        $this->send($this->msgs->feefilter($feeRate));
402
    }
403
404
    /**
405
     * @param BufferInterface $data
406
     */
407
    public function filteradd(BufferInterface $data)
408
    {
409
        $this->send($this->msgs->filteradd($data));
410
    }
411
412
    /**
413
     * @param BloomFilter $filter
414
     */
415
    public function filterload(BloomFilter $filter)
416
    {
417
        $this->send($this->msgs->filterload($filter));
418
    }
419
420
    /**
421
     *
422
     */
423
    public function filterclear()
424
    {
425
        $this->send($this->msgs->filterclear());
426
    }
427
428
    /**
429
     * @param FilteredBlock $filtered
430
     */
431
    public function merkleblock(FilteredBlock $filtered)
432
    {
433
        $this->send($this->msgs->merkleblock($filtered));
434
    }
435
436
    /**
437
     *
438
     */
439
    public function mempool()
440
    {
441
        $this->send($this->msgs->mempool());
442
    }
443
444
    /**
445
     * Issue a Reject message, with a required $msg, $code, and $reason
446
     *
447
     * @param BufferInterface $msg
448
     * @param int $code
449
     * @param BufferInterface $reason
450
     * @param BufferInterface $data
451
     */
452
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
453
    {
454
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
455
    }
456
}
457