Manager::connect()   B
last analyzed

Complexity

Conditions 4
Paths 6

Size

Total Lines 24
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 24
ccs 0
cts 14
cp 0
rs 8.6845
c 0
b 0
f 0
cc 4
eloc 9
nc 6
nop 0
crap 20
1
<?php
2
namespace Disque\Connection;
3
4
use Disque\Connection\Node\Node;
5
use Disque\Connection\Node\NodePrioritizerInterface;
6
use Disque\Connection\Node\ConservativeJobCountPrioritizer;
7
use Disque\Command\CommandInterface;
8
use Disque\Command\GetJob;
9
use Disque\Command\Response\HelloResponse;
10
use Disque\Command\Response\JobsResponse;
11
use Disque\Connection\Factory\ConnectionFactoryInterface;
12
use Disque\Connection\Factory\SocketFactory;
13
14
/**
 * Connection manager for a Disque cluster
 *
 * The manager keeps track of the nodes it knows about, holds the ID of the
 * node it currently talks to and decides, with the help of a node
 * prioritizer, when to move to another node.
 *
 * The very first connection is made to a randomly chosen server from the
 * user-supplied credentials. Every time the manager switches to a node, it
 * reads that node's HELLO response and rebuilds its list of cluster nodes
 * from it. Nodes discovered this way get credentials consisting of just
 * host and port, unless the user supplied credentials for the same address.
 */
class Manager implements ManagerInterface
{
    /**
     * User-supplied servers used for the initial connection
     *
     * These are the only servers known before the cluster has been mapped
     * from a HELLO response. Indexed by the server address:
     *
     * 'serverAddress' => Credentials
     *
     * @var Credentials[]
     */
    protected $credentials = [];

    /**
     * The strategy that orders nodes from the most to the least preferred
     *
     * Set to a ConservativeJobCountPrioritizer in the constructor and
     * replaceable through setPriorityStrategy().
     *
     * @var NodePrioritizerInterface
     */
    protected $priorityStrategy;

    /**
     * Nodes of the cluster as learned from the last HELLO response
     *
     * 'nodeId' => Node
     *
     * @var Node[]
     */
    protected $nodes = [];

    /**
     * Lookup table from a node prefix to the full node ID
     *
     * The prefix is the part of the node ID delimited by Node::PREFIX_START
     * and Node::PREFIX_LENGTH. The same prefix is extracted from job IDs,
     * which lets us tell what node a job comes from.
     *
     * 'nodePrefix' => 'nodeId'
     *
     * @var array
     */
    protected $nodePrefixes = [];

    /**
     * ID of the node the manager is currently using, unset before the first
     * successful connection
     *
     * @var string
     */
    protected $nodeId;

    /**
     * Factory producing the low-level connections handed over to nodes
     *
     * @var ConnectionFactoryInterface
     */
    private $connectionFactory;

    /**
     * Set up the manager with its default collaborators: the
     * ConservativeJobCountPrioritizer and the SocketFactory
     */
    public function __construct()
    {
        $this->priorityStrategy = new ConservativeJobCountPrioritizer();
        $this->connectionFactory = new SocketFactory();
    }

    /**
     * Return the connection factory currently in use
     *
     * @inheritdoc
     */
    public function getConnectionFactory()
    {
        return $this->connectionFactory;
    }

    /**
     * Replace the connection factory used for creating new nodes
     *
     * @inheritdoc
     */
    public function setConnectionFactory(
        ConnectionFactoryInterface $connectionFactory
    ) {
        $this->connectionFactory = $connectionFactory;
    }

    /**
     * Return all user-supplied credentials, indexed by server address
     *
     * @inheritdoc
     */
    public function getCredentials()
    {
        return $this->credentials;
    }

    /**
     * Register a server; credentials with an already known address replace
     * the previous entry
     *
     * @inheritdoc
     */
    public function addServer(Credentials $credentials)
    {
        $this->credentials[$credentials->getAddress()] = $credentials;
    }

