Passed
Pull Request — development (#3708)
by Martyn
15:25
created

SentinelReplication::writeRequest()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 3
rs 10
1
<?php
2
3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2023 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
namespace Predis\Connection\Replication;
14
15
use InvalidArgumentException;
16
use Predis\Command\CommandInterface;
17
use Predis\Command\RawCommand;
18
use Predis\CommunicationException;
19
use Predis\Connection\ConnectionException;
20
use Predis\Connection\FactoryInterface as ConnectionFactoryInterface;
21
use Predis\Connection\NodeConnectionInterface;
22
use Predis\Connection\Parameters;
23
use Predis\Replication\ReplicationStrategy;
24
use Predis\Replication\RoleException;
25
use Predis\Response\Error;
26
use Predis\Response\ErrorInterface as ErrorResponseInterface;
27
use Predis\Response\ServerException;
28
29
/**
30
 * @author Daniele Alessandri <[email protected]>
31
 * @author Ville Mattila <[email protected]>
32
 */
33
class SentinelReplication implements ReplicationInterface
34
{
35
    /**
36
     * @var NodeConnectionInterface
37
     */
38
    protected $master;
39
40
    /**
41
     * @var NodeConnectionInterface[]
42
     */
43
    protected $slaves = [];
44
45
    /**
46
     * @var NodeConnectionInterface[]
47
     */
48
    protected $pool = [];
49
50
    /**
51
     * @var NodeConnectionInterface
52
     */
53
    protected $current;
54
55
    /**
56
     * @var string
57
     */
58
    protected $service;
59
60
    /**
61
     * @var ConnectionFactoryInterface
62
     */
63
    protected $connectionFactory;
64
65
    /**
66
     * @var ReplicationStrategy
67
     */
68
    protected $strategy;
69
70
    /**
71
     * @var NodeConnectionInterface[]
72
     */
73
    protected $sentinels = [];
74
75
    /**
76
     * @var int
77
     */
78
    protected $sentinelIndex = 0;
79
80
    /**
81
     * @var NodeConnectionInterface
82
     */
83
    protected $sentinelConnection;
84
85
    /**
86
     * @var float
87
     */
88
    protected $sentinelTimeout = 0.100;
89
90
    /**
91
     * Max number of automatic retries of commands upon server failure.
92
     *
93
     * -1 = unlimited retry attempts
94
     *  0 = no retry attempts (fails immediately)
95
     *  n = fail only after n retry attempts
96
     *
97
     * @var int
98
     */
99
    protected $retryLimit = 20;
100
101
    /**
102
     * Time to wait in milliseconds before fetching a new configuration from one
103
     * of the sentinel servers.
104
     *
105
     * @var int
106
     */
107
    protected $retryWait = 1000;
108
109
    /**
110
     * Flag for automatic fetching of available sentinels.
111
     *
112
     * @var bool
113
     */
114
    protected $updateSentinels = false;
115
116
    /**
117
     * @param string                     $service           Name of the service for autodiscovery.
118
     * @param array                      $sentinels         Sentinel servers connection parameters.
119
     * @param ConnectionFactoryInterface $connectionFactory Connection factory instance.
120
     * @param ReplicationStrategy        $strategy          Replication strategy instance.
121
     */
122
    public function __construct(
123
        $service,
124
        array $sentinels,
125
        ConnectionFactoryInterface $connectionFactory,
126
        ReplicationStrategy $strategy = null
127
    ) {
128
        $this->sentinels = $sentinels;
129
        $this->service = $service;
130
        $this->connectionFactory = $connectionFactory;
131
        $this->strategy = $strategy ?: new ReplicationStrategy();
132
    }
133
134
    /**
135
     * Sets a default timeout for connections to sentinels.
136
     *
137
     * When "timeout" is present in the connection parameters of sentinels, its
138
     * value overrides the default sentinel timeout.
139
     *
140
     * @param float $timeout Timeout value.
141
     */
142
    public function setSentinelTimeout($timeout)
143
    {
144
        $this->sentinelTimeout = (float) $timeout;
145
    }
146
147
    /**
148
     * Sets the maximum number of retries for commands upon server failure.
149
     *
150
     * -1 = unlimited retry attempts
151
     *  0 = no retry attempts (fails immediately)
152
     *  n = fail only after n retry attempts
153
     *
154
     * @param int $retry Number of retry attempts.
155
     */
156
    public function setRetryLimit($retry)
157
    {
158
        $this->retryLimit = (int) $retry;
159
    }
160
161
    /**
162
     * Sets the time to wait (in milliseconds) before fetching a new configuration
163
     * from one of the sentinels.
164
     *
165
     * @param float $milliseconds Time to wait before the next attempt.
166
     */
167
    public function setRetryWait($milliseconds)
168
    {
169
        $this->retryWait = (float) $milliseconds;
170
    }
171
172
    /**
173
     * Set automatic fetching of available sentinels.
174
     *
175
     * @param bool $update Enable or disable automatic updates.
176
     */
177
    public function setUpdateSentinels($update)
178
    {
179
        $this->updateSentinels = (bool) $update;
180
    }
181
182
    /**
183
     * Resets the current connection.
184
     */
185
    protected function reset()
186
    {
187
        $this->current = null;
188
    }
189
190
    /**
191
     * Wipes the current list of master and slaves nodes.
192
     */
193
    protected function wipeServerList()
194
    {
195
        $this->reset();
196
197
        $this->master = null;
198
        $this->slaves = [];
199
        $this->pool = [];
200
    }
201
202
    /**
203
     * {@inheritdoc}
204
     */
205
    public function add(NodeConnectionInterface $connection)
206
    {
207
        $parameters = $connection->getParameters();
208
        $role = $parameters->role;
0 ignored issues
show
Bug Best Practice introduced by
The property role does not exist on Predis\Connection\ParametersInterface. Since you implemented __get, consider adding a @property annotation.
Loading history...
209
210
        if ('master' === $role) {
211
            $this->master = $connection;
212
        } elseif ('sentinel' === $role) {
213
            $this->sentinels[] = $connection;
214
            // sentinels are not considered part of the pool.
215
            return;
216
        } else {
217
            // everything else is considered a slave.
218
            $this->slaves[] = $connection;
219
        }
220
221
        $this->pool[(string) $connection] = $connection;
222
223
        $this->reset();
224
    }
225
226
    /**
227
     * {@inheritdoc}
228
     */
229
    public function remove(NodeConnectionInterface $connection)
230
    {
231
        if ($connection === $this->master) {
232
            $this->master = null;
233
        } elseif (false !== $id = array_search($connection, $this->slaves, true)) {
234
            unset($this->slaves[$id]);
235
        } elseif (false !== $id = array_search($connection, $this->sentinels, true)) {
236
            unset($this->sentinels[$id]);
237
238
            return true;
239
        } else {
240
            return false;
241
        }
242
243
        unset($this->pool[(string) $connection]);
244
245
        $this->reset();
246
247
        return true;
248
    }
249
250
    /**
251
     * Creates a new connection to a sentinel server.
252
     *
253
     * @return NodeConnectionInterface
254
     */
255
    protected function createSentinelConnection($parameters)
256
    {
257
        if ($parameters instanceof NodeConnectionInterface) {
258
            return $parameters;
259
        }
260
261
        if (is_string($parameters)) {
262
            $parameters = Parameters::parse($parameters);
263
        }
264
265
        if (is_array($parameters)) {
266
            // NOTE: sentinels do not accept AUTH and SELECT commands so we must
267
            // explicitly set them to NULL to avoid problems when using default
268
            // parameters set via client options. Actually AUTH is supported for
269
            // sentinels starting with Redis 5 but we have to differentiate from
270
            // sentinels passwords and nodes passwords, this will be implemented
271
            // in a later release.
272
            $parameters['database'] = null;
273
            $parameters['username'] = null;
274
275
            // don't leak password from between configurations
276
            // https://github.com/predis/predis/pull/807/#discussion_r985764770
277
            if (!isset($parameters['password'])) {
278
                $parameters['password'] = null;
279
            }
280
281
            if (!isset($parameters['timeout'])) {
282
                $parameters['timeout'] = $this->sentinelTimeout;
283
            }
284
        }
285
286
        return $this->connectionFactory->create($parameters);
287
    }
288
289
    /**
290
     * Returns the current sentinel connection.
291
     *
292
     * If there is no active sentinel connection, a new connection is created.
293
     *
294
     * @return NodeConnectionInterface
295
     */
296
    public function getSentinelConnection()
297
    {
298
        if (!$this->sentinelConnection) {
299
            if ($this->sentinelIndex >= count($this->sentinels)) {
300
                $this->sentinelIndex = 0;
301
                throw new \Predis\ClientException('No sentinel server available for autodiscovery.');
302
            }
303
304
            $sentinel = $this->sentinels[$this->sentinelIndex];
305
            ++$this->sentinelIndex;
306
            $this->sentinelConnection = $this->createSentinelConnection($sentinel);
307
        }
308
309
        return $this->sentinelConnection;
310
    }
311
312
    /**
313
     * Fetches an updated list of sentinels from a sentinel.
314
     */
315
    public function updateSentinels()
316
    {
317
        SENTINEL_QUERY: {
318
            $sentinel = $this->getSentinelConnection();
319
320
            try {
321
                $payload = $sentinel->executeCommand(
322
                    RawCommand::create('SENTINEL', 'sentinels', $this->service)
323
                );
324
325
                $this->sentinels = [];
326
                $this->sentinelIndex = 0;
327
                // NOTE: sentinel server does not return itself, so we add it back.
328
                $this->sentinels[] = $sentinel->getParameters()->toArray();
329
330
                foreach ($payload as $sentinel) {
331
                    $this->sentinels[] = [
332
                        'host' => $sentinel[3],
333
                        'port' => $sentinel[5],
334
                        'role' => 'sentinel',
335
                    ];
336
                }
337
            } catch (ConnectionException $exception) {
338
                $this->sentinelConnection = null;
339
340
                goto SENTINEL_QUERY;
341
            }
342
        }
343
    }
344
345
    /**
346
     * Fetches the details for the master and slave servers from a sentinel.
347
     */
348
    public function querySentinel()
349
    {
350
        $this->wipeServerList();
351
352
        $this->updateSentinels();
353
        $this->getMaster();
354
        $this->getSlaves();
355
    }
356
357
    /**
358
     * Handles error responses returned by redis-sentinel.
359
     *
360
     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
361
     * @param ErrorResponseInterface  $error    Error response.
362
     */
363
    private function handleSentinelErrorResponse(NodeConnectionInterface $sentinel, ErrorResponseInterface $error)
364
    {
365
        if ($error->getErrorType() === 'IDONTKNOW') {
366
            throw new ConnectionException($sentinel, $error->getMessage());
367
        } else {
368
            throw new ServerException($error->getMessage());
369
        }
370
    }
371
372
    /**
373
     * Fetches the details for the master server from a sentinel.
374
     *
375
     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
376
     * @param string                  $service  Name of the service.
377
     *
378
     * @return array
379
     */
380
    protected function querySentinelForMaster(NodeConnectionInterface $sentinel, $service)
381
    {
382
        $payload = $sentinel->executeCommand(
383
            RawCommand::create('SENTINEL', 'get-master-addr-by-name', $service)
384
        );
385
386
        if ($payload === null) {
387
            throw new ServerException('ERR No such master with that name');
388
        }
389
390
        if ($payload instanceof ErrorResponseInterface) {
391
            $this->handleSentinelErrorResponse($sentinel, $payload);
392
        }
393
394
        return [
395
            'host' => $payload[0],
396
            'port' => $payload[1],
397
            'role' => 'master',
398
        ];
399
    }
400
401
    /**
402
     * Fetches the details for the slave servers from a sentinel.
403
     *
404
     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
405
     * @param string                  $service  Name of the service.
406
     *
407
     * @return array
408
     */
409
    protected function querySentinelForSlaves(NodeConnectionInterface $sentinel, $service)
410
    {
411
        $slaves = [];
412
413
        $payload = $sentinel->executeCommand(
414
            RawCommand::create('SENTINEL', 'slaves', $service)
415
        );
416
417
        if ($payload instanceof ErrorResponseInterface) {
418
            $this->handleSentinelErrorResponse($sentinel, $payload);
419
        }
420
421
        foreach ($payload as $slave) {
422
            $flags = explode(',', $slave[9]);
423
424
            if (array_intersect($flags, ['s_down', 'o_down', 'disconnected'])) {
425
                continue;
426
            }
427
428
            $slaves[] = [
429
                'host' => $slave[3],
430
                'port' => $slave[5],
431
                'role' => 'slave',
432
            ];
433
        }
434
435
        return $slaves;
436
    }
437
438
    /**
439
     * {@inheritdoc}
440
     */
441
    public function getCurrent()
442
    {
443
        return $this->current;
444
    }
445
446
    /**
447
     * {@inheritdoc}
448
     */
449
    public function getMaster()
450
    {
451
        if ($this->master) {
452
            return $this->master;
453
        }
454
455
        if ($this->updateSentinels) {
456
            $this->updateSentinels();
457
        }
458
459
        SENTINEL_QUERY: {
460
            $sentinel = $this->getSentinelConnection();
461
462
            try {
463
                $masterParameters = $this->querySentinelForMaster($sentinel, $this->service);
464
                $masterConnection = $this->connectionFactory->create($masterParameters);
465
466
                $this->add($masterConnection);
467
            } catch (ConnectionException $exception) {
468
                $this->sentinelConnection = null;
469
470
                goto SENTINEL_QUERY;
471
            }
472
        }
473
474
        return $masterConnection;
475
    }
476
477
    /**
478
     * {@inheritdoc}
479
     */
480
    public function getSlaves()
481
    {
482
        if ($this->slaves) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->slaves of type Predis\Connection\NodeConnectionInterface[] is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
483
            return array_values($this->slaves);
484
        }
485
486
        if ($this->updateSentinels) {
487
            $this->updateSentinels();
488
        }
489
490
        SENTINEL_QUERY: {
491
            $sentinel = $this->getSentinelConnection();
492
493
            try {
494
                $slavesParameters = $this->querySentinelForSlaves($sentinel, $this->service);
495
496
                foreach ($slavesParameters as $slaveParameters) {
497
                    $this->add($this->connectionFactory->create($slaveParameters));
498
                }
499
            } catch (ConnectionException $exception) {
500
                $this->sentinelConnection = null;
501
502
                goto SENTINEL_QUERY;
503
            }
504
        }
505
506
        return array_values($this->slaves);
507
    }
508
509
    /**
510
     * Returns a random slave.
511
     *
512
     * @return NodeConnectionInterface|null
513
     */
514
    protected function pickSlave()
515
    {
516
        $slaves = $this->getSlaves();
517
518
        return $slaves
519
            ? $slaves[rand(1, count($slaves)) - 1]
520
            : null;
521
    }
522
523
    /**
524
     * Returns the connection instance in charge for the given command.
525
     *
526
     * @param CommandInterface $command Command instance.
527
     *
528
     * @return NodeConnectionInterface
529
     */
530
    private function getConnectionInternal(CommandInterface $command)
531
    {
532
        if (!$this->current) {
533
            if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
534
                $this->current = $slave;
535
            } else {
536
                $this->current = $this->getMaster();
537
            }
538
539
            return $this->current;
540
        }
541
542
        if ($this->current === $this->master) {
543
            return $this->current;
544
        }
545
546
        if (!$this->strategy->isReadOperation($command)) {
547
            $this->current = $this->getMaster();
548
        }
549
550
        return $this->current;
551
    }
