Node::resetJobCount()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
namespace Disque\Connection\Node;
3
4
use Disque\Command\Auth;
5
use Disque\Command\CommandInterface;
6
use Disque\Command\Hello;
7
use Disque\Command\Response\HelloResponse;
8
use Disque\Connection\ConnectionException;
9
use Disque\Connection\ConnectionInterface;
10
use Disque\Connection\Credentials;
11
use Disque\Connection\AuthenticationException;
12
use Disque\Connection\Response\ResponseException;
13
14
/**
15
 * Describe one Disque node, its properties and the connection to it
16
 */
17
class Node
18
{
19
    /**
20
     * The response Disque returns if password authentication succeeded
21
     */
22
    const AUTH_SUCCESS_MESSAGE = 'OK';
23
24
    /**
25
     * The beginning of a response Disque returns if authentication required
26
     */
27
    const AUTH_REQUIRED_MESSAGE = 'NOAUTH';
28
29
    /**
30
     * Node prefix boundaries
31
     */
32
    const PREFIX_START = 0;
33
    const PREFIX_LENGTH = 8;
34
35
    /**
36
     * Disque-assigned node priorities
37
     * @see $priority
38
     */
39
    const PRIORITY_OK = 1;
40
    const PRIORITY_POSSIBLE_FAILURE = 10;
41
    const PRIORITY_FAILURE = 100;
42
43
    /**
44
     * A fallback node priority if the HELLO response doesn't contain a priority
45
     * This should not happen, but let's be sure.
46
     */
47
    const PRIORITY_FALLBACK = 2;
48
49
    /**
50
     * @var Credentials Credentials of this node - host, port, password
51
     */
52
    private $credentials;
53
54
    /**
55
     * @var ConnectionInterface The connection to this node
56
     */
57
    private $connection;
58
59
    /**
60
     * @var string Node ID
61
     */
62
    private $id;
63
64
    /**
65
     * @var string Node prefix, or the first 8 bytes of the ID
66
     */
67
    private $prefix;
68
69
    /**
70
     * @var int Node priority set by Disque, 1-100, lower is better
71
     *
72
     * This priority is set by Disque, lower number is better. As of 09/2015
73
     * there are three possible values:
74
     *
75
     * 1 - Node is working correctly
76
     * 10 - Possible failure (PFAIL) - Node may be failing
77
     * 100 - Failure (FAIL) - The majority of nodes agree that the node is failing
78
     *
79
     * For priority values,
80
     * @see https://github.com/antirez/disque/blob/master/src/cluster.c, helloCommand()
81
     *
82
     * For the difference between PFAIL and FAIL states,
83
     * @see http://redis.io/topics/cluster-spec#failure-detection
84
     * @see also https://github.com/antirez/disque/blob/master/src/cluster.c
85
     * Look for CLUSTER_NODE_PFAIL and CLUSTER_NODE_FAIL
86
     *
87
     */
88
    private $priority = 1;
89
90
    /**
91
     * @var array The result of the HELLO command
92
     *
93
     * @see Disque\Command\Response\HelloResponse
94
     */
95
    private $hello;
96
97
    /**
98
     * @var int The number of jobs from this node since the last counter reset
99
     *          This counter can be reset, eg. upon a node switch
100
     */
101
    private $jobCount = 0;
102
103
    /**
104
     * @var int The number of jobs from this node during its lifetime
105
     */
106
    private $totalJobCount = 0;
107
108
    public function __construct(Credentials $credentials, ConnectionInterface $connection)
109
    {
110
        $this->credentials = $credentials;
111
        $this->connection = $connection;
112
    }
113
114
    /**
115
     * Get the node credentials
116
     *
117
     * @return Credentials
118
     */
119
    public function getCredentials()
120
    {
121
        return $this->credentials;
122
    }
123
124
    /**
125
     * Get the node connection
126
     *
127
     * @return ConnectionInterface
128
     */
129
    public function getConnection()
130
    {
131
        return $this->connection;
132
    }
133
134
    /**
135
     * Get the node ID
136
     *
137
     * @return string
138
     */
139
    public function getId()
140
    {
141
        return $this->id;
142
    }
143
144
    /**
145
     * Get the node prefix - the first 8 bytes from the ID
146
     *
147
     * @return string
148
     */
149
    public function getPrefix()
150
    {
151
        return $this->prefix;
152
    }
153
154
    /**
155
     * Get the node priority as set by the cluster. 1-100, lower is better.
156
     *
157
     * @return int
158
     */
159
    public function getPriority()
160
    {
161
        return $this->priority;
162
    }
163
164
    /**
165
     * @param int $priority Disque priority as revealed by a HELLO
166
     */
167
    public function setPriority($priority)
168
    {
169
        $this->priority = (int) $priority;
170
    }
171
172
    /**
173
     * Get the node's last HELLO response
174
     *
175
     * @return array
176
     */
177
    public function getHello()
178
    {
179
        return $this->hello;
180
    }
181
182
    /**
183
     * Get the node job count since the last reset (usually a node switch)
184
     *
185
     * @return int
186
     */
187
    public function getJobCount()
188
    {
189
        return $this->jobCount;
190
    }
191
192
    /**
193
     * Increase the node job counts by the given number
194
     *
195
     * @param int $jobsAdded
196
     */
197
    public function addJobCount($jobsAdded)
198
    {
199
        $this->jobCount += $jobsAdded;
200
        $this->totalJobCount += $jobsAdded;
201
202
    }
203
204
    /**
205
     *  Reset the node job count
206
     */
207
    public function resetJobCount()
208
    {
209
        $this->jobCount = 0;
210
    }
211
212
    /**
213
     * Get the total job count since the node instantiation
214
     *
215
     * @return int
216
     */
217
    public function getTotalJobCount()
218
    {
219
        return $this->totalJobCount;
220
    }
221
222
    /**
223
     * Connect to the node and return the HELLO response
224
     *
225
     * This method is idempotent and can be called multiple times
226
     *
227
     * @return array The HELLO response
228
     * @throws ConnectionException
229
     * @throws AuthenticationException
230
     */
231
    public function connect()
232
    {
233
        if ($this->connection->isConnected() && !empty($this->hello)) {
234
            return $this->hello;
235
        }
236
237
        $this->connectToTheNode();
238
        $this->authenticateWithPassword();
239
240
        try {
241
            $this->sayHello();
242
        } catch (ResponseException $e) {
243
            /**
244
             * If the node requires a password but we didn't supply any,
245
             * Disque returns a message "NOAUTH Authentication required"
246
             *
247
             * HELLO is the first place we would get this error.
248
             *
249
             * @see https://github.com/antirez/disque/blob/master/src/server.c
250
             * Look for "noautherr"
251
             */
252
            $message = $e->getMessage();
253
            if (stripos($message, self::AUTH_REQUIRED_MESSAGE) === 0) {
254
                throw new AuthenticationException($message, 0, $e);
255
            }
256
        }
257
258
        return $this->hello;
259
    }
260
261
    /**
262
     * Check if this object holds a working connection to Disque node
263
     *
264
     * @return bool
265
     */
266
    public function isConnected()
267
    {
268
        return $this->connection->isConnected();
269
    }
270
271
    /**
272
     * Execute a command on this Disque node
273
     *
274
     * @param CommandInterface $command
275
     * @return mixed Response
276
     *
277
     * @throws ConnectionException
278
     */
279
    public function execute(CommandInterface $command)
280
    {
281
        return $this->connection->execute($command);
282
    }
283
284
    /**
285
     * Say a new HELLO to the node and parse the response
286
     *
287
     * @return array The HELLO response
288
     *
289
     * @throws ConnectionException
290
     */
291
    public function sayHello()
292
    {
293
        $helloCommand = new Hello();
294
        $helloResponse = $this->connection->execute($helloCommand);
295
        $this->hello = (array) $helloCommand->parse($helloResponse);
296
297
        $this->id = $this->hello[HelloResponse::NODE_ID];
298
        $this->createPrefix($this->id);
299
300
        $this->priority = $this->readPriorityFromHello($this->hello, $this->id);
301
302
        return $this->hello;
303
    }
304
305
    /**
306
     * Connect to the node
307
     *
308
     * @throws ConnectionException
309
     */
310
    private function connectToTheNode()
311
    {
312
        $this->connection->connect(
313
            $this->credentials->getConnectionTimeout(),
314
            $this->credentials->getResponseTimeout()
315
        );
316
    }
317
318
    /**
319
     * Authenticate with the node with a password, if set
320
     *
321
     * @throws AuthenticationException
322
     */
323
    private function authenticateWithPassword()
324
    {
325
        if ($this->credentials->havePassword()) {
326
            $authCommand = new Auth();
327
            $authCommand->setArguments([$this->credentials->getPassword()]);
328
            $authResponse = $this->connection->execute($authCommand);
329
            $response = $authCommand->parse($authResponse);
330
            if ($response !== self::AUTH_SUCCESS_MESSAGE) {
331
                throw new AuthenticationException();
332
            }
333
        }
334
    }
335
336
    /**
337
     * Create a node prefix from the node ID
338
     *
339
     * @param string $id
340
     */
341
    private function createPrefix($id)
342
    {
343
        $this->prefix = substr($id, self::PREFIX_START, self::PREFIX_LENGTH);
344
    }
345
346
    /**
347
     * Read out the node's own priority from a HELLO response
348
     *
349
     * @param array  $hello The HELLO response
350
     * @param string $id    Node ID
351
     *
352
     * @return int Node priority
353
     */
354
    private function readPriorityFromHello($hello, $id)
355
    {
356
        foreach ($hello[HelloResponse::NODES] as $node) {
357
            if ($node[HelloResponse::NODE_ID] === $id) {
358
                return $node[HelloResponse::NODE_PRIORITY];
359
            }
360
        }
361
362
        // Node not found in the HELLO? This should not happen.
363
        // Return a fallback value
364
        return self::PRIORITY_FALLBACK;
365
    }
366
}
367