    /**
     * Return the node prioritizer currently in use
     *
     * @inheritdoc
     */
    public function getPriorityStrategy()
    {
        return $this->priorityStrategy;
    }

    /**
     * Replace the node prioritizer
     *
     * @inheritdoc
     */
    public function setPriorityStrategy(NodePrioritizerInterface $priorityStrategy)
    {
        $this->priorityStrategy = $priorityStrategy;
    }

    /**
     * Tell whether a current node has been chosen and reports itself
     * as connected
     *
     * @inheritdoc
     */
    public function isConnected()
    {
        // Without a node ID there is no current node to ask
        if (!isset($this->nodeId)) {
            return false;
        }

        return (bool) $this->getCurrentNode()->isConnected();
    }

    /**
     * Connect to the cluster and return the node we ended up on
     *
     * @inheritdoc
     */
    public function connect()
    {
        // Nodes learned from an earlier HELLO are more up to date than
        // the credentials the user gave us at the start, so try them first.
        if ($this->wasAlreadyConnected()) {
            try {
                $this->switchNodeIfNeeded();
            } catch (ConnectionException $reconnectFailure) {
                // Deliberately ignored: the user-supplied credentials
                // are tried next
            }
        }

        if ($this->isConnected()) {
            return $this->getCurrentNode();
        }

        // Either this is the initial connection, or reconnecting to the
        // known nodes failed. Use the user-supplied credentials.
        $this->switchToNode($this->findAvailableConnection());

        return $this->getCurrentNode();
    }

    /**
     * Run a command on the current node, reconnecting first if necessary
     *
     * @inheritdoc
     */
    public function execute(CommandInterface $command)
    {
        $this->shouldBeConnected();

        $prepared = $this->preprocessExecution($command);
        $rawResponse = $this->getCurrentNode()->execute($prepared);

        return $this->postprocessExecution($prepared, $rawResponse);
    }

    /**
     * Return the node stored under the current node ID
     *
     * @inheritdoc
     */
    public function getCurrentNode()
    {
        $currentId = $this->nodeId;

        return $this->nodes[$currentId];
    }

    /**
     * Find a working connection among the user-supplied servers
     *
     * The servers are tried in a random order, the first one that connects
     * wins. Servers that fail with a ConnectionException are skipped.
     *
     * @return Node A connected node
     *
     * @throws ConnectionException If no server could be connected to. The
     *                             last connection error, if any, is attached
     *                             as the previous exception.
     */
    protected function findAvailableConnection()
    {
        $candidates = $this->credentials;
        shuffle($candidates);

        $lastFailure = null;
        foreach ($candidates as $candidate) {
            try {
                $node = $this->getNodeConnection($candidate);
            } catch (ConnectionException $failure) {
                $lastFailure = $failure;
                continue;
            }

            if ($node->isConnected()) {
                return $node;
            }
        }

        throw new ConnectionException('No servers available', 0, $lastFailure);
    }

    /**
     * Create a node for the given server and connect it
     *
     * @param Credentials $server
     *
     * @return Node A connected node
     *
     * @throws ConnectionException
     * @throws AuthenticationException
     */
    protected function getNodeConnection(Credentials $server)
    {
        $freshNode = $this->createNode($server);
        $freshNode->connect();

        return $freshNode;
    }

    /**
     * Reset the job counter of every known node
     *
     * Called whenever the manager moves to a node.
     */
    protected function resetNodeCounters()
    {
        foreach ($this->nodes as $knownNode) {
            $knownNode->resetJobCount();
        }
    }

    /**
     * Extension point called right before a command is executed
     *
     * The default implementation returns the command untouched.
     *
     * @param CommandInterface $command
     *
     * @return CommandInterface $command
     */
    protected function preprocessExecution(CommandInterface $command)
    {
        return $command;
    }