552
553
    /**
554
     * Asserts that the specified connection matches an expected role.
555
     *
556
     * @param NodeConnectionInterface $connection Connection to a redis server.
557
     * @param string                  $role       Expected role of the server ("master", "slave" or "sentinel").
558
     *
559
     * @throws RoleException|ConnectionException
560
     */
561
    protected function assertConnectionRole(NodeConnectionInterface $connection, $role)
562
    {
563
        $role = strtolower($role);
564
        $actualRole = $connection->executeCommand(RawCommand::create('ROLE'));
565
566
        if ($actualRole instanceof Error) {
567
            throw new ConnectionException($connection, $actualRole->getMessage());
568
        }
569
570
        if ($role !== $actualRole[0]) {
571
            throw new RoleException($connection, "Expected $role but got $actualRole[0] [$connection]");
572
        }
573
    }
574
575
    /**
576
     * {@inheritdoc}
577
     */
578
    public function getConnectionByCommand(CommandInterface $command)
579
    {
580
        $connection = $this->getConnectionInternal($command);
581
582
        if (!$connection->isConnected()) {
583
            // When we do not have any available slave in the pool we can expect
584
            // read-only operations to hit the master server.
585
            $expectedRole = $this->strategy->isReadOperation($command) && $this->slaves ? 'slave' : 'master';
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->slaves of type Predis\Connection\NodeConnectionInterface[] is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
586
            $this->assertConnectionRole($connection, $expectedRole);
587
        }
588
589
        return $connection;
590
    }
