Completed
Push — master ( 4a282c...f5ece7 )
by Mariano
03:58 queued 01:18
created

Manager::revealNodeFromHello()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 32
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.0327

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 32
ccs 11
cts 13
cp 0.8462
rs 8.8571
cc 3
eloc 12
nc 3
nop 2
crap 3.0327
1
<?php
2
namespace Disque\Connection;
3
4
use Disque\Connection\Node\Node;
5
use Disque\Connection\Node\NodePrioritizerInterface;
6
use Disque\Connection\Node\ConservativeJobCountPrioritizer;
7
use Disque\Command\CommandInterface;
8
use Disque\Command\GetJob;
9
use Disque\Command\Response\HelloResponse;
10
use Disque\Command\Response\JobsResponse;
11
use Disque\Connection\Factory\ConnectionFactoryInterface;
12
use Disque\Connection\Factory\SocketFactory;
13
14
/**
15
 * The connection manager connects to Disque nodes and chooses the best of them
16
 *
17
 * If there are multiple nodes to connect, the first connection is always
18
 * random. The manager then switches to the best nodes according to its
19
 * NodePriority strategy.
20
 *
21
 * If the manager knows the credentials of only one node, it will automatically
22
 * discover other nodes in the cluster and connect to them if needed (unless
23
 * they are password-protected).
24
 */