    /**
     * Extension point called right after a command has been executed
     *
     * After a GetJob command the node job counters are updated from the
     * parsed response and the manager checks whether it should move to
     * another node. The response itself is always returned unchanged.
     *
     * @param CommandInterface $command
     * @param mixed            $response
     *
     * @return mixed
     * @throws ConnectionException
     */
    protected function postprocessExecution(
        CommandInterface $command,
        $response
    ) {
        if (!($command instanceof GetJob)) {
            return $response;
        }

        $jobs = $command->parse($response);
        $this->updateNodeStats($jobs);
        $this->switchNodeIfNeeded();

        return $response;
    }

    /**
     * Credit each job to the node it comes from
     *
     * Jobs whose originating node cannot be determined, or is not in the
     * list of known nodes, are skipped.
     *
     * @param array $jobs Jobs
     */
    protected function updateNodeStats(array $jobs)
    {
        foreach ($jobs as $job) {
            $producerId = $this->getNodeIdFromJobId($job[JobsResponse::KEY_ID]);

            if (isset($producerId, $this->nodes[$producerId])) {
                $this->nodes[$producerId]->addJobCount(1);
            }
        }
    }

    /**
     * Move to the best node that can be reached
     *
     * Nodes are tried in the order given by the priority strategy. The
     * method returns as soon as it meets the current node with a working
     * connection, or as soon as it manages to switch to another candidate.
     *
     * @throws ConnectionException If no candidate could be used. The last
     *                             connection error, if any, is attached as
     *                             the previous exception.
     */
    private function switchNodeIfNeeded()
    {
        $candidates = $this->priorityStrategy->sort(
            $this->nodes,
            $this->nodeId
        );

        $lastFailure = null;

        foreach ($candidates as $candidate) {
            $isCurrentNode = ($candidate->getId() === $this->nodeId);

            // Staying on the current node is fine as long as its connection
            // works. A broken current node is handled like any other
            // candidate below, ie. we try to reconnect it.
            if ($isCurrentNode && $candidate->isConnected()) {
                return;
            }

            try {
                if (!$candidate->isConnected()) {
                    $candidate->connect();
                } else {
                    // The cluster might have changed in the meantime,
                    // ask the connected node for a new HELLO
                    $candidate->sayHello();
                }
            } catch (ConnectionException $failure) {
                // This candidate is unusable, try the next one
                $lastFailure = $failure;
                continue;
            }

            $this->switchToNode($candidate);
            return;
        }

        throw new ConnectionException('Could not switch to any node', 0, $lastFailure);
    }

    /**
     * Resolve the ID of the node a job comes from
     *
     * @param string       $jobId Job ID
     * @return string|null        Node ID, or null if the prefix is unknown
     *                            or points to a node we no longer know
     */
    private function getNodeIdFromJobId($jobId)
    {
        $prefix = $this->getNodePrefixFromJobId($jobId);

        if (!isset($this->nodePrefixes[$prefix])) {
            return null;
        }

        $candidateId = $this->nodePrefixes[$prefix];

        return array_key_exists($candidateId, $this->nodes)
            ? $candidateId
            : null;
    }

    /**
     * Cut the node prefix out of a job ID
     *
     * @param string  $jobId
     * @return string        Node prefix
     */
    private function getNodePrefixFromJobId($jobId)
    {
        return substr(
            $jobId,
            JobsResponse::ID_NODE_PREFIX_START,
            Node::PREFIX_LENGTH
        );
    }

    /**
     * Make sure there is a working connection, reconnect if there is not
     *
     * @return void
     * @throws ConnectionException If the connection could not be restored
     */
    protected function shouldBeConnected()
    {
        if ($this->isConnected()) {
            return;
        }

        // The connection is gone, try to get a new one from the known nodes
        try {
            $this->switchNodeIfNeeded();
        } catch (ConnectionException $failure) {
            throw new ConnectionException('Not connected. ' . $failure->getMessage(), 0, $failure);
        }
    }