591
592
    /**
593
     * {@inheritdoc}
594
     */
595
    public function getConnectionById($id)
596
    {
597
        return $this->pool[$id] ?? null;
598
    }
599
600
    /**
601
     * Returns a connection by its role.
602
     *
603
     * @param string $role Connection role (`master`, `slave` or `sentinel`)
604
     *
605
     * @return NodeConnectionInterface|null
606
     */
607
    public function getConnectionByRole($role)
608
    {
609
        if ($role === 'master') {
610
            return $this->getMaster();
611
        } elseif ($role === 'slave') {
612
            return $this->pickSlave();
613
        } elseif ($role === 'sentinel') {
614
            return $this->getSentinelConnection();
615
        } else {
616
            return null;
617
        }
618
    }
619
620
    /**
621
     * Switches the internal connection in use by the backend.
622
     *
623
     * Sentinel connections are not considered as part of the pool, meaning that
624
     * trying to switch to a sentinel will throw an exception.
625
     *
626
     * @param NodeConnectionInterface $connection Connection instance in the pool.
627
     */
628
    public function switchTo(NodeConnectionInterface $connection)
629
    {
630
        if ($connection && $connection === $this->current) {
631
            return;
632
        }
633
634
        if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
635
            throw new InvalidArgumentException('Invalid connection or connection not found.');
636
        }
