Completed
Push — master ( 12ca42...2e55dd )
by Mariano
9s
created

Manager::connect()   B

Complexity

Conditions 4
Paths 6

Size

Total Lines 24
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4

Importance

Changes 7
Bugs 0 Features 0
Metric Value
c 7
b 0
f 0
dl 0
loc 24
ccs 10
cts 10
cp 1
rs 8.6845
cc 4
eloc 9
nc 6
nop 0
crap 4
1
<?php
2
namespace Disque\Connection;
3
4
use Disque\Connection\Node\Node;
5
use Disque\Connection\Node\NodePrioritizerInterface;
6
use Disque\Connection\Node\ConservativeJobCountPrioritizer;
7
use Disque\Command\CommandInterface;
8
use Disque\Command\GetJob;
9
use Disque\Command\Response\HelloResponse;
10
use Disque\Command\Response\JobsResponse;
11
use Disque\Connection\Factory\ConnectionFactoryInterface;
12
use Disque\Connection\Factory\SocketFactory;
13
14
/**
 * The connection manager connects to Disque nodes and chooses the best of them
 *
 * If there are multiple nodes to connect, the first connection is always
 * random. The manager then switches to the best nodes according to its
 * NodePrioritizer strategy.
 *
 * If the manager knows the credentials of only one node, it will automatically
 * discover other nodes in the cluster and connect to them if needed (unless
 * they are password-protected).
 */
class Manager implements ManagerInterface
{
    /**
     * Servers we can connect to initially, without knowing the cluster
     *
     * After connecting to one, the server returns a list of other nodes
     * in the cluster so we can connect to them automatically, unless
     * the discovered nodes are secured with a password.
     *
     * 'serverAddress' => Credentials
     *
     * @var Credentials[]
     */
    protected $credentials = [];

    /**
     * A strategy to prioritize nodes and find the best one to switch to
     *
     * The default strategy is the ConservativeJobCountPrioritizer. It
     * prioritizes nodes by their job count, but prefers the current node
     * in order to avoid switching until there is a clearly better node.
     *
     * @var NodePrioritizerInterface
     */
    protected $priorityStrategy;

    /**
     * List of nodes, ie Disque instances available in the cluster
     *
     * 'nodeId' => Node
     *
     * @var Node[]
     */
    protected $nodes = [];

    /**
     * Node prefixes and their corresponding node ID
     *
     * Node prefix consists of the first 8 bytes from the node ID. Because job
     * IDs contain the node prefix, it can be used to identify on which node
     * a job lives.
     *
     * 'nodePrefix' => 'nodeId'
     *
     * @var array
     */
    protected $nodePrefixes = [];

    /**
     * The ID of the node we are currently connected to
     *
     * Stays unset until the first successful switchToNode() call.
     *
     * @var string
     */
    protected $nodeId;

    /**
     * Factory used to create the low-level connection for every new Node
     *
     * @var ConnectionFactoryInterface
     */
    private $connectionFactory;

    /**
     * Set up the defaults: a socket connection factory and the
     * conservative job-count prioritizer. Both can be replaced through
     * their setters.
     */
    public function __construct()
    {
        $this->connectionFactory = new SocketFactory();
        $this->priorityStrategy = new ConservativeJobCountPrioritizer();
    }

    /**
     * Get the factory used to create node connections
     *
     * @inheritdoc
     */
    public function getConnectionFactory()
    {
        return $this->connectionFactory;
    }

    /**
     * Replace the factory used to create node connections
     *
     * Only nodes created after this call use the new factory.
     *
     * @inheritdoc
     */
    public function setConnectionFactory(
        ConnectionFactoryInterface $connectionFactory
    ) {
        $this->connectionFactory = $connectionFactory;
    }

    /**
     * Get the user-supplied credentials, indexed by server address
     *
     * @inheritdoc
     */
    public function getCredentials()
    {
        return $this->credentials;
    }

    /**
     * Register the credentials of a server we may connect to
     *
     * Credentials are indexed by their address, so adding a server with
     * an already known address overwrites the previous credentials.
     *
     * @inheritdoc
     */
    public function addServer(Credentials $credentials)
    {
        $address = $credentials->getAddress();
        $this->credentials[$address] = $credentials;
    }