    /**
     * Build a Node for the given credentials
     *
     * The node receives a connection created by the connection factory for
     * the host and port from the credentials. Nothing is connected here.
     *
     * @param Credentials $credentials
     *
     * @return Node An unconnected Node
     */
    private function createNode(Credentials $credentials)
    {
        $connection = $this->connectionFactory->create(
            $credentials->getHost(),
            $credentials->getPort()
        );

        return new Node($credentials, $connection);
    }

    /**
     * Make the given node the current one and remap the cluster from its
     * HELLO response
     *
     * @param Node $node
     */
    private function switchToNode(Node $node)
    {
        $targetId = $node->getId();

        if ($this->nodeId === $targetId) {
            $current = $this->getCurrentNode();

            // Nothing to do if we are asked to switch to the node we are
            // already on and its connection works.
            if ($current->isConnected()) {
                return;
            }

            // The node object we hold is disconnected and is about to be
            // replaced by the one we were given. Carry over its job count.
            $this->copyNodeStats($current, $node);
        }

        $this->resetNodeCounters();

        $this->nodeId = $targetId;
        $this->nodes[$targetId] = $node;
        $this->revealClusterFromHello($node);
    }

    /**
     * Rebuild the list of known nodes from the HELLO response of a node
     *
     * Every node listed in the HELLO response ends up in the new list with
     * the priority stated in the response. Nodes missing from the response
     * are dropped from the list.
     *
     * @param Node $node The current node
     */
    private function revealClusterFromHello(Node $node)
    {
        $hello = $node->getHello();
        $cluster = [];

        foreach ($hello[HelloResponse::NODES] as $nodeInfo) {
            $id = $nodeInfo[HelloResponse::NODE_ID];

            $member = $this->revealNodeFromHello($id, $nodeInfo);
            // Set or refresh the priority reported in the HELLO response
            $member->setPriority($nodeInfo[HelloResponse::NODE_PRIORITY]);

            $cluster[$id] = $member;
        }

        $this->nodes = $cluster;
    }

    /**
     * Return the Node object for one node listed in a HELLO response
     *
     * An already known node is reused. Otherwise a new, unconnected Node
     * is created.
     *
     * @param string $nodeId The node ID
     * @param array  $node   Node information as returned by the HELLO command
     *
     * @return Node $node A node in the current cluster
     */
    private function revealNodeFromHello($nodeId, array $node)
    {
        /**
         * Register the node prefix. It is computed here straight from the
         * node ID instead of being requested from the Node object, because
         * a Node that has just been created has not connected yet and so
         * does not know its own ID or prefix.
         *
         * @see Node::sayHello()
         */
        $prefix = substr($nodeId, Node::PREFIX_START, Node::PREFIX_LENGTH);
        $this->nodePrefixes[$prefix] = $nodeId;

        // Reuse a node we already know. Replacing it would throw away
        // its stats and its connection.
        if (isset($this->nodes[$nodeId])) {
            return $this->nodes[$nodeId];
        }

        $discovered = new Credentials(
            $node[HelloResponse::NODE_HOST],
            $node[HelloResponse::NODE_PORT]
        );
        $address = $discovered->getAddress();

        // Prefer the credentials the user supplied for this address,
        // they may contain a password
        $credentials = isset($this->credentials[$address])
            ? $this->credentials[$address]
            : $discovered;

        return $this->createNode($credentials);
    }

    /**
     * Tell whether the manager has mapped the cluster before, ie. whether
     * it knows any nodes
     *
     * @return bool
     */
    private function wasAlreadyConnected()
    {
        return !empty($this->nodes);
    }

    /**
     * Add the total job count of the old node to the new node
     *
     * @param Node $oldNode
     * @param Node $newNode
     */
    private function copyNodeStats(Node $oldNode, Node $newNode)
    {
        $newNode->addJobCount($oldNode->getTotalJobCount());
    }
}