Passed
Pull Request — development (#3708)
by Martyn
15:25
created

MultiExec::assertClient()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 6
c 1
b 0
f 0
nc 3
nop 1
dl 0
loc 11
rs 10
1
<?php
2
3
/*
4
 * This file is part of the Predis package.
5
 *
6
 * (c) 2009-2020 Daniele Alessandri
7
 * (c) 2021-2023 Till Krüss
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
namespace Predis\Transaction;
14
15
use Exception;
16
use InvalidArgumentException;
17
use Predis\ClientContextInterface;
18
use Predis\ClientException;
19
use Predis\ClientInterface;
20
use Predis\Command\CommandInterface;
21
use Predis\CommunicationException;
22
use Predis\Connection\Cluster\ClusterInterface;
23
use Predis\NotSupportedException;
24
use Predis\Protocol\ProtocolException;
25
use Predis\Response\ErrorInterface as ErrorResponseInterface;
26
use Predis\Response\ServerException;
27
use Predis\Response\Status as StatusResponse;
28
use SplQueue;
29
30
/**
31
 * Client-side abstraction of a Redis transaction based on MULTI / EXEC.
32
 *
33
 * {@inheritdoc}
34
 */
35
class MultiExec implements ClientContextInterface
36
{
37
    /**
     * Flag-based state tracker of the transaction.
     *
     * @var MultiExecState
     */
    private $state;

    /**
     * Client instance the transaction creates and sends commands through.
     *
     * @var ClientInterface
     */
    protected $client;

    /**
     * Commands acknowledged by the server with +QUEUED, in sending order.
     *
     * @var SplQueue
     */
    protected $commands;

    /**
     * Whether error items found in the EXEC reply are thrown as exceptions.
     *
     * @var bool
     */
    protected $exceptions = true;

    /**
     * Number of automatic retries allowed when EXEC reports an abort.
     *
     * @var int
     */
    protected $attempts = 0;

    /**
     * Keys passed to WATCH when the transaction gets initialized.
     *
     * @var array
     */
    protected $watchKeys = [];

    /**
     * Whether the transaction is flagged as CAS when it gets initialized.
     *
     * @var bool
     */
    protected $modeCAS = false;
45
46
    /**
     * Creates a transaction bound to the given client.
     *
     * The client is validated first, then the options are applied and the
     * transaction state and command queue are reset.
     *
     * @param ClientInterface $client  Client instance used by the transaction.
     * @param array|null      $options Initialization options; null is handled
     *                                 like an empty array.
     *
     * @throws NotSupportedException
     */
    public function __construct(ClientInterface $client, ?array $options = null)
    {
        $this->assertClient($client);

        $this->client = $client;
        $this->state = new MultiExecState();

        // The nullable type is spelled out explicitly: relying on a null
        // default to make the parameter nullable is deprecated in PHP 8.4.
        $this->configure($client, $options ?? []);
        $this->reset();
    }
60
61
    /**
     * Verifies that the client can be used to run a MULTI / EXEC transaction.
     *
     * The client is rejected when its connection is a cluster connection or
     * when its command factory does not support MULTI, EXEC and DISCARD.
     *
     * @param ClientInterface $client Client instance used by the transaction object.
     *
     * @throws NotSupportedException
     */
    private function assertClient(ClientInterface $client)
    {
        $connection = $client->getConnection();

        if ($connection instanceof ClusterInterface) {
            throw new NotSupportedException(
                'Cannot initialize a MULTI/EXEC transaction over cluster connections.'
            );
        }

        $factory = $client->getCommandFactory();
        $supported = $factory->supports('MULTI', 'EXEC', 'DISCARD');

        if (!$supported) {
            throw new NotSupportedException(
                'MULTI, EXEC and DISCARD are not supported by the current command factory.'
            );
        }
    }
83
84
    /**
     * Applies the user-supplied options to the transaction.
     *
     * Recognized keys are `exceptions`, `cas`, `watch` and `retry`. When
     * `exceptions` is missing, the value is taken from the client options.
     *
     * @param ClientInterface $client  Underlying client instance.
     * @param array           $options Array of options for the transaction.
     */
    protected function configure(ClientInterface $client, array $options)
    {
        $this->exceptions = isset($options['exceptions'])
            ? (bool) $options['exceptions']
            : $client->getOptions()->exceptions;

        if (isset($options['cas'])) {
            $this->modeCAS = (bool) $options['cas'];
        }

        // Only a set, non-empty value replaces the default list of keys.
        if (!empty($options['watch'])) {
            $this->watchKeys = $options['watch'];
        }

        if (isset($options['retry'])) {
            $this->attempts = (int) $options['retry'];
        }
    }
110
111
    /**
     * Clears the transaction: the queue of pending commands is replaced by
     * an empty one and the state object is reset.
     */
    protected function reset()
    {
        $this->commands = new SplQueue();
        $this->state->reset();
    }
119
120
    /**
     * Prepares the transaction context on first use.
     *
     * Does nothing when the state is already initialized. Otherwise applies
     * the CAS flag and the configured WATCH keys, sends MULTI when required
     * and finally marks the state as initialized.
     */
    protected function initialize()
    {
        $state = $this->state;

        if ($state->isInitialized()) {
            return;
        }

        if ($this->modeCAS) {
            $state->flag(MultiExecState::CAS);
        }

        if (!empty($this->watchKeys)) {
            $this->watch($this->watchKeys);
        }

        $inCAS = $state->isCAS();
        $wasDiscarded = $state->isDiscarded();

        // MULTI is held back only while in CAS mode; after a discard it is
        // sent regardless and the CAS flag is dropped.
        if (!$inCAS || $wasDiscarded) {
            $this->call('MULTI');

            if ($wasDiscarded) {
                $state->unflag(MultiExecState::CAS);
            }
        }

        $state->unflag(MultiExecState::DISCARDED);
        $state->flag(MultiExecState::INITIALIZED);
    }
151
152
    /**
     * Turns a dynamic method call into a Redis command created by the client
     * and runs it through executeCommand().
     *
     * @param string $method    Command ID.
     * @param array  $arguments Arguments for the command.
     *
     * @return mixed
     */
    public function __call($method, $arguments)
    {
        $command = $this->client->createCommand($method, $arguments);

        return $this->executeCommand($command);
    }
166
167
    /**
     * Sends a command straight through the client, without queueing it in
     * the transaction, and converts an error response into an exception.
     *
     * @param string $commandID Command ID.
     * @param array  $arguments Arguments for the command.
     *
     * @return mixed
     * @throws ServerException
     */
    protected function call($commandID, array $arguments = [])
    {
        $client = $this->client;
        $command = $client->createCommand($commandID, $arguments);
        $response = $client->executeCommand($command);

        if (!$response instanceof ErrorResponseInterface) {
            return $response;
        }

        throw new ServerException($response->getMessage());
    }
188
189
    /**
     * Runs a command in the context of the transaction.
     *
     * While the state is in CAS mode the command is executed by the client
     * and its result is returned. Otherwise the command is written to the
     * connection and must be acknowledged with a QUEUED status, in which
     * case it is added to the queue of pending commands.
     *
     * @param CommandInterface $command Command instance.
     *
     * @return $this|mixed
     * @throws AbortedMultiExecException
     * @throws CommunicationException
     */
    public function executeCommand(CommandInterface $command)
    {
        $this->initialize();

        if ($this->state->isCAS()) {
            return $this->client->executeCommand($command);
        }

        $connection = $this->client->getConnection();
        $response = $connection->executeCommand($command);

        // Loose comparison on purpose: the status response is an object.
        // NOTE(review): presumably it is compared through its string
        // conversion - confirm against the status response class.
        if ($response instanceof StatusResponse && $response == 'QUEUED') {
            $this->commands->enqueue($command);

            return $this;
        }

        if ($response instanceof ErrorResponseInterface) {
            throw new AbortedMultiExecException($this, $response->getMessage());
        }

        $this->onProtocolError('The server did not return a +QUEUED status response.');

        return $this;
    }
218
219
    /**
     * Sends WATCH for one or more keys and flags the state as watching.
     *
     * @param string|array $keys One or more keys.
     *
     * @return mixed
     * @throws NotSupportedException
     * @throws ClientException
     */
    public function watch($keys)
    {
        $supported = $this->client->getCommandFactory()->supports('WATCH');

        if (!$supported) {
            throw new NotSupportedException('WATCH is not supported by the current command factory.');
        }

        // NOTE(review): the call is rejected when isWatchAllowed() returns
        // true, which reads inverted with respect to the method name -
        // confirm against the state class before touching this guard.
        if ($this->state->isWatchAllowed()) {
            throw new ClientException('Sending WATCH after MULTI is not allowed.');
        }

        $watched = is_array($keys) ? $keys : [$keys];
        $response = $this->call('WATCH', $watched);

        $this->state->flag(MultiExecState::WATCH);

        return $response;
    }
243
244
    /**
     * Opens the MULTI block of the transaction.
     *
     * When the state check for INITIALIZED | CAS succeeds, the CAS flag is
     * removed and MULTI is sent; otherwise the transaction goes through the
     * regular initialization.
     *
     * @return MultiExec
     */
    public function multi()
    {
        $state = $this->state;
        $initializedCAS = MultiExecState::INITIALIZED | MultiExecState::CAS;

        if (!$state->check($initializedCAS)) {
            $this->initialize();

            return $this;
        }

        $state->unflag(MultiExecState::CAS);
        $this->call('MULTI');

        return $this;
    }
260
261
    /**
     * Removes the WATCH flag from the state and issues UNWATCH as a regular
     * command of the transaction context.
     *
     * @return MultiExec
     * @throws NotSupportedException
     */
    public function unwatch()
    {
        $factory = $this->client->getCommandFactory();

        if (!$factory->supports('UNWATCH')) {
            throw new NotSupportedException(
                'UNWATCH is not supported by the current command factory.'
            );
        }

        $this->state->unflag(MultiExecState::WATCH);
        $this->__call('UNWATCH', []);

        return $this;
    }
280
281
    /**
     * Abandons an initialized transaction.
     *
     * UNWATCH is sent while in CAS mode, DISCARD otherwise; the transaction
     * is then reset and flagged as discarded. A transaction that has not
     * been initialized is left untouched.
     *
     * @return MultiExec
     */
    public function discard()
    {
        if (!$this->state->isInitialized()) {
            return $this;
        }

        $commandID = $this->state->isCAS() ? 'UNWATCH' : 'DISCARD';
        $this->call($commandID);

        $this->reset();
        $this->state->flag(MultiExecState::DISCARDED);

        return $this;
    }
298
299
    /**
     * Runs the transaction by delegating to execute() without a callable.
     *
     * @return mixed
     */
    public function exec()
    {
        $result = $this->execute();

        return $result;
    }
308
309
    /**
     * Validates that the transaction can be executed with the given argument.
     *
     * Fails when the state reports an execution in progress, when the
     * argument is set but not callable, when a callable is combined with
     * commands that are already queued, or when retries are configured
     * without a callable. In the last two cases the transaction is discarded
     * before the exception is thrown.
     *
     * @param mixed $callable Callback for execution.
     *
     * @throws InvalidArgumentException
     * @throws ClientException
     */
    private function checkBeforeExecution($callable)
    {
        if ($this->state->isExecuting()) {
            throw new ClientException(
                'Cannot invoke "execute" or "exec" inside an active transaction context.'
            );
        }

        if (!$callable) {
            if ($this->attempts) {
                $this->discard();

                throw new ClientException(
                    'Automatic retries are supported only when a callable block is provided.'
                );
            }

            return;
        }

        if (!is_callable($callable)) {
            throw new InvalidArgumentException('The argument must be a callable object.');
        }

        if ($this->commands->isEmpty()) {
            return;
        }

        $this->discard();

        throw new ClientException(
            'Cannot execute a transaction block after using fluent interface.'
        );
    }
345
346
    /**
     * Handles the actual execution of the whole transaction.
     *
     * The optional callable is run to queue commands, then EXEC is sent. A
     * null reply to EXEC means the transaction was aborted: the transaction
     * is reset and the callable is run again while retries are left,
     * otherwise an exception is thrown.
     *
     * @param mixed $callable Optional callback for execution.
     *
     * @return array|null Parsed responses of the queued commands, or null
     *                    when there was no queued command to execute.
     * @throws CommunicationException
     * @throws AbortedMultiExecException
     * @throws ServerException
     */
    public function execute($callable = null)
    {
        $this->checkBeforeExecution($callable);

        $attempts = $this->attempts;

        while (true) {
            if ($callable) {
                $this->executeTransactionBlock($callable);
            }

            if ($this->commands->isEmpty()) {
                if ($this->state->isWatching()) {
                    $this->discard();
                }

                return null;
            }

            $execResponse = $this->call('EXEC');

            if ($execResponse !== null) {
                break;
            }

            // "<= 0" rather than "=== 0": a negative retry count must also
            // end here, otherwise the null reply would fall through to the
            // response handling below.
            if ($attempts <= 0) {
                throw new AbortedMultiExecException(
                    $this, 'The current transaction has been aborted by the server.'
                );
            }

            --$attempts;
            $this->reset();
        }

        $commands = $this->commands;
        $size = count($execResponse);

        if ($size !== count($commands)) {
            $this->onProtocolError('EXEC returned an unexpected number of response items.');
        }

        $response = [];

        for ($i = 0; $i < $size; ++$i) {
            $cmdResponse = $execResponse[$i];

            // The first error item stops the processing when exceptions are
            // enabled; commands of the following items stay in the queue.
            if ($cmdResponse instanceof ErrorResponseInterface && $this->exceptions) {
                throw new ServerException($cmdResponse->getMessage());
            }

            $response[$i] = $commands->dequeue()->parseResponse($cmdResponse);
        }

        return $response;
    }
413
414
    /**
     * Passes the current transaction object to a callable block for execution.
     *
     * The state is flagged as INSIDEBLOCK for the duration of the call.
     * Communication and server exceptions are propagated as they are; any
     * other throwable discards the transaction before being propagated.
     *
     * @param mixed $callable Callback.
     *
     * @throws CommunicationException
     * @throws ServerException
     * @throws \Throwable Anything else thrown by the callable.
     */
    protected function executeTransactionBlock($callable)
    {
        $this->state->flag(MultiExecState::INSIDEBLOCK);

        try {
            call_user_func($callable, $this);
        } catch (CommunicationException $exception) {
            // Propagated untouched, the transaction is not discarded.
            throw $exception;
        } catch (ServerException $exception) {
            // Propagated untouched, the transaction is not discarded.
            throw $exception;
        } catch (\Throwable $exception) {
            // Catching Throwable instead of Exception makes sure that
            // errors (e.g. a TypeError raised inside the block) also discard
            // the transaction instead of leaving it half-built.
            $this->discard();

            throw $exception;
        } finally {
            // Runs on every exit path, so the flag cannot be left set.
            $this->state->unflag(MultiExecState::INSIDEBLOCK);
        }
    }
443
444
    /**
     * Reports a protocol error detected while running the transaction.
     *
     * @param string $message Error message.
     */
    private function onProtocolError($message)
    {
        // A MULTI/EXEC block cannot be initialized over aggregate
        // connections, so Predis\Client::getConnection() is assumed to
        // return a Predis\Connection\NodeConnectionInterface instance here.
        $connection = $this->client->getConnection();
        $exception = new ProtocolException($connection, $message);

        CommunicationException::handle($exception);
    }
458
}
459