    /**
     * Get the strategy used to prioritize nodes
     *
     * @inheritdoc
     */
    public function getPriorityStrategy()
    {
        return $this->priorityStrategy;
    }

    /**
     * Replace the strategy used to prioritize nodes
     *
     * @inheritdoc
     */
    public function setPriorityStrategy(NodePrioritizerInterface $priorityStrategy)
    {
        $this->priorityStrategy = $priorityStrategy;
    }

    /**
     * Tell whether there is a current node and its connection is alive
     *
     * @inheritdoc
     */
    public function isConnected()
    {
        // Besides the node ID itself, make sure the node is really present
        // in the pool. Otherwise getCurrentNode() would hand back null and
        // the isConnected() call on it would be a fatal error.
        return (
            isset($this->nodeId, $this->nodes[$this->nodeId]) &&
            $this->getCurrentNode()->isConnected()
        );
    }

    /**
     * Connect to the cluster and return the current node
     *
     * @inheritdoc
     */
    public function connect()
    {
        // If the manager was already connected, connect to a node from the last
        // HELLO response. This information is newer than the credentials
        // supplied by the user at the beginning.
        if ($this->wasAlreadyConnected()) {
            try {
                $this->switchNodeIfNeeded();
            } catch (ConnectionException $e) {
                // Ignore the error, we'll try reconnecting with credentials below
            }
        }

        // Use the user-supplied credentials in case this is the initial
        // connection.
        // If the reconnection attempt above didn't work, fall back
        // to the user-supplied credentials, too.
        if (!$this->isConnected()) {
            $currentNode = $this->findAvailableConnection();
            $this->switchToNode($currentNode);
        }

        return $this->getCurrentNode();
    }

    /**
     * Execute a command on the current node
     *
     * Makes sure we are connected (trying to reconnect if not), runs the
     * pre-execution hook, executes the command on the current node and
     * hands the response to the post-execution hook.
     *
     * @inheritdoc
     */
    public function execute(CommandInterface $command)
    {
        $this->shouldBeConnected();
        $command = $this->preprocessExecution($command);

        $response = $this->getCurrentNode()->execute($command);

        $response = $this->postprocessExecution($command, $response);
        return $response;
    }

    /**
     * Get the node stored in the pool under the current node ID
     *
     * @inheritdoc
     */
    public function getCurrentNode()
    {
        return $this->nodes[$this->nodeId];
    }

    /**
     * Get a functional connection to any known node
     *
     * Disque suggests the first connection should be chosen randomly
     * We go through the user-supplied credentials randomly and try to connect.
     *
     * @return Node A connected node
     *
     * @throws ConnectionException If no server could be connected to. The
     *                             last connection failure, if there was any,
     *                             is attached as the previous exception.
     */
    protected function findAvailableConnection()
    {
        // shuffle() works on a copy, the stored credentials keep their keys
        $servers = $this->credentials;
        shuffle($servers);

        // Remember the last failure so the final exception explains why
        // no server was available
        $lastError = null;
        foreach ($servers as $server) {
            try {
                $node = $this->getNodeConnection($server);
            } catch (ConnectionException $e) {
                $lastError = $e;
                continue;
            }

            if ($node->isConnected()) {
                return $node;
            }
        }

        throw new ConnectionException('No servers available', 0, $lastError);
    }

    /**
     * Connect to the node given in the credentials
     *
     * @param Credentials $server
     *
     * @return Node A connected node
     *
     * @throws ConnectionException
     * @throws AuthenticationException
     */
    protected function getNodeConnection(Credentials $server)
    {
        $node = $this->createNode($server);
        $node->connect();
        return $node;
    }

    /**
     * Reset node counters that should be reset upon node switch
     *
     * Calls resetJobCount() on every node currently in the pool.
     */
    protected function resetNodeCounters()
    {
        foreach ($this->nodes as $node) {
            $node->resetJobCount();
        }
    }

    /**
     * Hook into the command execution and do anything before it's executed
     *
     * Eg. start measuring node latency etc. This implementation returns
     * the command unchanged.
     *
     * @param CommandInterface $command
     *
     * @return CommandInterface $command
     */
    protected function preprocessExecution(CommandInterface $command)
    {
        return $command;
    }

