Completed
Pull Request — master (#64)
by thomas
56:33 queued 53:42
created

Peer::tx()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Block\BlockInterface;
6
use BitWasp\Bitcoin\Block\FilteredBlock;
7
use BitWasp\Bitcoin\Bloom\BloomFilter;
8
use BitWasp\Bitcoin\Chain\BlockLocator;
9
use BitWasp\Bitcoin\Networking\Message;
10
use BitWasp\Bitcoin\Networking\Messages\Version;
11
use BitWasp\Bitcoin\Networking\Messages\Ping;
12
use BitWasp\Bitcoin\Networking\NetworkMessage;
13
use BitWasp\Bitcoin\Networking\NetworkSerializable;
14
use BitWasp\Bitcoin\Networking\Structure\AlertDetail;
15
use BitWasp\Bitcoin\Networking\Structure\Inventory;
16
use BitWasp\Bitcoin\Networking\Structure\NetworkAddress;
17
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
18
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressTimestamp;
19
use BitWasp\Bitcoin\Crypto\EcAdapter\Signature\SignatureInterface;
20
use BitWasp\Bitcoin\Transaction\TransactionInterface;
21
use BitWasp\Buffertools\Buffer;
22
use BitWasp\Buffertools\BufferInterface;
23
use BitWasp\Buffertools\Parser;
24
use Evenement\EventEmitter;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\Deferred;
27
use React\Stream\Stream;
28
29
class Peer extends EventEmitter
30
{
31
    /**
32
     * @var string
33
     */
34
    private $buffer = '';
35
36
    /**
37
     * @var LoopInterface
38
     */
39
    private $loop;
40
41
    /**
42
     * @var \BitWasp\Bitcoin\Networking\Messages\Factory
43
     */
44
    private $msgs;
45
46
    /**
47
     * @var Stream
48
     */
49
    private $stream;
50
51
    /**
52
     * @var Version
53
     */
54
    private $localVersion;
55
56
    /**
57
     * @var Version
58
     */
59
    private $remoteVersion;
60
61
    /**
62
     * @var NetworkAddressInterface
63
     */
64
    private $peerAddress;
65
66
    /**
67
     * @var ConnectionParams
68
     */
69
    private $connectionParams;
70
71
    /**
72
     * @var bool
73
     */
74
    private $exchangedVersion = false;
75
76
    /**
77
     * @param \BitWasp\Bitcoin\Networking\Messages\Factory $msgs
78
     * @param LoopInterface $loop
79
     */
80
    public function __construct(\BitWasp\Bitcoin\Networking\Messages\Factory $msgs, LoopInterface $loop)
81
    {
82
        $this->msgs = $msgs;
83
        $this->loop = $loop;
84
    }
85
86
    /**
87
     * @return Version
88
     */
89
    public function getLocalVersion()
90
    {
91
        return $this->localVersion;
92
    }
93
94
    /**
95
     * @return Version
96
     */
97
    public function getRemoteVersion()
98
    {
99
        return $this->remoteVersion;
100
    }
101
102
    /**
103
     * Reliably returns the remote peers NetAddr when known through
104
     * the connection process. Often better than the data contained
105
     * in a Version message.
106
     *
107
     * @return NetworkAddressInterface
108
     */
109
    public function getRemoteAddress()
110
    {
111
        return $this->peerAddress;
112
    }
113
114
    /**
115
     * @return ConnectionParams
116
     */
117
    public function getConnectionParams()
118
    {
119
        return $this->connectionParams;
120
    }
121
122
    /**
123
     * @param NetworkSerializable $msg
124
     */
125
    public function send(NetworkSerializable $msg)
126
    {
127
        $net = $msg->getNetworkMessage();
128
        $this->stream->write($net->getBinary());
129
        $this->emit('send', [$net]);
130
    }
131
132
    /**
133
     * Handler for incoming data. Buffers possibly fragmented packets since they arrive sequentially.
134
     * Before finishing the version exchange, this will only emit Version and VerAck messages.
135
     */
136
    private function onData()
137
    {
138
        $tmp = $this->buffer;
139
        $parser = new Parser(new Buffer($tmp));
140
141
        try {
142
            while ($message = $this->msgs->parse($parser)) {
143
                $tmp = $parser->getBuffer()->slice($parser->getPosition())->getBinary();
144
                $this->emit('msg', [$this, $message]);
145
            }
146
        } catch (\Exception $e) {
147
            $this->buffer = $tmp;
148
            // Do nothing - it was probably a fragmented message
149
        }
150
    }
151
152
    /**
153
     * @param Stream $stream
154
     * @return $this
155
     */
156
    public function setupStream(Stream $stream)
157
    {
158
        $this->stream = $stream;
159
        $this->stream->on('data', function ($data) {
160
            $this->buffer .= $data;
161
            $this->emit('data');
162
        });
163
164
        $this->stream->on('close', function () {
165
            $this->close();
166
        });
167
168
        $this->on('data', function () {
169
            $this->onData();
170
        });
171
172
        $this->on('msg', function (Peer $peer, NetworkMessage $msg) {
173
            $this->emit($msg->getCommand(), [$peer, $msg->getPayload()]);
174
        });
175
176
        return $this;
177
    }
178
179
    /**
180
     * @param Stream $connection
181
     * @param ConnectionParams $params
182
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
183
     */
184
    public function inboundHandshake(Stream $connection, ConnectionParams $params)
0 ignored issues
show
Unused Code introduced by
The parameter $connection is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
185
    {
186
        $this->connectionParams = $params;
187
188
        $deferred = new Deferred();
189
        $this->on(Message::VERSION, function (Peer $peer, Version $version) use ($params) {
190
191
            $this->peerAddress = $version->getSenderAddress();
192
            $this->remoteVersion = $version;
193
            $this->localVersion = $localVersion = $params->produceVersion($this->msgs, $version->getSenderAddress());
194
            $this->send($localVersion);
195
        });
196
197
        $this->on(Message::VERACK, function () use ($deferred) {
198
            if (false === $this->exchangedVersion) {
199
                $this->exchangedVersion = true;
200
                $this->verack();
201
                $this->emit('ready', [$this]);
202
                $deferred->resolve($this);
203
            }
204
        });
205
206
207
        return $deferred->promise();
208
    }
209
210
    /**
211
     * @param NetworkAddressInterface $remotePeer
212
     * @param ConnectionParams $params
213
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
214
     */
215
    public function outboundHandshake(NetworkAddressInterface $remotePeer, ConnectionParams $params)
216
    {
217
        $deferred = new Deferred();
218
219
        $this->on(Message::VERSION, function (Peer $peer, Version $version) {
220
            $this->remoteVersion = $version;
221
            $this->verack();
222
        });
223
224
        $this->on(Message::VERACK, function () use ($deferred) {
225
            if (false === $this->exchangedVersion) {
226
                $this->exchangedVersion = true;
227
                $this->emit('ready', [$this]);
228
                $deferred->resolve($this);
229
            }
230
        });
231
232
        $this->peerAddress = $remotePeer;
233
        $this->localVersion = $version = $params->produceVersion($this->msgs, $remotePeer);
234
        $this->connectionParams = $params;
235
236
        $this->send($version);
237
238
        return $deferred->promise();
239
    }
240
241
    /**
242
     *
243
     */
244
    public function intentionalClose()
245
    {
246
        $this->emit('intentionaldisconnect', [$this]);
247
        $this->close();
248
    }
249
250
    /**
251
     *
252
     */
253
    public function close()
254
    {
255
        $this->emit('close', [$this]);
256
        $this->stream->end();
257
        $this->removeAllListeners();
258
    }
259
260
    /**
261
     * @param int $protocolVersion
262
     * @param int $services
263
     * @param int $timestamp
264
     * @param NetworkAddressInterface $remoteAddr
265
     * @param NetworkAddressInterface $localAddr
266
     * @param string $userAgent
267
     * @param int $blockHeight
268
     * @param bool $relayToUs
269
     */
270
    public function version(
271
        $protocolVersion,
272
        $services,
273
        $timestamp,
274
        NetworkAddressInterface $remoteAddr,
275
        NetworkAddressInterface $localAddr,
276
        $userAgent,
277
        $blockHeight,
278
        $relayToUs
279
    ) {
280
        $this->send($this->msgs->version(
281
            $protocolVersion,
282
            $services,
283
            $timestamp,
284
            $remoteAddr,
285
            $localAddr,
286
            new Buffer($userAgent),
287
            $blockHeight,
288
            $relayToUs
289
        ));
290
    }
291
292
    /**
293
     *
294
     */
295
    public function verack()
296
    {
297
        $this->send($this->msgs->verack());
298
    }
299
300
    /**
301
     *
302
     */
303
    public function sendheaders()
304
    {
305
        $this->send($this->msgs->sendheaders());
306
    }
307
308
    /**
309
     * @param Inventory[] $vInv
310
     */
311
    public function inv(array $vInv)
312
    {
313
        $this->send($this->msgs->inv($vInv));
314
    }
315
316
    /**
317
     * @param Inventory[] $vInv
318
     */
319
    public function getdata(array $vInv)
320
    {
321
        $this->send($this->msgs->getdata($vInv));
322
    }
323
324
    /**
325
     * @param array $vInv
326
     */
327
    public function notfound(array $vInv)
328
    {
329
        $this->send($this->msgs->notfound($vInv));
330
    }
331
332
    /**
333
     * @param NetworkAddressTimestamp[] $vNetAddr
334
     */
335
    public function addr(array $vNetAddr)
336
    {
337
        $this->send($this->msgs->addr($vNetAddr));
338
    }
339
340
    /**
341
     *
342
     */
343
    public function getaddr()
344
    {
345
        $this->send($this->msgs->getaddr());
346
    }
347
348
    /**
349
     *
350
     */
351
    public function ping()
352
    {
353
        $this->send($this->msgs->ping());
354
    }
355
356
    /**
357
     * @param Ping $ping
358
     */
359
    public function pong(Ping $ping)
360
    {
361
        $this->send($this->msgs->pong($ping));
362
    }
363
364
    /**
365
     * @param TransactionInterface $tx
366
     */
367
    public function tx(TransactionInterface $tx)
368
    {
369
        $this->send($this->msgs->tx($tx));
370
    }
371
372
    /**
373
     * @param BlockLocator $locator
374
     */
375
    public function getblocks(BlockLocator $locator)
376
    {
377
        $this->send($this->msgs->getblocks(
378
            $this->localVersion->getVersion(),
379
            $locator
380
        ));
381
    }
382
383
    /**
384
     * @param BlockLocator $locator
385
     */
386
    public function getheaders(BlockLocator $locator)
387
    {
388
        $this->send($this->msgs->getheaders(
389
            $this->localVersion->getVersion(),
390
            $locator
391
        ));
392
    }
393
394
    /**
395
     * @param BlockInterface $block
396
     */
397
    public function block(BlockInterface $block)
398
    {
399
        $this->send($this->msgs->block($block));
400
    }
401
402
    /**
403
     * @param array $vHeaders
404
     */
405
    public function headers(array $vHeaders)
406
    {
407
        $this->send($this->msgs->headers($vHeaders));
408
    }
409
410
    /**
411
     * @param AlertDetail $detail
412
     * @param SignatureInterface $signature
413
     */
414
    public function alert(AlertDetail $detail, SignatureInterface $signature)
415
    {
416
        $this->send($this->msgs->alert($detail, $signature));
417
    }
418
419
    /**
420
     * @param int $feeRate
421
     */
422
    public function feefilter($feeRate)
423
    {
424
        $this->send($this->msgs->feefilter($feeRate));
425
    }
426
427
    /**
428
     * @param BufferInterface $data
429
     */
430
    public function filteradd(BufferInterface $data)
431
    {
432
        $this->send($this->msgs->filteradd($data));
433
    }
434
435
    /**
436
     * @param BloomFilter $filter
437
     */
438
    public function filterload(BloomFilter $filter)
439
    {
440
        $this->send($this->msgs->filterload($filter));
441
    }
442
443
    /**
444
     *
445
     */
446
    public function filterclear()
447
    {
448
        $this->send($this->msgs->filterclear());
449
    }
450
451
    /**
452
     * @param FilteredBlock $filtered
453
     */
454
    public function merkleblock(FilteredBlock $filtered)
455
    {
456
        $this->send($this->msgs->merkleblock($filtered));
457
    }
458
459
    /**
460
     *
461
     */
462
    public function mempool()
463
    {
464
        $this->send($this->msgs->mempool());
465
    }
466
467
    /**
468
     * Issue a Reject message, with a required $msg, $code, and $reason
469
     *
470
     * @param BufferInterface $msg
471
     * @param int $code
472
     * @param BufferInterface $reason
473
     * @param BufferInterface $data
474
     */
475
    public function reject(BufferInterface $msg, $code, BufferInterface $reason, BufferInterface $data = null)
476
    {
477
        $this->send($this->msgs->reject($msg, $code, $reason, $data));
478
    }
479
}
480