637
638
        $connection->connect();
639
640
        if ($this->current) {
641
            $this->current->disconnect();
642
        }
643
644
        $this->current = $connection;
645
    }
646
647
    /**
648
     * {@inheritdoc}
649
     */
650
    public function switchToMaster()
651
    {
652
        $connection = $this->getConnectionByRole('master');
653
        $this->switchTo($connection);
654
    }
655
656
    /**
657
     * {@inheritdoc}
658
     */
659
    public function switchToSlave()
660
    {
661
        $connection = $this->getConnectionByRole('slave');
662
        $this->switchTo($connection);
0 ignored issues
show
Bug introduced by
It seems like $connection can also be of type null; however, parameter $connection of Predis\Connection\Replic...Replication::switchTo() does only seem to accept Predis\Connection\NodeConnectionInterface, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

662
        $this->switchTo(/** @scrutinizer ignore-type */ $connection);
Loading history...
663
    }
664
665
    /**
666
     * {@inheritdoc}
667
     */
668
    public function isConnected()
669
    {
670
        return $this->current ? $this->current->isConnected() : false;
671
    }
672
673
    /**
674
     * {@inheritdoc}
675
     */
676
    public function connect()
677
    {
678
        if (!$this->current) {
679
            if (!$this->current = $this->pickSlave()) {
680
                $this->current = $this->getMaster();
681
            }
682
        }
683
684
        $this->current->connect();
685
    }