    /**
     * Postprocess the command execution, eg. update node stats
     *
     * Only GetJob commands are postprocessed: the jobs parsed from the
     * response update the node stats and the manager then checks whether
     * it should switch to a better node.
     *
     * @param CommandInterface $command
     * @param mixed            $response
     *
     * @return mixed The response, unchanged
     * @throws ConnectionException
     */
    protected function postprocessExecution(
        CommandInterface $command,
        $response
    ) {
        if ($command instanceof GetJob) {
            $this->updateNodeStats($command->parse($response));
            $this->switchNodeIfNeeded();
        }

        return $response;
    }

    /**
     * Update node counters indicating how many jobs the node has produced
     *
     * Jobs whose producing node is not in the pool are skipped.
     *
     * @param array $jobs Jobs
     */
    protected function updateNodeStats(array $jobs)
    {
        foreach ($jobs as $job) {
            $jobId = $job[JobsResponse::KEY_ID];
            $nodeId = $this->getNodeIdFromJobId($jobId);
            if (!isset($nodeId) || !isset($this->nodes[$nodeId])) {
                continue;
            }

            $node = $this->nodes[$nodeId];
            $node->addJobCount(1);
        }
    }

    /**
     * Decide if we should switch to a better node
     *
     * Walks the nodes in the order given by the priority strategy and
     * stays on, or switches to, the first one with a working connection.
     *
     * @throws ConnectionException If no candidate node could be used
     */
    private function switchNodeIfNeeded()
    {
        $sortedNodes = $this->priorityStrategy->sort(
            $this->nodes,
            $this->nodeId
        );

        // Try to connect by priority, continue on error, return on success
        foreach ($sortedNodes as $nodeCandidate) {
            // If the first recommended node is our current node and it has
            // a working connection, return early.
            // If its connection is not working, let's try and reconnect further
            // below, or find the first best connected node.
            if ($nodeCandidate->getId() === $this->nodeId &&
                $nodeCandidate->isConnected()) {
                return;
            }

            try {
                if ($nodeCandidate->isConnected()) {
                    // Say a new HELLO to the node, the cluster might have changed
                    $nodeCandidate->sayHello();
                } else {
                    $nodeCandidate->connect();
                }
            } catch (ConnectionException $e) {
                continue;
            }

            $this->switchToNode($nodeCandidate);
            return;
        }

        throw new ConnectionException('Could not switch to any node');
    }

    /**
     * Get a node ID based off a Job ID
     *
     * @param string       $jobId Job ID
     * @return string|null        Node ID, or null if the prefix is unknown
     *                            or its node is not in the pool
     */
    private function getNodeIdFromJobId($jobId)
    {
        $nodePrefix = $this->getNodePrefixFromJobId($jobId);
        if (
            isset($this->nodePrefixes[$nodePrefix]) &&
            array_key_exists($this->nodePrefixes[$nodePrefix], $this->nodes)
        ) {
            return $this->nodePrefixes[$nodePrefix];
        }

        return null;
    }

    /**
     * Get the node prefix from the job ID
     *
     * @param string  $jobId
     * @return string        Node prefix
     */
    private function getNodePrefixFromJobId($jobId)
    {
        $nodePrefix = substr(
            $jobId,
            JobsResponse::ID_NODE_PREFIX_START,
            Node::PREFIX_LENGTH
        );

        return $nodePrefix;
    }

    /**
     * We should be connected
     *
     * @return void
     * @throws ConnectionException If we are not connected and reconnecting
     *                             failed. The original failure is attached
     *                             as the previous exception.
     */
    private function shouldBeConnected()
    {
        // If we lost the connection, first let's try and reconnect
        if (!$this->isConnected()) {
            try {
                $this->switchNodeIfNeeded();
            } catch (ConnectionException $e) {
                // Keep the original exception in the chain so its stack
                // trace is not lost
                throw new ConnectionException(
                    'Not connected. ' . $e->getMessage(),
                    0,
                    $e
                );
            }
        }
    }

