Completed
Push — master ( 12ca42...2e55dd )
by Mariano
9s
created

Node::isConnected()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
namespace Disque\Connection\Node;
3
4
use Disque\Command\Auth;
5
use Disque\Command\CommandInterface;
6
use Disque\Command\Hello;
7
use Disque\Command\Response\HelloResponse;
8
use Disque\Connection\ConnectionException;
9
use Disque\Connection\ConnectionInterface;
10
use Disque\Connection\Credentials;
11
use Disque\Connection\AuthenticationException;
12
use Disque\Connection\Response\ResponseException;
13
14
/**
15
 * Describe one Disque node, its properties and the connection to it
16
 */
17
class Node
18
{
19
    /**
20
     * The response Disque returns if password authentication succeeded
21
     */
22
    const AUTH_SUCCESS_MESSAGE = 'OK';
23
24
    /**
25
     * The beginning of a response Disque returns if authentication required
26
     */
27
    const AUTH_REQUIRED_MESSAGE = 'NOAUTH';
28
29
    /**
30
     * Node prefix boundaries
31
     */
32
    const PREFIX_START = 0;
33
    const PREFIX_LENGTH = 8;
34
35
    /**
36
     * Disque-assigned node priorities
37
     * @see $priority
38
     */
39
    const PRIORITY_OK = 1;
40
    const PRIORITY_POSSIBLE_FAILURE = 10;
41
    const PRIORITY_FAILURE = 100;
42
43
    /**
44
     * A fallback node priority if the HELLO response doesn't contain a priority
45
     * This should not happen, but let's be sure.
46
     */
47
    const PRIORITY_FALLBACK = 2;
48
49
    /**
50
     * @var Credentials Credentials of this node - host, port, password
51
     */
52
    private $credentials;
53
54
    /**
55
     * @var ConnectionInterface The connection to this node
56
     */
57
    private $connection;
58
59
    /**
60
     * @var string Node ID
61
     */
62
    private $id;
63
64
    /**
65
     * @var string Node prefix, or the first 8 bytes of the ID
66
     */
67
    private $prefix;
68
69
    /**
70
     * @var int Node priority set by Disque, 1-100, lower is better
71
     *
72
     * This priority is set by Disque, lower number is better. As of 09/2015
73
     * there are three possible values:
74
     *
75
     * 1 - Node is working correctly
76
     * 10 - Possible failure (PFAIL) - Node may be failing
77
     * 100 - Failure (FAIL) - The majority of nodes agree that the node is failing
78
     *
79
     * For priority values,
80
     * @see https://github.com/antirez/disque/blob/master/src/cluster.c, helloCommand()
81
     *
82
     * For the difference between PFAIL and FAIL states,
83
     * @see http://redis.io/topics/cluster-spec#failure-detection
84
     * @see also https://github.com/antirez/disque/blob/master/src/cluster.c
85
     * Look for CLUSTER_NODE_PFAIL and CLUSTER_NODE_FAIL
86
     *
87
     */
88
    private $priority = 1;
89
90
    /**
91
     * @var array The result of the HELLO command
92
     *
93
     * @see Disque\Command\Response\HelloResponse
94
     */
95
    private $hello;
96
97
    /**
98
     * @var int The number of jobs from this node since the last counter reset
99
     *          This counter can be reset, eg. upon a node switch
100
     */
101
    private $jobCount = 0;
102
103
    /**
104
     * @var int The number of jobs from this node during its lifetime
105
     */
106
    private $totalJobCount = 0;
107
108 29
    public function __construct(Credentials $credentials, ConnectionInterface $connection)
109
    {
110 29
        $this->credentials = $credentials;
111 29
        $this->connection = $connection;
112 29
    }
113
114
    /**
115
     * Get the node credentials
116
     *
117
     * @return Credentials
118
     */
119 4
    public function getCredentials()
120
    {
121 4
        return $this->credentials;
122
    }
123
124
    /**
125
     * Get the node connection
126
     *
127
     * @return ConnectionInterface
128
     */
129 4
    public function getConnection()
130
    {
131 4
        return $this->connection;
132
    }
133
134
    /**
135
     * Get the node ID
136
     *
137
     * @return string
138
     */
139 13
    public function getId()
140
    {
141 13
        return $this->id;
142
    }
143
144
    /**
145
     * Get the node prefix - the first 8 bytes from the ID
146
     *
147
     * @return string
148
     */
149 2
    public function getPrefix()
150
    {
151 2
        return $this->prefix;
152
    }
153
154
    /**
155
     * Get the node priority as set by the cluster. 1-100, lower is better.
156
     *
157
     * @return int
158
     */
159 8
    public function getPriority()
160
    {
161 8
        return $this->priority;
162
    }
163
164
    /**
165
     * @param int $priority Disque priority as revealed by a HELLO
166
     */
167 11
    public function setPriority($priority)
168
    {
169 11
        $this->priority = (int) $priority;
170 11
    }
171
172
    /**
173
     * Get the node's last HELLO response
174
     *
175
     * @return array
176
     */
177 13
    public function getHello()
178
    {
179 13
        return $this->hello;
180
    }
181
182
    /**
183
     * Get the node job count since the last reset (usually a node switch)
184
     *
185
     * @return int
186
     */
187 6
    public function getJobCount()
188
    {
189 6
        return $this->jobCount;
190
    }
191
192
    /**
193
     * Increase the node job counts by the given number
194
     *
195
     * @param int $jobsAdded
196
     */
197 7
    public function addJobCount($jobsAdded)
198
    {
199 7
        $this->jobCount += $jobsAdded;
200 7
        $this->totalJobCount += $jobsAdded;
201
202 7
    }
203
204
    /**
205
     *  Reset the node job count
206
     */
207 4
    public function resetJobCount()
208
    {
209 4
        $this->jobCount = 0;
210 4
    }
211
212
    /**
213
     * Get the total job count since the node instantiation
214
     *
215
     * @return int
216
     */
217 4
    public function getTotalJobCount()
218
    {
219 4
        return $this->totalJobCount;
220
    }
221
222
    /**
223
     * Connect to the node and return the HELLO response
224
     *
225
     * This method is idempotent and can be called multiple times
226
     *
227
     * @return array The HELLO response
228
     * @throws ConnectionException
229
     * @throws AuthenticationException
230
     */
231 21
    public function connect()
232
    {
233 21
        if ($this->connection->isConnected() && !empty($this->hello)) {
234
            return $this->hello;
235
        }
236
237 20
        $this->connectToTheNode();
238 18
        $this->authenticateWithPassword();
239
240
        try {
241 15
            $this->sayHello();
242 15
        } catch (ResponseException $e) {
243
            /**
244
             * If the node requires a password but we didn't supply any,
245
             * Disque returns a message "NOAUTH Authentication required"
246
             *
247
             * HELLO is the first place we would get this error.
248
             *
249
             * @see https://github.com/antirez/disque/blob/master/src/server.c
250
             * Look for "noautherr"
251
             */
252 1
            $message = $e->getMessage();
253 1
            if (stripos($message, self::AUTH_REQUIRED_MESSAGE) === 0) {
254 1
                throw new AuthenticationException($message);
255
            }
256
        }
257
258 13
        return $this->hello;
259
    }
260
261
    /**
262
     * Check if this object holds a working connection to Disque node
263
     *
264
     * @return bool
265
     */
266 11
    public function isConnected()
267
    {
268 11
        return $this->connection->isConnected();
269
    }
270
271
    /**
272
     * Execute a command on this Disque node
273
     *
274
     * @param CommandInterface $command
275
     * @return mixed Response
276
     *
277
     * @throws ConnectionException
278
     */
279 5
    public function execute(CommandInterface $command)
280
    {
281 5
        return $this->connection->execute($command);
282
    }
283
284
    /**
285
     * Say a new HELLO to the node and parse the response
286
     *
287
     * @return array The HELLO response
288
     *
289
     * @throws ConnectionException
290
     */
291 17
    public function sayHello()
292
    {
293 17
        $helloCommand = new Hello();
294 17
        $helloResponse = $this->connection->execute($helloCommand);
295 14
        $this->hello = (array) $helloCommand->parse($helloResponse);
296
297 14
        $this->id = $this->hello[HelloResponse::NODE_ID];
298 14
        $this->createPrefix($this->id);
299
300 14
        $this->priority = $this->readPriorityFromHello($this->hello, $this->id);
301
302 14
        return $this->hello;
303
    }
304
305
    /**
306
     * Connect to the node
307
     *
308
     * @throws ConnectionException
309
     */
310 20
    private function connectToTheNode()
311
    {
312 20
        $this->connection->connect(
313 20
            $this->credentials->getConnectionTimeout(),
314 20
            $this->credentials->getResponseTimeout()
315 20
        );
316 18
    }
317
318
    /**
319
     * Authenticate with the node with a password, if set
320
     *
321
     * @throws AuthenticationException
322
     */
323 18
    private function authenticateWithPassword()
324
    {
325 18
        if ($this->credentials->havePassword()) {
326 5
            $authCommand = new Auth();
327 5
            $authCommand->setArguments([$this->credentials->getPassword()]);
328 5
            $authResponse = $this->connection->execute($authCommand);
329 4
            $response = $authCommand->parse($authResponse);
330 4
            if ($response !== self::AUTH_SUCCESS_MESSAGE) {
331 2
                throw new AuthenticationException();
332
            }
333 2
        }
334 15
    }
335
336
    /**
337
     * Create a node prefix from the node ID
338
     *
339
     * @param string $id
340
     */
341 14
    private function createPrefix($id)
342
    {
343 14
        $this->prefix = substr($id, self::PREFIX_START, self::PREFIX_LENGTH);
344 14
    }
345
346
    /**
347
     * Read out the node's own priority from a HELLO response
348
     *
349
     * @param array  $hello The HELLO response
350
     * @param string $id    Node ID
351
     *
352
     * @return int Node priority
353
     */
354 14
    private function readPriorityFromHello($hello, $id)
355
    {
356 14
        foreach ($hello[HelloResponse::NODES] as $node) {
357 14
            if ($node[HelloResponse::NODE_ID] === $id) {
358 14
                return $node[HelloResponse::NODE_PRIORITY];
359
            }
360 1
        }
361
362
        // Node not found in the HELLO? This should not happen.
363
        // Return a fallback value
364
        return self::PRIORITY_FALLBACK;
365
    }
366
}
367