686
687
    /**
688
     * {@inheritdoc}
689
     */
690
    public function disconnect()
691
    {
692
        foreach ($this->pool as $connection) {
693
            $connection->disconnect();
694
        }
695
    }
696
697
    /**
698
     * Retries the execution of a command upon server failure after asking a new
699
     * configuration to one of the sentinels.
700
     *
701
     * @param CommandInterface $command Command instance.
702
     * @param string           $method  Actual method.
703
     *
704
     * @return mixed
705
     */
706
    private function retryCommandOnFailure(CommandInterface $command, $method)
707
    {
708
        $retries = 0;
709
710
        while ($retries <= $this->retryLimit) {
711
            try {
712
                $response = $this->getConnectionByCommand($command)->$method($command);
713
                break;
714
            } catch (CommunicationException $exception) {
715
                $this->wipeServerList();
716
                $exception->getConnection()->disconnect();
717
718
                if ($retries === $this->retryLimit) {
719
                    throw $exception;
720
                }
721
722
                usleep($this->retryWait * 1000);
723
724
                ++$retries;
725
            }
726
        }
727
728
        return $response;
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $response does not seem to be defined for all execution paths leading up to this point.
Loading history...
729
    }
730
731
    /**
732
     * {@inheritdoc}
733
     */
734
    public function writeRequest(CommandInterface $command)
735
    {
736
        $this->retryCommandOnFailure($command, __FUNCTION__);
737
    }
738
739
    /**
740
     * {@inheritdoc}
741
     */
742
    public function readResponse(CommandInterface $command)
743
    {
744
        return $this->retryCommandOnFailure($command, __FUNCTION__);
745
    }
746
747
    /**
748
     * {@inheritdoc}
749
     */
750
    public function executeCommand(CommandInterface $command)
751
    {
752
        return $this->retryCommandOnFailure($command, __FUNCTION__);
753
    }
754
755
    /**
756
     * Returns the underlying replication strategy.
757
     *
758
     * @return ReplicationStrategy
759
     */
760
    public function getReplicationStrategy()
761
    {
762
        return $this->strategy;
763
    }
764
765
    /**
766
     * {@inheritdoc}
767
     */
768
    public function __sleep()
769
    {
770
        return [
771
            'master', 'slaves', 'pool', 'service', 'sentinels', 'connectionFactory', 'strategy',
772
        ];
773
    }
774
}
775