Completed
Push — 2.x ( 91e12b...ed04a7 )
by Cy
01:35
created

PredisConnection::pubSubLoop()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 2
1
<?php
2
3
namespace Monospice\LaravelRedisSentinel\Connections;
4
5
use Closure;
6
use Illuminate\Redis\Connections\PredisConnection as LaravelPredisConnection;
7
use Monospice\SpicyIdentifiers\DynamicMethod;
8
use Predis\ClientInterface as Client;
9
use Predis\CommunicationException;
10
use Predis\PubSub\Consumer as PubSub;
11
use RuntimeException;
12
13
/**
14
 * Executes Redis commands using the Predis client.
15
 *
16
 * This package extends Laravel's PredisConnection class to work around issues
17
 * experienced when using the Predis client to send commands over "aggregate"
18
 * connections (in this case, Sentinel connections).
19
 *
20
 * @category Package
21
 * @package  Monospice\LaravelRedisSentinel
22
 * @author   @pdbreen, Cy Rossignol <[email protected]>
23
 * @license  See LICENSE file
24
 * @link     https://github.com/monospice/laravel-redis-sentinel-drivers
25
 */
26
class PredisConnection extends LaravelPredisConnection
27
{
28
    /**
29
     * The number of times the client attempts to retry a command when it fails
30
     * to connect to a Redis instance behind Sentinel.
31
     *
32
     * @var int
33
     */
34
    protected $retryLimit = 20;
35
36
    /**
37
     * The time in milliseconds to wait before the client retries a failed
38
     * command.
39
     *
40
     * @var int
41
     */
42
    protected $retryWait = 1000;
43
44
    /**
45
     * Create a Redis Sentinel connection using a Predis client.
46
     *
47
     * @param Client $client          The Redis client to wrap.
48
     * @param array  $sentinelOptions Sentinel-specific connection options.
49
     */
50
    public function __construct(Client $client, array $sentinelOptions = [ ])
51
    {
52
        parent::__construct($client);
0 ignored issues
show
Compatibility introduced by
$client of type object<Predis\ClientInterface> is not a sub-type of object<Predis\Client>. It seems like you assume a concrete implementation of the interface Predis\ClientInterface to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
53
54
        // Set the Sentinel-specific connection options on the Predis Client
55
        // connection and the current instance of this class.
56
        foreach ($sentinelOptions as $option => $value) {
57
            DynamicMethod::parseFromUnderscore($option)
58
                ->prepend('set')
59
                ->callOn($this, [ $value ]);
60
        }
61
    }
62
63
    /**
64
     * Set the default amount of time to wait before determining that a
65
     * connection attempt to a Sentinel server failed.
66
     *
67
     * @param float $seconds The timeout value in seconds.
68
     *
69
     * @return $this The current instance for method chaining.
70
     */
71
    public function setSentinelTimeout($seconds)
72
    {
73
        $this->client->getConnection()->setSentinelTimeout($seconds);
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Predis\Connection\ConnectionInterface as the method setSentinelTimeout() does only exist in the following implementations of said interface: Predis\Connection\Aggregate\SentinelReplication.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
74
75
        return $this;
76
    }
77
78
    /**
79
     * Set the default number of attempts to retry a command when the client
80
     * fails to connect to a Redis instance behind Sentinel.
81
     *
82
     * @param int $attempts With a value of 0, throw an exception after the
83
     * first failed attempt. Pass a value of -1 to retry connections forever.
84
     *
85
     * @return $this The current instance for method chaining.
86
     */
87
    public function setRetryLimit($attempts)
88
    {
89
        $this->retryLimit = (int) $attempts;
90
        $this->client->getConnection()->setRetryLimit($attempts);
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Predis\Connection\ConnectionInterface as the method setRetryLimit() does only exist in the following implementations of said interface: Predis\Connection\Aggregate\RedisCluster, Predis\Connection\Aggregate\SentinelReplication.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
91
92
        return $this;
93
    }
94
95
    /**
96
     * Set the time to wait before retrying a command after a connection
97
     * attempt failed.
98
     *
99
     * @param int $milliseconds The wait time in milliseconds. When 0, retry
100
     * a failed command immediately.
101
     *
102
     * @return $this The current instance for method chaining.
103
     */
104
    public function setRetryWait($milliseconds)
105
    {
106
        $this->retryWait = (int) $milliseconds;
107
        $this->client->getConnection()->setRetryWait($milliseconds);
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Predis\Connection\ConnectionInterface as the method setRetryWait() does only exist in the following implementations of said interface: Predis\Connection\Aggregate\SentinelReplication.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
108
109
        return $this;
110
    }
111
112
    /**
113
     * Set whether the client should update the list of known Sentinels each
114
     * time it needs to connect to a Redis server behind Sentinel.
115
     *
116
     * @param bool $enable If TRUE, fetch the updated Sentinel list.
117
     *
118
     * @return $this The current instance for method chaining.
119
     */
120
    public function setUpdateSentinels($enable)
121
    {
122
        $this->client->getConnection()->setUpdateSentinels($enable);
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Predis\Connection\ConnectionInterface as the method setUpdateSentinels() does only exist in the following implementations of said interface: Predis\Connection\Aggregate\SentinelReplication.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
123
124
        return $this;
125
    }
126
127
    /**
128
     * Subscribe to a set of given channels for messages.
129
     *
130
     * @param  array|string $channels The names of the channels to subscribe to.
131
     * @param  Closure      $callback Executed for each message. Receives the
132
     * message string in the first argument and the message channel as the
133
     * second argument. Return FALSE to unsubscribe.
134
     * @param  string       $method   The subscription command ("subscribe" or
135
     * "psubscribe").
136
     *
137
     * @return void
138
     */
139
    public function createSubscription(
140
        $channels,
141
        Closure $callback,
142
        $method = 'subscribe'
143
    ) {
144
        $this->retryOnFailure(function () use ($method, $channels, $callback) {
145
            $loop = $this->pubSubLoop([ $method => (array) $channels ]);
146
147
            if ($method === 'psubscribe') {
148
                $messageKind = 'pmessage';
149
            } else {
150
                $messageKind = 'message';
151
            }
152
153
            $this->consumeMessages($loop, $messageKind, $callback);
154
155
            unset($loop);
156
        });
157
    }
158
159
    /**
160
     * Create a new PUB/SUB subscriber and pass messages to the callback if
161
     * provided.
162
     *
163
     * WARNING: Consumers created using this method are not monitored for
164
     * connection failures. For Sentinel support, use one of the methods
165
     * provided by the Laravel API instead (subscribe() and psubscribe()).
166
     *
167
     * @param array|null $options  Configures the channel(s) to subscribe to.
168
     * @param callable   $callback Optional callback executed for each message
0 ignored issues
show
Documentation introduced by
Should the type for parameter $callback not be callable|null? Also, consider making the array more specific, something like array<String>, or String[].

This check looks for @param annotations where the type inferred by our type inference engine differs from the declared type.

It makes a suggestion as to what type it considers more descriptive. In addition it looks for parameters that have the generic type array and suggests a stricter type like array<String>.

Most often this is a case of a parameter that can be null in addition to its declared types.

Loading history...
169
     * published to the configured channel(s).
170
     *
171
     * @return \Predis\PubSub\Consumer|null A PUB/SUB context used to create
172
     * a subscription loop if no callback provided.
173
     */
174
    public function pubSubLoop($options = null, $callback = null)
175
    {
176
        // Messages published to the master propagate to each of the slaves. We
177
        // pick a random slave to distribute load away from the master:
178
        return $this->getRandomSlave()->pubSubLoop($options, $callback);
179
    }
180
181
    /**
182
     * Execute commands in a transaction.
183
     *
184
     * This package overrides the transaction() method to work around a
185
     * limitation in the Predis API that disallows transactions on "aggregate"
186
     * connections like Sentinel. Note that transactions execute on the Redis
187
     * master instance.
188
     *
189
     * @param callable|null $callback Contains the Redis commands to execute in
190
     * the transaction. The callback receives a Predis\Transaction\MultiExec
191
     * transaction abstraction as the only argument. We use this object to
192
     * execute Redis commands by calling its methods just like we would with
193
     * the Laravel Redis service.
194
     *
195
     * @return array|Predis\Transaction\MultiExec An array containing the
196
     * result for each command executed during the transaction. If no callback
197
     * provided, returns an instance of the Predis transaction abstraction.
198
     */
199
    public function transaction(callable $callback = null)
200
    {
201
        return $this->retryOnFailure(function () use ($callback) {
202
            return $this->getMaster()->transaction($callback);
203
        });
204
    }
205
206
    /**
207
     * Attempt to retry the provided operation when the client fails to connect
208
     * to a Redis server.
209
     *
210
     * We adapt Predis' Sentinel connection failure handling logic here to
211
     * reproduce the high-availability mode provided by the actual client. To
212
     * work around "aggregate" connection limitations in Predis, this class
213
     * provides methods that don't use the high-level Sentinel connection API
214
     * of Predis directly, so it needs to handle connection failures itself.
215
     *
216
     * @param callable $callback The operation to execute.
217
     *
218
     * @return mixed The result of the first successful attempt.
219
     */
220
    protected function retryOnFailure(callable $callback)
221
    {
222
        $attempts = 0;
223
224
        do {
225
            try {
226
                return $callback();
227
            } catch (CommunicationException $exception) {
228
                $exception->getConnection()->disconnect();
229
                $this->client->getConnection()->querySentinel();
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Predis\Connection\ConnectionInterface as the method querySentinel() does only exist in the following implementations of said interface: Predis\Connection\Aggregate\SentinelReplication.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
230
231
                usleep($this->retryWait * 1000);
232
233
                $attempts++;
234
            }
235
        } while ($attempts <= $this->retryLimit);
236
237
        throw $exception;
238
    }
239
240
    /**
241
     * Execute the provided callback for each message read by the PUB/SUB
242
     * consumer.
243
     *
244
     * @param PubSub  $loop     Reads the messages published to a channel.
245
     * @param string  $kind     The subscribed message type ([p]message).
246
     * @param Closure $callback Executed for each message.
247
     *
248
     * @return void
249
     */
250
    protected function consumeMessages(PubSub $loop, $kind, Closure $callback)
251
    {
252
        foreach ($loop as $message) {
253
            if ($message->kind === $kind) {
254
                if ($callback($message->payload, $message->channel) === false) {
255
                    return;
256
                }
257
            }
258
        }
259
    }
260
261
    /**
262
     * Get a Predis client instance for the master.
263
     *
264
     * @return Client The client instance for the current master.
265
     */
266
    protected function getMaster()
267
    {
268
        return $this->client->getClientFor('master');
269
    }
270
271
    /**
272
     * Get a Predis client instance for a random slave.
273
     *
274
     * @param bool $fallbackToMaster If TRUE, return a client for the master
275
     * if the connection does not include any slaves.
276
     *
277
     * @return Client The client instance for the selected slave.
278
     */
279
    protected function getRandomSlave($fallbackToMaster = true)
280
    {
281
        $slaves = $this->client->getConnection()->getSlaves();
0 ignored issues
show
Bug introduced by
It seems like you code against a concrete implementation and not the interface Predis\Connection\ConnectionInterface as the method getSlaves() does only exist in the following implementations of said interface: Predis\Connection\Aggregate\MasterSlaveReplication, Predis\Connection\Aggregate\SentinelReplication.

Let’s take a look at an example:

interface User
{
    /** @return string */
    public function getPassword();
}

class MyUser implements User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  1. Add the method to the interface:

    interface User
    {
        /** @return string */
        public function getPassword();
    
        /** @return string */
        public function getDisplayName();
    }
    
Loading history...
282
283
        if (count($slaves) > 0) {
284
            $slave = $slaves[rand(1, count($slaves)) - 1];
285
286
            return $this->client->getClientFor($slave->getParameters()->alias);
287
        }
288
289
        if ($fallbackToMaster) {
290
            return $this->getMaster();
291
        }
292
293
        throw new RuntimeException('No slave present on connection.');
294
    }
295
}
296