Passed
Pull Request — development (#3708)
by Martyn
15:25
created

RedisCluster::count()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 4
rs 10
1
<?php
2
3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2023 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
namespace Predis\Connection\Cluster;
14
15
use ArrayIterator;
16
use Countable;
17
use IteratorAggregate;
18
use OutOfBoundsException;
19
use Predis\ClientException;
20
use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
21
use Predis\Cluster\SlotMap;
22
use Predis\Cluster\StrategyInterface;
23
use Predis\Command\CommandInterface;
24
use Predis\Command\RawCommand;
25
use Predis\Connection\ConnectionException;
26
use Predis\Connection\FactoryInterface;
27
use Predis\Connection\NodeConnectionInterface;
28
use Predis\NotSupportedException;
29
use Predis\Response\Error as ErrorResponse;
30
use Predis\Response\ErrorInterface as ErrorResponseInterface;
31
use Predis\Response\ServerException;
32
use ReturnTypeWillChange;
0 ignored issues
show
Bug introduced by
The type ReturnTypeWillChange was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
33
use Throwable;
34
35
/**
36
 * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
37
 *
38
 * This connection backend offers smart support for redis-cluster by handling
39
 * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
40
 * Redis when redirecting a client to a different node.
41
 *
42
 * The cluster can be pre-initialized using only a subset of the actual nodes in
43
 * the cluster, Predis will do the rest by adjusting the slots map and creating
44
 * the missing underlying connection instances on the fly.
45
 *
46
 * It is possible to pre-associate connections to a slots range with the "slots"
47
 * parameter in the form "$first-$last". This can greatly reduce runtime node
48
 * guessing and redirections.
49
 *
50
 * It is also possible to ask for the full and updated slots map directly to one
51
 * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
52
 * Asking for the cluster configuration to Redis is actually done by issuing a
53
 * CLUSTER SLOTS command to a random node in the pool.
54
 */
55
class RedisCluster implements ClusterInterface, IteratorAggregate, Countable
56
{
57
    private $useClusterSlots = true;
58
    private $pool = [];
59
    private $slots = [];
60
    private $slotmap;
61
    private $strategy;
62
    private $connections;
63
    private $retryLimit = 5;
64
    private $retryInterval = 10;
65
66
    /**
67
     * @param FactoryInterface  $connections Optional connection factory.
68
     * @param StrategyInterface $strategy    Optional cluster strategy.
69
     */
70
    public function __construct(
71
        FactoryInterface $connections,
72
        StrategyInterface $strategy = null
73
    ) {
74
        $this->connections = $connections;
75
        $this->strategy = $strategy ?: new RedisClusterStrategy();
76
        $this->slotmap = new SlotMap();
77
    }
78
79
    /**
80
     * Sets the maximum number of retries for commands upon server failure.
81
     *
82
     * -1 = unlimited retry attempts
83
     *  0 = no retry attempts (fails immediately)
84
     *  n = fail only after n retry attempts
85
     *
86
     * @param int $retry Number of retry attempts.
87
     */
88
    public function setRetryLimit($retry)
89
    {
90
        $this->retryLimit = (int) $retry;
91
    }
92
93
    /**
94
     * Sets the initial retry interval (milliseconds).
95
     *
96
     * @param int $retryInterval Milliseconds between retries.
97
     */
98
    public function setRetryInterval($retryInterval)
99
    {
100
        $this->retryInterval = (int) $retryInterval;
101
    }
102
103
    /**
104
     * Returns the retry interval (milliseconds).
105
     *
106
     * @return int Milliseconds between retries.
107
     */
108
    public function getRetryInterval()
109
    {
110
        return (int) $this->retryInterval;
111
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116
    public function isConnected()
117
    {
118
        foreach ($this->pool as $connection) {
119
            if ($connection->isConnected()) {
120
                return true;
121
            }
122
        }
123
124
        return false;
125
    }
126
127
    /**
128
     * {@inheritdoc}
129
     */
130
    public function connect()
131
    {
132
        if ($connection = $this->getRandomConnection()) {
133
            $connection->connect();
134
        }
135
    }
136
137
    /**
138
     * {@inheritdoc}
139
     */
140
    public function disconnect()
141
    {
142
        foreach ($this->pool as $connection) {
143
            $connection->disconnect();
144
        }
145
    }
146
147
    /**
148
     * {@inheritdoc}
149
     */
150
    public function add(NodeConnectionInterface $connection)
151
    {
152
        $this->pool[(string) $connection] = $connection;
153
        $this->slotmap->reset();
154
    }
155
156
    /**
157
     * {@inheritdoc}
158
     */
159
    public function remove(NodeConnectionInterface $connection)
160
    {
161
        if (false !== $id = array_search($connection, $this->pool, true)) {
162
            $this->slotmap->reset();
163
            $this->slots = array_diff($this->slots, [$connection]);
164
            unset($this->pool[$id]);
165
166
            return true;
167
        }
168
169
        return false;
170
    }
171
172
    /**
173
     * Removes a connection instance by using its identifier.
174
     *
175
     * @param string $connectionID Connection identifier.
176
     *
177
     * @return bool True if the connection was in the pool.
178
     */
179
    public function removeById($connectionID)
180
    {
181
        if (isset($this->pool[$connectionID])) {
182
            $this->slotmap->reset();
183
            $this->slots = array_diff($this->slots, [$connectionID]);
184
            unset($this->pool[$connectionID]);
185
186
            return true;
187
        }
188
189
        return false;
190
    }
191
192
    /**
193
     * Generates the current slots map by guessing the cluster configuration out
194
     * of the connection parameters of the connections in the pool.
195
     *
196
     * Generation is based on the same algorithm used by Redis to generate the
197
     * cluster, so it is most effective when all of the connections supplied on
198
     * initialization have the "slots" parameter properly set accordingly to the
199
     * current cluster configuration.
200
     */
201
    public function buildSlotMap()
202
    {
203
        $this->slotmap->reset();
204
205
        foreach ($this->pool as $connectionID => $connection) {
206
            $parameters = $connection->getParameters();
207
208
            if (!isset($parameters->slots)) {
209
                continue;
210
            }
211
212
            foreach (explode(',', $parameters->slots) as $slotRange) {
213
                $slots = explode('-', $slotRange, 2);
214
215
                if (!isset($slots[1])) {
216
                    $slots[1] = $slots[0];
217
                }
218
219
                $this->slotmap->setSlots($slots[0], $slots[1], $connectionID);
220
            }
221
        }
222
    }
223
224
    /**
225
     * Queries the specified node of the cluster to fetch the updated slots map.
226
     *
227
     * When the connection fails, this method tries to execute the same command
228
     * on a different connection picked at random from the pool of known nodes,
229
     * up until the retry limit is reached.
230
     *
231
     * @param NodeConnectionInterface $connection Connection to a node of the cluster.
232
     *
233
     * @return mixed
234
     */
235
    private function queryClusterNodeForSlotMap(NodeConnectionInterface $connection)
236
    {
237
        $retries = 0;
238
        $retryAfter = $this->retryInterval;
239
        $command = RawCommand::create('CLUSTER', 'SLOTS');
240
241
        while ($retries <= $this->retryLimit) {
242
            try {
243
                $response = $connection->executeCommand($command);
244
                break;
245
            } catch (ConnectionException $exception) {
246
                $connection = $exception->getConnection();
247
                $connection->disconnect();
248
249
                $this->remove($connection);
250
251
                if ($retries === $this->retryLimit) {
252
                    throw $exception;
253
                }
254
255
                if (!$connection = $this->getRandomConnection()) {
256
                    throw new ClientException('No connections left in the pool for `CLUSTER SLOTS`');
257
                }
258
259
                usleep($retryAfter * 1000);
260
                $retryAfter = $retryAfter * 2;
261
                ++$retries;
262
            }
263
        }
264
265
        return $response;
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $response does not seem to be defined for all execution paths leading up to this point.
Loading history...
266
    }
267
268
    /**
269
     * Generates an updated slots map fetching the cluster configuration using
270
     * the CLUSTER SLOTS command against the specified node or a random one from
271
     * the pool.
272
     *
273
     * @param NodeConnectionInterface $connection Optional connection instance.
274
     */
275
    public function askSlotMap(NodeConnectionInterface $connection = null)
276
    {
277
        if (!$connection && !$connection = $this->getRandomConnection()) {
278
            return;
279
        }
280
281
        $this->slotmap->reset();
282
283
        $response = $this->queryClusterNodeForSlotMap($connection);
284
285
        foreach ($response as $slots) {
286
            // We only support master servers for now, so we ignore subsequent
287
            // elements in the $slots array identifying slaves.
288
            [$start, $end, $master] = $slots;
289
290
            if ($master[0] === '') {
291
                $this->slotmap->setSlots($start, $end, (string) $connection);
292
            } else {
293
                $this->slotmap->setSlots($start, $end, "{$master[0]}:{$master[1]}");
294
            }
295
        }
296
    }
297
298
    /**
299
     * Guesses the correct node associated to a given slot using a precalculated
300
     * slots map, falling back to the same logic used by Redis to initialize a
301
     * cluster (best-effort).
302
     *
303
     * @param int $slot Slot index.
304
     *
305
     * @return string Connection ID.
306
     */
307
    protected function guessNode($slot)
308
    {
309
        if (!$this->pool) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->pool of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
310
            throw new ClientException('No connections available in the pool');
311
        }
312
313
        if ($this->slotmap->isEmpty()) {
314
            $this->buildSlotMap();
315
        }
316
317
        if ($node = $this->slotmap[$slot]) {
318
            return $node;
319
        }
320
321
        $count = count($this->pool);
322
        $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
323
        $nodes = array_keys($this->pool);
324
325
        return $nodes[$index];
326
    }
327
328
    /**
329
     * Creates a new connection instance from the given connection ID.
330
     *
331
     * @param string $connectionID Identifier for the connection.
332
     *
333
     * @return NodeConnectionInterface
334
     */
335
    protected function createConnection($connectionID)
336
    {
337
        $separator = strrpos($connectionID, ':');
338
339
        return $this->connections->create([
340
            'host' => substr($connectionID, 0, $separator),
341
            'port' => substr($connectionID, $separator + 1),
342
        ]);
343
    }
344
345
    /**
346
     * {@inheritdoc}
347
     */
348
    public function getConnectionByCommand(CommandInterface $command)
349
    {
350
        $slot = $this->strategy->getSlot($command);
351
352
        if (!isset($slot)) {
353
            throw new NotSupportedException(
354
                "Cannot use '{$command->getId()}' with redis-cluster."
355
            );
356
        }
357
358
        if (isset($this->slots[$slot])) {
359
            return $this->slots[$slot];
360
        } else {
361
            return $this->getConnectionBySlot($slot);
362
        }
363
    }
364
365
    /**
366
     * Returns the connection currently associated to a given slot.
367
     *
368
     * @param int $slot Slot index.
369
     *
370
     * @return NodeConnectionInterface
371
     * @throws OutOfBoundsException
372
     */
373
    public function getConnectionBySlot($slot)
374
    {
375
        if (!SlotMap::isValid($slot)) {
376
            throw new OutOfBoundsException("Invalid slot [$slot].");
377
        }
378
379
        if (isset($this->slots[$slot])) {
380
            return $this->slots[$slot];
381
        }
382
383
        $connectionID = $this->guessNode($slot);
384
385
        if (!$connection = $this->getConnectionById($connectionID)) {
386
            $connection = $this->createConnection($connectionID);
387
            $this->pool[$connectionID] = $connection;
388
        }
389
390
        return $this->slots[$slot] = $connection;
391
    }
392
393
    /**
394
     * {@inheritdoc}
395
     */
396
    public function getConnectionById($connectionID)
397
    {
398
        return $this->pool[$connectionID] ?? null;
399
    }
400
401
    /**
402
     * Returns a random connection from the pool.
403
     *
404
     * @return NodeConnectionInterface|null
405
     */
406
    protected function getRandomConnection()
407
    {
408
        if (!$this->pool) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->pool of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
409
            return null;
410
        }
411
412
        return $this->pool[array_rand($this->pool)];
413
    }
414
415
    /**
416
     * Permanently associates the connection instance to a new slot.
417
     * The connection is added to the connections pool if not yet included.
418
     *
419
     * @param NodeConnectionInterface $connection Connection instance.
420
     * @param int                     $slot       Target slot index.
421
     */
422
    protected function move(NodeConnectionInterface $connection, $slot)
423
    {
424
        $this->pool[(string) $connection] = $connection;
425
        $this->slots[(int) $slot] = $connection;
426
        $this->slotmap[(int) $slot] = $connection;
427
    }
428
429
    /**
430
     * Handles -ERR responses returned by Redis.
431
     *
432
     * @param CommandInterface       $command Command that generated the -ERR response.
433
     * @param ErrorResponseInterface $error   Redis error response object.
434
     *
435
     * @return mixed
436
     */
437
    protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
438
    {
439
        $details = explode(' ', $error->getMessage(), 2);
440
441
        switch ($details[0]) {
442
            case 'MOVED':
443
                return $this->onMovedResponse($command, $details[1]);
444
445
            case 'ASK':
446
                return $this->onAskResponse($command, $details[1]);
447
448
            default:
449
                return $error;
450
        }
451
    }
452
453
    /**
454
     * Handles -MOVED responses by executing again the command against the node
455
     * indicated by the Redis response.
456
     *
457
     * @param CommandInterface $command Command that generated the -MOVED response.
458
     * @param string           $details Parameters of the -MOVED response.
459
     *
460
     * @return mixed
461
     */
462
    protected function onMovedResponse(CommandInterface $command, $details)
463
    {
464
        [$slot, $connectionID] = explode(' ', $details, 2);
465
466
        if (!$connection = $this->getConnectionById($connectionID)) {
467
            $connection = $this->createConnection($connectionID);
468
        }
469
470
        if ($this->useClusterSlots) {
471
            $this->askSlotMap($connection);
472
        }
473
474
        $this->move($connection, $slot);
0 ignored issues
show
Bug introduced by
$slot of type string is incompatible with the type integer expected by parameter $slot of Predis\Connection\Cluster\RedisCluster::move(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

474
        $this->move($connection, /** @scrutinizer ignore-type */ $slot);
Loading history...
475
476
        return $this->executeCommand($command);
477
    }
478
479
    /**
480
     * Handles -ASK responses by executing again the command against the node
481
     * indicated by the Redis response.
482
     *
483
     * @param CommandInterface $command Command that generated the -ASK response.
484
     * @param string           $details Parameters of the -ASK response.
485
     *
486
     * @return mixed
487
     */
488
    protected function onAskResponse(CommandInterface $command, $details)
489
    {
490
        [$slot, $connectionID] = explode(' ', $details, 2);
491
492
        if (!$connection = $this->getConnectionById($connectionID)) {
493
            $connection = $this->createConnection($connectionID);
494
        }
495
496
        $connection->executeCommand(RawCommand::create('ASKING'));
497
498
        return $connection->executeCommand($command);
499
    }
500
501
    /**
502
     * Ensures that a command is executed one more time on connection failure.
503
     *
504
     * The connection to the node that generated the error is evicted from the
505
     * pool before trying to fetch an updated slots map from another node. If
506
     * the new slots map points to an unreachable server the client gives up and
507
     * throws the exception as the nodes participating in the cluster may still
508
     * have to agree that something changed in the configuration of the cluster.
509
     *
510
     * @param CommandInterface $command Command instance.
511
     * @param string           $method  Actual method.
512
     *
513
     * @return mixed
514
     */
515
    private function retryCommandOnFailure(CommandInterface $command, $method)
516
    {
517
        $retries = 0;
518
        $retryAfter = $this->retryInterval;
519
520
        while ($retries <= $this->retryLimit) {
521
            try {
522
                $response = $this->getConnectionByCommand($command)->$method($command);
523
524
                if ($response instanceof ErrorResponse) {
525
                    $message = $response->getMessage();
526
527
                    if (strpos($message, 'CLUSTERDOWN') !== false) {
528
                        throw new ServerException($message);
529
                    }
530
                }
531
532
                break;
533
            } catch (Throwable $exception) {
534
                usleep($retryAfter * 1000);
535
                $retryAfter = $retryAfter * 2;
536
537
                if ($exception instanceof ConnectionException) {
538
                    $connection = $exception->getConnection();
539
540
                    if ($connection) {
541
                        $connection->disconnect();
542
                        $this->remove($connection);
543
                    }
544
                }
545
546
                if ($retries === $this->retryLimit) {
547
                    throw $exception;
548
                }
549
550
                if ($this->useClusterSlots) {
551
                    $this->askSlotMap();
552
                }
553
554
                ++$retries;
555
            }
556
        }
557
558
        return $response;
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $response does not seem to be defined for all execution paths leading up to this point.
Loading history...
559
    }
560
561
    /**
562
     * {@inheritdoc}
563
     */
564
    public function writeRequest(CommandInterface $command)
565
    {
566
        $this->retryCommandOnFailure($command, __FUNCTION__);
567
    }
568
569
    /**
570
     * {@inheritdoc}
571
     */
572
    public function readResponse(CommandInterface $command)
573
    {
574
        return $this->retryCommandOnFailure($command, __FUNCTION__);
575
    }
576
577
    /**
578
     * {@inheritdoc}
579
     */
580
    public function executeCommand(CommandInterface $command)
581
    {
582
        $response = $this->retryCommandOnFailure($command, __FUNCTION__);
583
584
        if ($response instanceof ErrorResponseInterface) {
585
            return $this->onErrorResponse($command, $response);
586
        }
587
588
        return $response;
589
    }
590
591
    /**
592
     * {@inheritdoc}
593
     */
594
    #[ReturnTypeWillChange]
595
    public function count()
596
    {
597
        return count($this->pool);
598
    }
599
600
    /**
601
     * {@inheritdoc}
602
     */
603
    #[ReturnTypeWillChange]
604
    public function getIterator()
605
    {
606
        if ($this->slotmap->isEmpty()) {
607
            $this->useClusterSlots ? $this->askSlotMap() : $this->buildSlotMap();
608
        }
609
610
        $connections = [];
611
612
        foreach ($this->slotmap->getNodes() as $node) {
613
            if (!$connection = $this->getConnectionById($node)) {
614
                $this->add($connection = $this->createConnection($node));
615
            }
616
617
            $connections[] = $connection;
618
        }
619
620
        return new ArrayIterator($connections);
621
    }
622
623
    /**
624
     * Returns the underlying slot map.
625
     *
626
     * @return SlotMap
627
     */
628
    public function getSlotMap()
629
    {
630
        return $this->slotmap;
631
    }
632
633
    /**
634
     * Returns the underlying command hash strategy used to hash commands by
635
     * using keys found in their arguments.
636
     *
637
     * @return StrategyInterface
638
     */
639
    public function getClusterStrategy()
640
    {
641
        return $this->strategy;
642
    }
643
644
    /**
645
     * Returns the underlying connection factory used to create new connection
646
     * instances to Redis nodes indicated by redis-cluster.
647
     *
648
     * @return FactoryInterface
649
     */
650
    public function getConnectionFactory()
651
    {
652
        return $this->connections;
653
    }
654
655
    /**
656
     * Enables automatic fetching of the current slots map from one of the nodes
657
     * using the CLUSTER SLOTS command. This option is enabled by default as
658
     * asking the current slots map to Redis upon -MOVED responses may reduce
659
     * overhead by eliminating the trial-and-error nature of the node guessing
660
     * procedure, mostly when targeting many keys that would end up in a lot of
661
     * redirections.
662
     *
663
     * The slots map can still be manually fetched using the askSlotMap()
664
     * method whether or not this option is enabled.
665
     *
666
     * @param bool $value Enable or disable the use of CLUSTER SLOTS.
667
     */
668
    public function useClusterSlots($value)
669
    {
670
        $this->useClusterSlots = (bool) $value;
671
    }
672
}
673