Connection   A
last analyzed

Complexity

Total Complexity 25

Size/Duplication

Total Lines 165
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 25
c 1
b 0
f 0
lcom 1
cbo 11
dl 0
loc 165
ccs 74
cts 74
cp 1
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A close() 0 4 1
A sendData() 0 4 1
A sendNotify() 0 4 1
A sendRequest() 0 7 1
A onData() 0 19 4
C onMessage() 0 56 15
A request() 0 5 1
1
<?php
2
3
namespace BitWasp\Stratum;
4
5
use BitWasp\Stratum\Api\ElectrumClient;
6
use BitWasp\Stratum\Api\MiningClient;
7
use BitWasp\Stratum\Notification\AddressNotification;
8
use BitWasp\Stratum\Notification\HeadersNotification;
9
use BitWasp\Stratum\Notification\MiningNotification;
10
use BitWasp\Stratum\Notification\NotificationInterface;
11
use BitWasp\Stratum\Notification\NumBlocksNotification;
12
use BitWasp\Stratum\Notification\SetDifficultyNotification;
13
use BitWasp\Stratum\Request\Request;
14
use BitWasp\Stratum\Request\RequestFactory;
15
use Evenement\EventEmitter;
16
use React\Promise\Deferred;
17
use React\Stream\Stream;
18
19
class Connection extends EventEmitter
20
{
21
    /**
22
     * @var Stream
23
     */
24
    private $stream;
25
    
26
    /**
27
     * @var RequestFactory
28
     */
29
    private $factory;
30
    
31
    /**
32
     * @var Deferred[]
33
     */
34
    private $deferred = [];
35
36
    /**
37
     * @var string
38
     */
39
    private $streamBuffer = '';
40
41
    /**
42
     * Connection constructor.
43
     * @param Stream $stream
44
     * @param RequestFactory $requestFactory
45
     */
46 20
    public function __construct(Stream $stream, RequestFactory $requestFactory)
47
    {
48 20
        $this->factory = $requestFactory;
49 20
        $this->stream = $stream;
50 20
        $this->stream->on('data', [$this, 'onData']);
51 20
    }
52
53 7
    public function close()
54
    {
55 7
        return $this->stream->close();
56
    }
57
58
    /**
59
     * @param string $data
60
     * @return bool|void
61
     */
62 4
    public function sendData($data)
63
    {
64 4
        return $this->stream->write($data);
65
    }
66
67
    /**
68
     * @param NotificationInterface $notification
69
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
70
     */
71 1
    public function sendNotify(NotificationInterface $notification)
72
    {
73 1
        return $this->sendData($notification->toRequest()->write());
74
    }
75
    
76
    /**
77
     * @param Request $request
78
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
79
     */
80 2
    public function sendRequest(Request $request)
81
    {
82 2
        $result = new Deferred();
83 2
        $this->deferred[$request->getId()] = $result;
84 2
        $this->sendData($request->write());
85 2
        return $result->promise();
86
    }
87
88
    /**
89
     * @param string $method
90
     * @param array $params
91
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
92
     */
93 2
    public function request($method, array $params = [])
94
    {
95 2
        $request = $this->factory->create($method, $params);
96 2
        return $this->sendRequest($request);
97
    }
98
    
99
    /**
100
     * @param string $data
101
     */
102 8
    public function onData($data)
103
    {
104 8
        $buffer = $this->streamBuffer . $data;
105
106 8
        while (($nextPos = strpos($buffer, "\n"))) {
107 8
            $msg = substr($buffer, 0, $nextPos);
108 8
            $buffer = substr($buffer, $nextPos);
109 8
            if (substr($buffer, -1) == "\n") {
110 8
                $buffer = substr($buffer, 1);
111 8
            }
112 8
            $this->onMessage($msg);
113 8
        }
114
115 8
        if (!$buffer) {
116 8
            $this->streamBuffer = '';
117 8
        } else {
118 1
            $this->streamBuffer = $buffer;
119
        }
120 8
    }
121
122
    /**
123
     * @param string $data
124
     * @throws \BitWasp\Stratum\Exception\ApiError
125
     * @throws \Exception
126
     */
127 15
    public function onMessage($data)
128
    {
129 15
        $response = $this->factory->response($data);
130 15
        if (isset($this->deferred[$response->getId()])) {
131 1
            $this->deferred[$response->getId()]->resolve($response);
132 1
        } else {
133 14
            $this->emit('message', [$response]);
134
135 14
            if ($response instanceof Request) {
136 12
                $params = $response->getParams();
137
138 12
                switch ($response->getMethod()) {
139 12
                    case ElectrumClient::HEADERS_SUBSCRIBE:
140 3
                        if (!isset($params[0])) {
141 1
                            throw new \RuntimeException('Headers notification missing body');
142
                        }
143
                        
144 2
                        $header = $params[0];
145 2
                        if (count($header) !== 8) {
146 1
                            throw new \RuntimeException('Headers notification missing parameter');
147
                        }
148
149 1
                        $this->emit(ElectrumClient::HEADERS_SUBSCRIBE, [new HeadersNotification($header[0], $header[1], $header[2], $header[3], $header[4], $header[5], $header[6], $header[7])]);
150 1
                        break;
151 9
                    case ElectrumClient::ADDRESS_SUBSCRIBE:
152 3
                        if (!isset($params[0]) || !isset($params[1])) {
153 2
                            throw new \RuntimeException('Address notification missing address/txid');
154
                        }
155
156 1
                        $this->emit(ElectrumClient::ADDRESS_SUBSCRIBE, [new AddressNotification($params[0], $params[1])]);
157 1
                        break;
158 6
                    case ElectrumClient::NUMBLOCKS_SUBSCRIBE:
159 2
                        if (!isset($params[0])) {
160 1
                            throw new \RuntimeException('Missing notification parameter: height');
161
                        }
162
163 1
                        $this->emit(ElectrumClient::NUMBLOCKS_SUBSCRIBE, [new NumBlocksNotification($params[0])]);
164 1
                        break;
165 4
                    case MiningClient::SET_DIFFICULTY:
166 2
                        if (!isset($params[0])) {
167 1
                            throw new \RuntimeException('Missing mining difficulty notification parameter');
168
                        }
169
170 1
                        $this->emit(MiningClient::SET_DIFFICULTY, [new SetDifficultyNotification($params[0])]);
171 1
                        break;
172 2
                    case MiningClient::NOTIFY:
173 2
                        if (count($params) !== 9) {
174 1
                            throw new \RuntimeException('Missing mining notification parameter');
175
                        }
176
177 1
                        $this->emit(MiningClient::NOTIFY, [new MiningNotification($params[0], $params[1], $params[2], $params[3], $params[4], $params[5], $params[6], $params[7], $params[8])]);
178 1
                        break;
179 5
                }
180 5
            }
181
        }
182 8
    }
183
}
184