    /**
     * Create a new Node object
     *
     * @param Credentials $credentials
     *
     * @return Node An unconnected Node
     */
    private function createNode(Credentials $credentials)
    {
        $host = $credentials->getHost();
        $port = $credentials->getPort();
        $connection = $this->connectionFactory->create($host, $port);

        return new Node($credentials, $connection);
    }

    /**
     * Switch to the given node and map the cluster from its HELLO
     *
     * @param Node $node
     */
    private function switchToNode(Node $node)
    {
        $nodeId = $node->getId();
        // Return early if we're trying to switch to the current node.
        if ($this->nodeId === $nodeId) {
            // But return early only if the current node is connected to Disque.
            // If it is disconnected, we want to overwrite it with the node
            // from the method argument, because that one is connected.
            if ($this->getCurrentNode()->isConnected()) {
                return;
            }

            // Copy the stats from the now-disconnected node object
            $this->copyNodeStats($this->getCurrentNode(), $node);
        }

        $this->resetNodeCounters();

        $this->nodeId = $nodeId;
        $this->nodes[$nodeId] = $node;
        $this->revealClusterFromHello($node);
    }

    /**
     * Reveal the whole Disque cluster from a node HELLO response
     *
     * The HELLO response from a Disque node contains addresses of all other
     * nodes in the cluster. We want to learn about them and save them, so that
     * we can switch to them later, if needed.
     *
     * The node pool is replaced by the nodes listed in the HELLO response.
     *
     * @param Node $node The current node
     */
    private function revealClusterFromHello(Node $node)
    {
        $hello = $node->getHello();
        $revealedNodes = [];

        foreach ($hello[HelloResponse::NODES] as $nodeInfo) {
            $id = $nodeInfo[HelloResponse::NODE_ID];
            $revealedNode = $this->revealNodeFromHello($id, $nodeInfo);

            // Update or set the node's priority as determined by Disque
            $priority = $nodeInfo[HelloResponse::NODE_PRIORITY];
            $revealedNode->setPriority($priority);

            $revealedNodes[$id] = $revealedNode;
        }

        $this->nodes = $revealedNodes;
    }

    /**
     * Reveal a single node from a HELLO response, or use an existing node
     *
     * @param string $nodeId The node ID
     * @param array  $node   Node information as returned by the HELLO command
     *
     * @return Node $node A node in the current cluster
     */
    private function revealNodeFromHello($nodeId, array $node)
    {
        /**
         * Add the node prefix to the pool. We create the prefix manually
         * from the node ID rather than asking the Node object. Newly created
         * Nodes aren't connected and thus don't know their ID or prefix.
         *
         * @see Node::sayHello()
         */
        $prefix = substr($nodeId, Node::PREFIX_START, Node::PREFIX_LENGTH);
        $this->nodePrefixes[$prefix] = $nodeId;

        // If the node already exists in our pool, use it, don't overwrite it
        // with a new one. We would lose its stats and connection.
        if (isset($this->nodes[$nodeId])) {
            return $this->nodes[$nodeId];
        }

        $host = $node[HelloResponse::NODE_HOST];
        $port = $node[HelloResponse::NODE_PORT];
        $credentials = new Credentials($host, $port);

        $address = $credentials->getAddress();
        // If there are user-supplied credentials for this node, use them.
        // They may contain a password
        if (isset($this->credentials[$address])) {
            $credentials = $this->credentials[$address];
        }

        // Instantiate a new Node object for the newly revealed node
        return $this->createNode($credentials);
    }

    /**
     * Check if the manager held a connection to Disque already
     *
     * The node pool is only filled once we have switched to a node, so
     * a non-empty pool means there was a connection before.
     *
     * @return bool
     */
    private function wasAlreadyConnected()
    {
        return !empty($this->nodes);
    }

    /**
     * Copy node stats from the old to the new node
     *
     * Adds the total job count of the old node to the new node.
     *
     * @param Node $oldNode
     * @param Node $newNode
     */
    private function copyNodeStats(Node $oldNode, Node $newNode)
    {
        $oldNodeJobCount = $oldNode->getTotalJobCount();
        $newNode->addJobCount($oldNodeJobCount);
    }
}
542