Completed
Push — master ( 4a282c...f5ece7 )
by Mariano
03:58 queued 01:18
created

Node::readPriorityFromHello()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 12
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3.3332

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 12
ccs 4
cts 6
cp 0.6667
rs 9.4285
cc 3
eloc 5
nc 3
nop 2
crap 3.3332
1
<?php
2
namespace Disque\Connection\Node;
3
4
use Disque\Command\Auth;
5
use Disque\Command\Hello;
6
use Disque\Command\Response\HelloResponse;
7
use Disque\Connection\ConnectionException;
8
use Disque\Connection\ConnectionInterface;
9
use Disque\Connection\Credentials;
10
use Disque\Connection\AuthenticationException;
11
use Disque\Connection\Response\ResponseException;
12
13
/**
14
 * Describe one Disque node, its properties and the connection to it
15
 */
16
class Node
17
{
18
    /**
19
     * The response Disque returns if password authentication succeeded
20
     */
21
    const AUTH_SUCCESS_MESSAGE = 'OK';
22
23
    /**
24
     * The beginning of a response Disque returns if authentication required
25
     */
26
    const AUTH_REQUIRED_MESSAGE = 'NOAUTH';
27
28
    /**
29
     * Node prefix boundaries
30
     */
31
    const PREFIX_START = 0;
32
    const PREFIX_LENGTH = 8;
33
34
    /**
35
     * Disque-assigned node priorities
36
     * @see $priority
37
     */
38
    const PRIORITY_OK = 1;
39
    const PRIORITY_POSSIBLE_FAILURE = 10;
40
    const PRIORITY_FAILURE = 100;
41
42
    /**
43
     * A fallback node priority if the HELLO response doesn't contain a priority
44
     * This should not happen, but let's be sure.
45
     */
46
    const PRIORITY_FALLBACK = 2;
47
48
    /**
49
     * @var Credentials Credentials of this node - host, port, password
50
     */
51
    private $credentials;
52
53
    /**
54
     * @var ConnectionInterface The connection to this node
55
     */
56
    private $connection;
57
58
    /**
59
     * @var string Node ID
60
     */
61
    private $id;
62
63
    /**
64
     * @var string Node prefix, or the first 8 bytes of the ID
65
     */
66
    private $prefix;
67
68
    /**
69
     * @var int Node priority set by Disque, 1-100, lower is better
70
     *
71
     * This priority is set by Disque, lower number is better. As of 09/2015
72
     * there are three possible values:
73
     *
74
     * 1 - Node is working correctly
75
     * 10 - Possible failure (PFAIL) - Node may be failing
76
     * 100 - Failure (FAIL) - The majority of nodes agree that the node is failing
77
     *
78
     * For priority values,
79
     * @see https://github.com/antirez/disque/blob/master/src/cluster.c, helloCommand()
80
     *
81
     * For the difference between PFAIL and FAIL states,
82
     * @see http://redis.io/topics/cluster-spec#failure-detection
83
     * @see also https://github.com/antirez/disque/blob/master/src/cluster.c
84
     * Look for CLUSTER_NODE_PFAIL and CLUSTER_NODE_FAIL
85
     *
86
     */
87
    private $priority = 1;
88
89
    /**
90
     * @var array The result of the HELLO command
91
     *
92
     * @see Disque\Command\Response\HelloResponse
93
     */
94
    private $hello;
95
96
    /**
97
     * @var int The number of jobs from this node since the last counter reset
98
     *          This counter can be reset, eg. upon a node switch
99
     */
100
    private $jobCount = 0;
101
102
    /**
103
     * @var int The number of jobs from this node during its lifetime
104
     */
105
    private $totalJobCount = 0;
106
107 27
    public function __construct(Credentials $credentials, ConnectionInterface $connection)
108
    {
109 27
        $this->credentials = $credentials;
110 27
        $this->connection = $connection;
111 27
    }
112
113
    /**
114
     * Get the node credentials
115
     *
116
     * @return Credentials
117
     */
118 4
    public function getCredentials()
119
    {
120 4
        return $this->credentials;
121
    }
122
123
    /**
124
     * Get the node connection
125
     *
126
     * @return ConnectionInterface
127
     */
128 10
    public function getConnection()
129
    {
130 10
        return $this->connection;
131
    }
132
133
    /**
134
     * Get the node ID
135
     *
136
     * @return string
137
     */
138 11
    public function getId()
139
    {
140 11
        return $this->id;
141
    }
142
143
    /**
144
     * Get the node prefix - the first 8 bytes from the ID
145
     *
146
     * @return string
147
     */
148 2
    public function getPrefix()
149
    {
150 2
        return $this->prefix;
151
    }
152
153
    /**
154
     * Get the node priority as set by the cluster. 1-100, lower is better.
155
     *
156
     * @return int
157
     */
158 8
    public function getPriority()
159
    {
160 8
        return $this->priority;
161
    }
162
163
    /**
164
     * @param int $priority Disque priority as revealed by a HELLO
165
     */
166 9
    public function setPriority($priority)
167
    {
168 9
        $this->priority = (int) $priority;
169 9
    }
170
171
    /**
172
     * Get the node's last HELLO response
173
     *
174
     * @return array
175
     */
176 11
    public function getHello()
177
    {
178 11
        return $this->hello;
179
    }
180
181
    /**
182
     * Get the node job count since the last reset (usually a node switch)
183
     *
184
     * @return int
185
     */
186 6
    public function getJobCount()
187
    {
188 6
        return $this->jobCount;
189
    }
190
191
    /**
192
     * Increase the node job counts by the given number
193
     *
194
     * @param int $jobsAdded
195
     */
196 6
    public function addJobCount($jobsAdded)
197
    {
198 6
        $this->jobCount += $jobsAdded;
199 6
        $this->totalJobCount += $jobsAdded;
200
201 6
    }
202
203
    /**
204
     *  Reset the node job count
205
     */
206 2
    public function resetJobCount()
207
    {
208 2
        $this->jobCount = 0;
209 2
    }
210
211
    /**
212
     * Get the total job count since the node instantiation
213
     *
214
     * @return int
215
     */
216 3
    public function getTotalJobCount()
217
    {
218 3
        return $this->totalJobCount;
219
    }
220
221
    /**
222
     * Connect to the node and return the HELLO response
223
     *
224
     * This method is idempotent and can be called multiple times
225
     *
226
     * @return array The HELLO response
227
     * @throws ConnectionException
228
     * @throws AuthenticationException
229
     */
230 19
    public function connect()
231
    {
232 19
        if ($this->connection->isConnected() && !empty($this->hello)) {
233
            return $this->hello;
234
        }
235
236 18
        $this->connectToTheNode();
237 16
        $this->authenticateWithPassword();
238
239
        try {
240 13
            $this->sayHello();
241 13
        } catch (ResponseException $e) {
242
            /**
243
             * If the node requires a password but we didn't supply any,
244
             * Disque returns a message "NOAUTH Authentication required"
245
             *
246
             * HELLO is the first place we would get this error.
247
             *
248
             * @see https://github.com/antirez/disque/blob/master/src/server.c
249
             * Look for "noautherr"
250
             */
251 1
            $message = $e->getMessage();
252 1
            if (stripos($message, self::AUTH_REQUIRED_MESSAGE) === 0) {
253 1
                throw new AuthenticationException($message);
254
            }
255
        }
256
257 11
        return $this->hello;
258
    }
259
260
    /**
261
     * Say a new HELLO to the node and parse the response
262
     *
263
     * @return array The HELLO response
264
     *
265
     * @throws ConnectionException
266
     */
267 15
    public function sayHello()
268
    {
269 15
        $helloCommand = new Hello();
270 15
        $helloResponse = $this->connection->execute($helloCommand);
271 12
        $this->hello = (array) $helloCommand->parse($helloResponse);
272
273 12
        $this->id = $this->hello[HelloResponse::NODE_ID];
274 12
        $this->createPrefix($this->id);
275
276 12
        $this->priority = $this->readPriorityFromHello($this->hello, $this->id);
277
278 12
        return $this->hello;
279
    }
280
281
    /**
282
     * Connect to the node
283
     *
284
     * @throws ConnectionException
285
     */
286 18
    private function connectToTheNode()
287
    {
288 18
        $this->connection->connect(
289 18
            $this->credentials->getConnectionTimeout(),
290 18
            $this->credentials->getResponseTimeout()
291 18
        );
292 16
    }
293
294
    /**
295
     * Authenticate with the node with a password, if set
296
     *
297
     * @throws AuthenticationException
298
     */
299 16
    private function authenticateWithPassword()
300
    {
301 16
        if ($this->credentials->havePassword()) {
302 5
            $authCommand = new Auth();
303 5
            $authCommand->setArguments([$this->credentials->getPassword()]);
304 5
            $authResponse = $this->connection->execute($authCommand);
305 4
            $response = $authCommand->parse($authResponse);
306 4
            if ($response !== self::AUTH_SUCCESS_MESSAGE) {
307 2
                throw new AuthenticationException();
308
            }
309 2
        }
310 13
    }
311
312
    /**
313
     * Create a node prefix from the node ID
314
     *
315
     * @param string $id
316
     */
317 12
    private function createPrefix($id)
318
    {
319 12
        $this->prefix = substr($id, self::PREFIX_START, self::PREFIX_LENGTH);
320 12
    }
321
322
    /**
323
     * Read out the node's own priority from a HELLO response
324
     *
325
     * @param array  $hello The HELLO response
326
     * @param string $id    Node ID
327
     *
328
     * @return int Node priority
329
     */
330 12
    private function readPriorityFromHello($hello, $id)
331
    {
332 12
        foreach ($hello[HelloResponse::NODES] as $node) {
333 12
            if ($node[HelloResponse::NODE_ID] === $id) {
334 12
                return $node[HelloResponse::NODE_PRIORITY];
335
            }
336
        }
337
338
        // Node not found in the HELLO? This should not happen.
339
        // Return a fallback value
340
        return self::PRIORITY_FALLBACK;
341
    }
342
}