25
class Manager implements ManagerInterface
26
{
27
    /**
28
     * Servers we can connect to initially, without knowing the cluster
29
     *
30
     * After connecting to one, the server returns a list of other nodes
31
     * in the cluster so we can connect to them automatically, unless
32
     * the discovered nodes are secured with a password.
33
     *
34
     * 'serverAddress' => Credentials
35
     *
36
     * @var Credentials[]
37
     */
38
    protected $credentials = [];
39
40
    /**
41
     * A strategy to prioritize nodes and find the best one to switch to
42
     *
43
     * The default strategy is the ConservativeJobCountPrioritizer. It
44
     * prioritizes nodes by their job count, but prefers the current node
45
     * in order to avoid switching until there is a clearly better node.
46
     *
47
     * @var NodePrioritizerInterface
48
     */
49
    protected $priorityStrategy;
50
51
    /**
52
     * List of nodes, ie Disque instances available in the cluster
53
     *
54
     * 'nodeId' => Node
55
     *
56
     * @var Node[]
57
     */
58
    protected $nodes = [];
59
60
    /**
61
     * Node prefixes and their corresponding node ID
62
     *
63
     * Node prefix consists of the first 8 bytes from the node ID. Because job
64
     * IDs contain the node prefix, it can be used to identify on which node
65
     * a job lives.
66
     *
67
     * 'nodePrefix' => 'nodeId'
68
     *
69
     * @var array
70
     */
71
    protected $nodePrefixes = [];
72
73
    /**
74
     * The ID of the node we are currently connected to
75
     *
76
     * @var string
77
     */
78
    protected $nodeId;
79
80
    /**
81
     * @var ConnectionFactoryInterface
82
     */
83
    private $connectionFactory;
84
85 39
    public function __construct()
86
    {
87 39
        $this->connectionFactory = new SocketFactory();
88 39
        $this->priorityStrategy = new ConservativeJobCountPrioritizer();
89 39
    }
90
91
    /**
92
     * @inheritdoc
93
     */
94 2
    public function getConnectionFactory()
95
    {
96 2
        return $this->connectionFactory;
97
    }
98
99
    /**
100
     * @inheritdoc
101
     */
102 15
    public function setConnectionFactory(
103
        ConnectionFactoryInterface $connectionFactory
104
    ) {
105 15
        $this->connectionFactory = $connectionFactory;
106 15
    }
107
108
    /**
109
     * @inheritdoc
110
     */
111 5
    public function getCredentials()
112
    {
113 5
        return $this->credentials;
114
    }
115
116
    /**
117
     * @inheritdoc
118
     */
119 17
    public function addServer(Credentials $credentials)
120
    {
121 17
        $address = $credentials->getAddress();
122 17
        $this->credentials[$address] = $credentials;
123 17
    }
124
125
    /**
126
     * @inheritdoc
127
     */
128 1
    public function getPriorityStrategy()
129
    {
130 1
        return $this->priorityStrategy;
131
    }
132
133
    /**
134
     * @inheritdoc
135
     */
136 5
    public function setPriorityStrategy(NodePrioritizerInterface $priorityStrategy)
137
    {
138 5
        $this->priorityStrategy = $priorityStrategy;
139 5
    }
140
141
    /**
142
     * @inheritdoc
143
     */
144 7
    public function isConnected()
145
    {
146
        return (
147 7
            isset($this->nodeId) &&
148 5
            $this->nodes[$this->nodeId]->getConnection()->isConnected()
149 7
        );
150
    }
151
152
    /**
153
     * @inheritdoc
154
     */
155 16
    public function connect()
156
    {
157 16
        $currentNode = $this->findAvailableConnection();
158 9
        $this->switchToNode($currentNode);
159 9
        return $currentNode;
160
    }
161
162
    /**
163
     * @inheritdoc
164
     */
165 7
    public function execute(CommandInterface $command)
166
    {
167 7
        $this->shouldBeConnected();
168 5
        $command = $this->preprocessExecution($command);
169
170 5
        $currentNodeConnection = $this->nodes[$this->nodeId]->getConnection();
171 5
        $response = $currentNodeConnection->execute($command);
172
173 5
        $response = $this->postprocessExecution($command, $response);
174 5
        return $response;
175
    }
176
177
    /**
178
     * @inheritdoc
179
     */
180 4
    public function getCurrentNode()
181
    {
182 4
        return $this->nodes[$this->nodeId];
183
    }
184
185
    /**
186
     * Get a functional connection to any known node
187
     *
188
     * Disque suggests the first connection should be chosen randomly
189
     * We go through the user-supplied credentials randomly and try to connect.
190
     *
191
     * @return Node A connected node
192
     *
193
     * @throws ConnectionException
194
     */
195 16
    protected function findAvailableConnection()
196
    {
197 16
        $servers = $this->credentials;
198 16
        shuffle($servers);
199 16
        foreach ($servers as $server) {
200
            try {
201 14
                $node = $this->getNodeConnection($server);
202 14
            } catch (ConnectionException $e) {
203 6
                continue;
204
            }
205
206 9
            if ($node->getConnection()->isConnected()) {
207 9
                return $node;
208
            }
209 7
        }
210
211 7
        throw new ConnectionException('No servers available');
212
    }
213
214
    /**
215
     * Connect to the node given in the credentials
216
     *
217
     * @param Credentials $server
218
     *
219
     * @return Node A connected node
220
     *
221
     * @throws ConnectionException
222
     * @throws AuthenticationException
223
     */
224 14
    protected function getNodeConnection(Credentials $server)
225
    {
226 14
        $node = $this->createNode($server);
227 14
        $node->connect();
228 9
        return $node;
229
    }
230
231
    /**
232
     * Reset node counters that should be reset upon node switch
233
     */
234 9
    protected function resetNodeCounters()
235
    {
236 9
        foreach($this->nodes as $node) {
237 1
            $node->resetJobCount();
238 9
        }
239 9
    }
240
241
    /**
242
     * Hook into the command execution and do anything before it's executed
243
     *
244
     * Eg. start measuring node latency etc.
245
     *
246
     * @param CommandInterface $command
247
     *
248
     * @return CommandInterface $command
249
     */
250 5
    protected function preprocessExecution(CommandInterface $command)
251
    {
252 5
        return $command;
253
    }
254
255
    /**
256
     * Postprocess the command execution, eg. update node stats
257
     *
258
     * @param CommandInterface $command
259
     * @param mixed            $response
260
     *
261
     * @return mixed
262
     * @throws ConnectionException
263
     */
264 5
    protected function postprocessExecution(
265
        CommandInterface $command,
266
        $response
267
    ) {
268 5
        if ($command instanceof GetJob) {
269 4
            $this->updateNodeStats($command->parse($response));
270 4
            $this->switchNodeIfNeeded();
271 4
        }
272
273 5
        return $response;
274
    }
275
276
    /**
277
     * Update node counters indicating how many jobs the node has produced
278
     *
279
     * @param array $jobs Jobs
280
     */
281 4
    protected function updateNodeStats(array $jobs)
282
    {
283 4
        foreach ($jobs as $job) {
284 4
            $jobId = $job[JobsResponse::KEY_ID];
285 4
            $nodeId = $this->getNodeIdFromJobId($jobId);
286 4
            if (!isset($nodeId) || !isset($this->nodes[$nodeId])) {
287 1
                continue;
288
            }
289
290 4
            $node = $this->nodes[$nodeId];
291 4
            $node->addJobCount(1);
292 4
        }
293 4
    }
294
295
    /**
296
     * Decide if we should switch to a better node
297
     *
298
     * @throws ConnectionException
299
     */
300 4
    private function switchNodeIfNeeded()
301
    {
302 4
        $sortedNodes = $this->priorityStrategy->sort(
303 4
            $this->nodes,
304 4
            $this->nodeId
305 4
        );
306
307
        // Try to connect by priority, continue on error, return on success
308 4
        foreach($sortedNodes as $nodeCandidate) {
309 4
            if ($nodeCandidate->getId() === $this->nodeId) {
310 4
                return;
311
            }
312
313
            try {
314 2
                if ($nodeCandidate->getConnection()->isConnected()) {
315
                    // Say a new HELLO to the node, the cluster might have changed
316
                    $nodeCandidate->sayHello();
317
                } else {
318 2
                    $nodeCandidate->connect();
319
                }
320 2
            } catch (ConnectionException $e) {
321 1
                continue;
322
            }
323
324 1
            $this->switchToNode($nodeCandidate);
325 1
            return;
326
        }
327
328
        throw new ConnectionException('Could not switch to any node');
329
    }
330
331
    /**
332
     * Get a node ID based off a Job ID
333
     *
334
     * @param string       $jobId Job ID
335
     * @return string|null        Node ID
336
     */
337 4
    private function getNodeIdFromJobId($jobId)
338
    {
339 4
        $nodePrefix = $this->getNodePrefixFromJobId($jobId);
340
        if (
341 4
            isset($this->nodePrefixes[$nodePrefix]) &&
342 4
            array_key_exists($this->nodePrefixes[$nodePrefix], $this->nodes)
343 4
        ) {
344 4
            return $this->nodePrefixes[$nodePrefix];
345
        }
346
347 1
        return null;
348
    }
349
350
    /**
351
     * Get the node prefix from the job ID
352
     *
353
     * @param string  $jobId
354
     * @return string        Node prefix
355
     */
356 4
    private function getNodePrefixFromJobId($jobId)
357
    {
358 4
        $nodePrefix = substr(
359 4
            $jobId,
360 4
            JobsResponse::ID_NODE_PREFIX_START,
361
            Node::PREFIX_LENGTH
362 4
        );
363
364 4
        return $nodePrefix;
365
    }
366
367
    /**
368
     * We should be connected
369
     *
370
     * @return void
371
     * @throws ConnectionException
372
     */
373 7
    private function shouldBeConnected()
374
    {
375 7
        if (!$this->isConnected()) {
376 2
            throw new ConnectionException('Not connected');
377
        }
378 5
    }
379
380
    /**
381
     * Create a new Node object
382
     *
383
     * @param Credentials $credentials
384
     *
385
     * @return Node An unconnected Node
386
     */
387 14
    private function createNode(Credentials $credentials)
388
    {
389 14
        $host = $credentials->getHost();
390 14
        $port = $credentials->getPort();
391 14
        $connection = $this->connectionFactory->create($host, $port);
392
393 14
        return new Node($credentials, $connection);
394
    }
395
396
    /**
397
     * Switch to the given node and map the cluster from its HELLO
398
     *
399
     * @param Node $node
400
     */
401 9
    private function switchToNode(Node $node)
402
    {
403 9
        $nodeId = $node->getId();
404 9
        if ($this->nodeId === $nodeId) {
405
            return;
406
        }
407
408 9
        $this->resetNodeCounters();
409
410 9
        $this->nodeId = $nodeId;
411 9
        $this->nodes[$nodeId] = $node;
412 9
        $this->revealClusterFromHello($node);
413 9
    }
414
415
    /**
416
     * Reveal the whole Disque cluster from a node HELLO response
417
     *
418
     * The HELLO response from a Disque node contains addresses of all other
419
     * nodes in the cluster. We want to learn about them and save them, so that
420
     * we can switch to them later, if needed.
421
     *
422
     * @param Node $node The current node
423
     */
424 9
    private function revealClusterFromHello(Node $node)
425
    {
426 9
        $hello = $node->getHello();
427 9
        $revealedNodes = [];
428
429 9
        foreach ($hello[HelloResponse::NODES] as $node) {
430 9
            $id = $node[HelloResponse::NODE_ID];
431 9
            $revealedNode = $this->revealNodeFromHello($id, $node);
432
433
            // Update or set the node's priority as determined by Disque
434 9
            $priority = $node[HelloResponse::NODE_PRIORITY];
435 9
            $revealedNode->setPriority($priority);
436
437 9
            $revealedNodes[$id] = $revealedNode;
438 9
        }
439
440 9
        $this->nodes = $revealedNodes;
441 9
    }
442
443
    /**
444
     * Reveal a single node from a HELLO response, or use an existing node
445
     *
446
     * @param string $nodeId The node ID
447
     * @param array  $node   Node information as returned by the HELLO command
448
     *
449
     * @return Node $node A node in the current cluster
450
     */
451 9
    private function revealNodeFromHello($nodeId, array $node)
452
    {
453
        /**
454
         * Add the node prefix to the pool. We create the prefix manually
455
         * from the node ID rather than asking the Node object. Newly created
456
         * Nodes aren't connected and thus don't know their ID or prefix.
457
         *
458
         * @see Node::sayHello()
459
         */
460 9
        $prefix = substr($nodeId, Node::PREFIX_START, Node::PREFIX_LENGTH);
461 9
        $this->nodePrefixes[$prefix] = $nodeId;
462
463
        // If the node already exists in our pool, use it, don't overwrite it
464
        // with a new one. We would lose its stats and connection.
465 9
        if (isset($this->nodes[$nodeId])) {
466 9
            return $this->nodes[$nodeId];
467
        }
468
469 4
        $host = $node[HelloResponse::NODE_HOST];
470 4
        $port = $node[HelloResponse::NODE_PORT];
471 4
        $credentials = new Credentials($host, $port);
472
473 4
        $address = $credentials->getAddress();
474
        // If there are user-supplied credentials for this node, use them.
475
        // They may contain a password
476 4
        if (isset($this->credentials[$address])) {
477
            $credentials = $this->credentials[$address];
478
        }
479
480
        // Instantiate a new Node object for the newly revealed node
481 4
        return $this->createNode($credentials);
482
    }
483
}