Completed
Push — dev ( 0e054e...0a67e1 )
by Vlad
01:47
created

Queue::checkTransactionResult()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 2
crap 2
1
<?php
2
3
namespace Simplario\Quedis;
4
5
use Simplario\Quedis\Exceptions\FlowException;
6
use Simplario\Quedis\Exceptions\QueueException;
7
use Simplario\Quedis\Interfaces\MessageInterface;
8
use Simplario\Quedis\Interfaces\QueueInterface;
9
10
/**
11
 * Class Queue
12
 *
13
 * @package Simplario\Quedis
14
 *
15
 *
16
 *
17
 *   Message flows (modelled on Beanstalk: http://beanstalkc.readthedocs.io/en/latest/tutorial.html )
18
 *   ------------------------------------------------------------------------------------------------
19
 *
20
 *   1)   put            pop
21
 *       -----> [READY] --------> *poof*
22
 *
23
 *
24
 *   2)   put            reserve               delete
25
 *       -----> [READY] ---------> [RESERVED] --------> *poof*
26
 *
27
 *
28
 *   3)   put with delay               release with delay
29
 *       ----------------> [DELAYED] <------------.
30
 *                             |                   |
31
 *                             | (time passes)     |
32
 *                             |                   |
33
 *        put                  v     reserve       |       delete
34
 *       -----------------> [READY] ---------> [RESERVED] --------> *poof*
35
 *                            ^  ^                |  |
36
 *                            |   \  release      |  |
37
 *                            |    ``-------------'   |
38
 *                            |                      |
39
 *                            | kick                 |
40
 *                            |                      |
41
 *                            |       bury           |
42
 *                         [BURIED] <---------------'
43
 *                            |
44
 *                            |  delete
45
 *                             ``--------> *poof*
46
 *
47
 */
48
class Queue implements QueueInterface
49
{
50
51
    const PRIORITY_HIGH = 'high';
52
    const PRIORITY_LOW = 'low';
53
54
    const NS_QUEUE = 'queue';
55
    const NS_QUEUE_STOP = 'queue2stop';
56
    const NS_MESSAGE = 'message';
57
    const NS_MESSAGE_TO_QUEUE = 'message2queue';
58
    const NS_MESSAGE_TO_STATE = 'message2state';
59
60
    // State
61
    const STATE_READY = 'ready';
62
    const STATE_DELAYED = 'delayed';
63
    const STATE_RESERVED = 'reserved';
64
    const STATE_BURIED = 'buried';
65
66
    // Stats
67
    const STATS_QUEUES_LIST = 'queues';
68
    const STATS_QUEUES = 'queues';
69
    const STATS_MESSAGE_TOTAL = 'total';
70
    const STATS_MESSAGE_READY = 'ready';
71
    const STATS_MESSAGE_RESERVED = 'reserved';
72
    const STATS_MESSAGE_DELAYED = 'delayed';
73
    const STATS_MESSAGE_BURIED = 'buried';
74
    const STATS_QUEUE_STOP = 'stop';
75
76
    /**
77
     * @var \Predis\Client
78
     */
79
    protected $redis;
80
81
    /**
82
     * @var string
83
     */
84
    protected $namespace;
85
86
    /**
     * Binds the queue to a Redis client and a key namespace.
     *
     * @param mixed  $redis     Redis client; stored untouched and handed back by getRedis()
     * @param string $namespace First segment of every Redis key built by this class
     */
    public function __construct($redis, $namespace = 'Quedis')
    {
        $this->namespace = $namespace;
        $this->redis = $redis;
    }
97
98
    /**
     * Gives access to the Redis client supplied to the constructor.
     *
     * @return \Predis\Client
     */
    public function getRedis()
    {
        $client = $this->redis;

        return $client;
    }
105
106
    /**
     * Gives the namespace string supplied to the constructor.
     *
     * @return string
     */
    public function getNamespace()
    {
        $prefix = $this->namespace;

        return $prefix;
    }
113
114
    /**
     * Stores a message and enqueues it on the given queue.
     *
     * Inside a single Redis transaction the encoded message, its queue name and
     * its initial state are written to the namespace-wide hashes. The token is
     * then either pushed onto the queue's ready list (head for high priority,
     * tail otherwise) or, when a delay remains, added to the delayed sorted set
     * scored with the timestamp at which it becomes due.
     *
     * @param string        $queue    Queue name
     * @param mixed         $data     Raw payload, or an existing MessageInterface
     * @param int|\DateTime $delay    Delay as understood by parseDelay()
     * @param string        $priority self::PRIORITY_HIGH or self::PRIORITY_LOW
     *
     * @return MessageInterface The message that was stored
     * @throws QueueException When checkTransactionResult() rejects the transaction result
     */
    public function put($queue, $data, $delay = 0, $priority = self::PRIORITY_LOW)
    {
        $message = $this->createMessage($data);
        $delay = $this->parseDelay($delay);

        $commands = function ($tx) use ($queue, $message, $delay, $priority) {
            /** @var $tx \Predis\Client */
            $token = $message->getToken();
            $isDelayed = $delay != 0;
            $state = $isDelayed ? self::STATE_DELAYED : self::STATE_READY;

            $tx->hset($this->ns(self::NS_MESSAGE), $token, $message->encode());
            $tx->hset($this->ns(self::NS_MESSAGE_TO_QUEUE), $token, $queue);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, $state);

            if ($isDelayed) {
                // Score is the moment the message becomes due.
                $tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $token);

                return;
            }

            $readyKey = $this->getKey($queue, self::STATE_READY);

            if ($priority === self::PRIORITY_HIGH) {
                // High priority jumps the line: pushed at the end reserveToken() pops from.
                $tx->lpush($readyKey, $token);
            } else {
                $tx->rpush($readyKey, $token);
            }
        };

        $result = $this->getRedis()->transaction($commands);
        $this->checkTransactionResult($result, 'put');

        return $message;
    }
152
153
    /**
     * Reserves the next message of a queue and deletes it right away.
     *
     * @param string $queue   Queue name
     * @param int    $timeout Passed through to reserve()
     *
     * @return MessageInterface|null The removed message, or null when reserve() produced none
     */
    public function pop($queue, $timeout = 0)
    {
        $message = $this->reserve($queue, $timeout);

        if ($message === null) {
            return $message;
        }

        $this->delete($message);

        return $message;
    }
169
170
    /**
     * Takes the next ready message of a queue and marks it as reserved.
     *
     * Returns null without touching Redis further when the queue is stopped.
     * Otherwise due delayed messages are migrated first, a token is popped from
     * the ready list and its message is loaded; the token is then recorded in
     * the reserved sorted set (scored with the current time) and its state is
     * switched to reserved in one transaction.
     *
     * @param string $queue   Queue name
     * @param int    $timeout Seconds to block while waiting for a token; 0 means do not block
     *
     * @return MessageInterface|null
     */
    public function reserve($queue, $timeout = 0)
    {
        if ($this->isStop($queue)) {
            return null;
        }

        $this->migrate($queue);

        $token = $this->reserveToken($queue, $timeout);
        $message = $this->restoreMessage($token);

        if ($message !== null) {
            $reservedKey = $this->getKey($queue, self::STATE_RESERVED);
            $stateHash = $this->ns(self::NS_MESSAGE_TO_STATE);

            $this->getRedis()->transaction(function ($tx) use ($token, $reservedKey, $stateHash) {
                /** @var $tx \Predis\Client */
                $tx->zadd($reservedKey, time(), $token);
                $tx->hset($stateHash, $token, self::STATE_RESERVED);
            });
        }

        return $message;
    }
199
200
    /**
     * Pops one message token from the head of a queue's ready list.
     *
     * @param string $queue   Queue name
     * @param int    $timeout Positive value: blocking pop (BLPOP) with that timeout;
     *                        anything else: plain non-blocking pop (LPOP)
     *
     * @return null|string The token, or null when nothing was popped
     */
    protected function reserveToken($queue, $timeout = 0)
    {
        $client = $this->getRedis();
        $readyKey = $this->getKey($queue, self::STATE_READY);

        if ($timeout > 0) {
            // An array reply carries the popped value at index 1 (index 0 is the list key);
            // any non-array reply is treated as "no message".
            $reply = $client->blpop([$readyKey], $timeout);

            return is_array($reply) ? $reply[1] : null;
        }

        return $client->lpop($readyKey);
    }
221
222
    /**
     * Loads and decodes the stored message that belongs to a token.
     *
     * @param string|null $token Message token; null short-circuits to null
     *
     * @return null|MessageInterface Whatever Message::decode() yields for the stored value
     */
    protected function restoreMessage($token)
    {
        if ($token === null) {
            return null;
        }

        $stored = $this->getRedis()->hget($this->ns(self::NS_MESSAGE), $token);

        return Message::decode($stored);
    }
238
239
    /**
     * Moves a reserved message into the buried set.
     *
     * The token leaves the reserved sorted set, enters the buried sorted set
     * (scored with the current time) and its recorded state becomes buried,
     * all in one transaction.
     *
     * @param string|MessageInterface $mixed Message or its token
     *
     * @return $this
     * @throws FlowException  When the message is not currently reserved
     * @throws QueueException When checkTransactionResult() rejects the transaction result
     */
    public function bury($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'bury');

        $commands = function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();

            $tx->zrem($this->getKey($queue, self::STATE_RESERVED), $token);
            $tx->zadd($this->getKey($queue, self::STATE_BURIED), time(), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_BURIED);
        };

        $result = $this->getRedis()->transaction($commands);
        $this->checkTransactionResult($result, 'bury');

        return $this;
    }
261
262
    /**
     * Removes a message and all bookkeeping that refers to it.
     *
     * The token is removed from the reserved, buried and delayed sorted sets of
     * its queue and from the message, message-to-queue and message-to-state
     * hashes, in one transaction. The transaction result is not inspected.
     *
     * @param string|MessageInterface $mixed Message or its token
     *
     * @return $this
     * @throws FlowException When the message state does not permit deletion
     */
    public function delete($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'delete');

        $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();

            foreach ([self::STATE_RESERVED, self::STATE_BURIED, self::STATE_DELAYED] as $state) {
                $tx->zrem($this->getKey($queue, $state), $token);
            }

            foreach ([self::NS_MESSAGE, self::NS_MESSAGE_TO_QUEUE, self::NS_MESSAGE_TO_STATE] as $hash) {
                $tx->hdel($this->ns($hash), $token);
            }
        });

        return $this;
    }
285
286
    /**
     * Puts a buried message back at the tail of its queue's ready list.
     *
     * The token leaves the buried sorted set, is appended to the ready list and
     * its recorded state becomes ready, all in one transaction.
     *
     * @param MessageInterface|string $mixed Message or its token
     *
     * @return $this
     * @throws FlowException  When the message is not currently buried
     * @throws QueueException When checkTransactionResult() rejects the transaction result
     */
    public function kick($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'kick');

        $commands = function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();

            $tx->zrem($this->getKey($queue, self::STATE_BURIED), $token);
            $tx->rpush($this->getKey($queue, self::STATE_READY), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_READY);
        };

        $result = $this->getRedis()->transaction($commands);
        $this->checkTransactionResult($result, 'kick');

        return $this;
    }
308
309
    /**
     * Gives a reserved message back to its queue, optionally after a delay.
     *
     * The token leaves the reserved sorted set. Without a delay it is appended
     * to the ready list and its state becomes ready; with a delay it is added
     * to the delayed sorted set (scored with the due timestamp) and its state
     * becomes delayed. Everything happens in one transaction.
     *
     * The recorded state now matches where the token actually lives. Previously
     * a delayed release was recorded as ready, which disagreed with put() and
     * made checkMessageFlow() reject delete() for that message.
     *
     * @param MessageInterface|string $mixed Message or its token
     * @param int|\DateTime           $delay Delay as understood by parseDelay()
     *
     * @return $this
     * @throws FlowException  When the message is not currently reserved
     * @throws QueueException When checkTransactionResult() rejects the transaction result
     */
    public function release($mixed, $delay = 0)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'release');
        $delay = $this->parseDelay($delay);

        $result = $this->getRedis()->transaction(function ($tx) use ($payload, $delay) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();
            $state = $delay == 0 ? self::STATE_READY : self::STATE_DELAYED;

            $tx->zrem($this->getKey($queue, self::STATE_RESERVED), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, $state);

            if ($state === self::STATE_READY) {
                $tx->rpush($this->getKey($queue, self::STATE_READY), $token);
            } else {
                // Score is the moment the message becomes due; migrate() promotes it later.
                $tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $token);
            }
        });

        $this->checkTransactionResult($result, 'release');

        return $this;
    }
340
341
    /**
     * Lists the distinct queue names that currently own at least one message.
     *
     * The names are taken from the values of the message-to-queue hash. The
     * result is reindexed after de-duplication so callers always receive a
     * gap-free, zero-based list (array_unique() alone keeps the original keys).
     *
     * @return array List of queue names
     */
    public function getQueueList()
    {
        $tokenToQueue = $this->getRedis()->hgetall($this->ns(self::NS_MESSAGE_TO_QUEUE));

        return array_values(array_unique(array_values($tokenToQueue)));
    }
353
354
    /**
     * Promotes due delayed messages of a queue to its ready list.
     *
     * With no queue given, every queue reported by getQueueList() is migrated
     * in turn. For a single queue the work runs in a Predis transaction started
     * with the options cas => true, watch => [ready key, delayed key] and
     * retry => 10: tokens whose score is not later than the current time are
     * read from the delayed sorted set, and, if there are any, they are removed
     * from it, appended to the ready list and marked as ready.
     *
     * @param null|string $queue Queue name, or null for all known queues
     *
     * @return $this
     */
    public function migrate($queue = null)
    {
        if ($queue === null) {
            foreach ($this->getQueueList() as $name) {
                $this->migrate($name);
            }

            return $this;
        }

        $keyReady = $this->getKey($queue, self::STATE_READY);
        $keyDelayed = $this->getKey($queue, self::STATE_DELAYED);
        $options = ['cas' => true, 'watch' => [$keyReady, $keyDelayed], 'retry' => 10];

        $this->getRedis()->transaction($options, function ($tx) use ($keyReady, $keyDelayed) {
            /** @var $tx \Predis\Client */
            $now = time();

            // Read phase: runs before multi(), so the reply is available immediately.
            $dueTokens = $tx->zrangebyscore($keyDelayed, '-inf', $now);

            if (count($dueTokens) === 0) {
                return;
            }

            // Write phase: everything after multi() is queued for the transaction.
            $tx->multi();
            $tx->zremrangebyscore($keyDelayed, '-inf', $now);

            foreach ($dueTokens as $token) {
                $tx->rpush($keyReady, $token);
                $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_READY);
            }
        });

        return $this;
    }
395
396
397
    /**
     * Builds an Iterator for a queue.
     *
     * @param string $queue   Queue name; stored under the 'queue' option, replacing any value given there
     * @param array  $options Further options handed to the Iterator constructor
     *
     * @return Iterator
     */
    public function iterator($queue, array $options = [])
    {
        $settings = array_replace($options, ['queue' => $queue]);

        return new Iterator($this, $settings);
    }
409
410
    /**
     * Clears the stop flag of a queue by removing it from the stop hash.
     *
     * @param string $queue Queue name
     *
     * @return $this
     */
    public function start($queue)
    {
        $stopHash = $this->ns(self::NS_QUEUE_STOP);
        $this->getRedis()->hdel($stopHash, [$queue]);

        return $this;
    }
421
422
    /**
     * Sets the stop flag of a queue by adding it to the stop hash.
     *
     * reserve() returns null for a queue while this flag is present.
     *
     * @param string $queue Queue name
     *
     * @return $this
     */
    public function stop($queue)
    {
        $stopHash = $this->ns(self::NS_QUEUE_STOP);
        $this->getRedis()->hset($stopHash, $queue, true);

        return $this;
    }
433
434
    /**
     * Tells whether a queue is present in the stop hash.
     *
     * @param string $queue Queue name
     *
     * @return bool
     */
    public function isStop($queue)
    {
        $stopHash = $this->ns(self::NS_QUEUE_STOP);
        $exists = $this->getRedis()->hexists($stopHash, $queue);

        return (bool)$exists;
    }
443
444
    /**
     * Collects the counters of a single queue.
     *
     * @param string $queue Queue name
     *
     * @return array Sizes per state (ready, reserved, delayed, buried), their sum under
     *               self::STATS_MESSAGE_TOTAL and the stop flag under self::STATS_QUEUE_STOP
     */
    protected function queueStats($queue)
    {
        $stateByCounter = [
            self::STATS_MESSAGE_READY    => self::STATE_READY,
            self::STATS_MESSAGE_RESERVED => self::STATE_RESERVED,
            self::STATS_MESSAGE_DELAYED  => self::STATE_DELAYED,
            self::STATS_MESSAGE_BURIED   => self::STATE_BURIED,
        ];

        $stats = [];
        foreach ($stateByCounter as $counter => $state) {
            $stats[$counter] = $this->size($queue, $state);
        }

        // The total must be computed before the non-numeric stop flag is added.
        $stats[self::STATS_MESSAGE_TOTAL] = array_sum($stats);
        $stats[self::STATS_QUEUE_STOP] = $this->isStop($queue);

        return $stats;
    }
463
464
    /**
     * Reports counters for one queue or aggregated over all known queues.
     *
     * With a queue name the result of queueStats() is returned unchanged.
     * Without one, the per-state counters and the total are summed over every
     * queue from getQueueList().
     *
     * NOTE(review): self::STATS_QUEUES_LIST and self::STATS_QUEUES both resolve
     * to 'queues', so the list of queue names and the per-queue stats (keyed by
     * queue name) end up in the same array entry. Confirm whether that is intended.
     *
     * @param null|string $queue Queue name, or null for the aggregate
     *
     * @return array
     */
    public function stats($queue = null)
    {
        if ($queue !== null) {
            return $this->queueStats($queue);
        }

        $result = [
            self::STATS_QUEUES_LIST      => [],
            self::STATS_MESSAGE_TOTAL    => 0,
            self::STATS_MESSAGE_READY    => 0,
            self::STATS_MESSAGE_RESERVED => 0,
            self::STATS_MESSAGE_DELAYED  => 0,
            self::STATS_MESSAGE_BURIED   => 0,
        ];

        $queueList = $this->getQueueList();

        if (count($queueList) == 0) {
            return $result;
        }

        $result[self::STATS_QUEUES_LIST] = $queueList;

        $counters = [
            self::STATS_MESSAGE_READY,
            self::STATS_MESSAGE_RESERVED,
            self::STATS_MESSAGE_DELAYED,
            self::STATS_MESSAGE_BURIED,
            self::STATS_MESSAGE_TOTAL,
        ];

        foreach ($queueList as $name) {
            $itemStats = $this->queueStats($name);

            foreach ($counters as $counter) {
                $result[$counter] += $itemStats[$counter];
            }

            $result[self::STATS_QUEUES][$name] = $itemStats;
        }

        return $result;
    }
504
505
    /**
     * Counts the messages of a queue that are in the given state.
     *
     * Ready messages live in a list and are counted with LLEN; every other
     * state lives in a sorted set and is counted with ZCOUNT over the full
     * score range.
     *
     * The state is validated with a strict comparison so that loosely-equal
     * values (for example the integer 0 on PHP versions before 8) can no
     * longer slip through and be used to build a Redis key.
     *
     * @param string $queue Queue name
     * @param string $state One of the self::STATE_* constants
     *
     * @return int|string Count as returned by the Redis client
     * @throws QueueException When the state is not one of the four known states
     */
    public function size($queue, $state = self::STATE_READY)
    {
        $knownStates = [self::STATE_READY, self::STATE_DELAYED, self::STATE_BURIED, self::STATE_RESERVED];

        if (!in_array($state, $knownStates, true)) {
            throw new QueueException('Unsupported state for size calculation.');
        }

        $redis = $this->getRedis();

        return ($state === self::STATE_READY)
            ? $redis->llen($this->getKey($queue, self::STATE_READY))
            : $redis->zcount($this->getKey($queue, $state), '-inf', '+inf');
    }
524
525
526
    /**
     * Deletes every Redis key that belongs to this queue namespace.
     *
     * Keys are looked up with KEYS using the pattern "<namespace>:*". Every key
     * built by ns() and getKey() starts with the namespace followed by a colon,
     * so this matches all of this instance's keys while leaving alone keys of
     * other namespaces that merely share the same prefix (e.g. "Quedis2:...").
     *
     * NOTE(review): $queue is accepted for interface compatibility but is not
     * used; the whole namespace is wiped regardless of its value.
     *
     * @param null|string $queue Currently ignored
     *
     * @return $this
     */
    public function clean($queue = null)
    {
        $redis = $this->getRedis();

        $keys = $redis->keys($this->namespace . ':*');

        if (count($keys) == 0) {
            return $this;
        }

        foreach ($keys as $name) {
            $redis->del($name);
        }

        return $this;
    }
547
548
    /**
     * Normalises input into a message object.
     *
     * @param mixed $mixed An existing MessageInterface (returned as-is) or raw data to wrap in a new Message
     *
     * @return MessageInterface
     */
    protected function createMessage($mixed)
    {
        return $mixed instanceof MessageInterface
            ? $mixed
            : new Message($mixed);
    }
561
562
    /**
     * Builds a namespaced key of the form "<namespace>:<type>".
     *
     * @param string $type Key type, e.g. one of the self::NS_* constants
     *
     * @return string
     */
    protected function ns($type)
    {
        return $this->namespace . ':' . $type;
    }
571
572
573
    /**
     * Builds the Redis key that holds the tokens of a queue in a given state.
     *
     * The key has the form "<namespace>:queue:<queue>:<state>".
     *
     * @param string $queue Queue name
     * @param string $state One of the self::STATE_* constants
     *
     * @return string
     */
    protected function getKey($queue, $state)
    {
        $prefix = $this->ns(self::NS_QUEUE);

        return $prefix . ':' . $queue . ':' . $state;
    }
583
584
    /**
     * Converts a delay specification into a non-negative number of seconds.
     *
     * Any \DateTimeInterface (both \DateTime and \DateTimeImmutable) is read as
     * the moment the message becomes due and converted into the seconds left
     * until then. Everything else is cast to int. Negative results are clamped
     * to 0, so a moment in the past means "no delay".
     *
     * @param int|\DateTimeInterface $mixed Seconds, or the due moment
     *
     * @return int Seconds to wait, never negative
     */
    protected function parseDelay($mixed)
    {
        if ($mixed instanceof \DateTimeInterface) {
            $delay = $mixed->getTimestamp() - time();
        } else {
            $delay = (int)$mixed;
        }

        return $delay < 0 ? 0 : $delay;
    }
599
600
    /**
     * Verifies that an action is permitted for a message in its current state.
     *
     * Permitted source states: bury and release require reserved, kick requires
     * buried, delete accepts reserved, buried or delayed.
     *
     * @param string $currentState State recorded for the message
     * @param string $action       One of 'bury', 'delete', 'kick', 'release'
     *
     * @return $this
     * @throws FlowException When the current state is not permitted for the action
     */
    protected function checkMessageFlow($currentState, $action)
    {
        $allowedStates = [
            'bury'    => [self::STATE_RESERVED],
            'delete'  => [self::STATE_RESERVED, self::STATE_BURIED, self::STATE_DELAYED],
            'kick'    => [self::STATE_BURIED],
            'release' => [self::STATE_RESERVED],
        ];

        $isPermitted = in_array($currentState, $allowedStates[$action], true);

        if ($isPermitted) {
            return $this;
        }

        throw new FlowException("Flow error, the message state '{$currentState}' cannot be '{$action}'.");
    }
622
623
    /**
     * Validates the outcome of a Redis transaction.
     *
     * The transaction is considered failed when any of its replies is strictly
     * false, or when the result is not an array at all (for example null or a
     * transaction object): in_array() cannot inspect such a value and raises a
     * TypeError on PHP 8, so it is reported as a regular queue error instead.
     *
     * @param array|mixed $result Value returned by the client's transaction() call
     * @param string      $action Name of the action, used in the error message
     *
     * @return $this
     * @throws QueueException When the result is not an array or contains false
     */
    protected function checkTransactionResult($result, $action)
    {
        if (!is_array($result) || in_array(false, $result, true)) {
            throw new QueueException("Transaction '{$action}' error.");
        }

        return $this;
    }
638
639
    /**
     * Looks up the queue and state recorded for a message.
     *
     * @param string|MessageInterface $mixed Message (its token is used) or a token
     *
     * @return Payload Token together with the queue and state read from the bookkeeping hashes
     * @throws QueueException Declared by the original contract; not raised in this method itself,
     *                        presumably by Payload — confirm
     */
    protected function payload($mixed)
    {
        $redis = $this->getRedis();

        if ($mixed instanceof MessageInterface) {
            $token = $mixed->getToken();
        } else {
            $token = $mixed;
        }

        return new Payload(
            $token,
            $redis->hget($this->ns(self::NS_MESSAGE_TO_QUEUE), $token),
            $redis->hget($this->ns(self::NS_MESSAGE_TO_STATE), $token)
        );
    }
654
655
}
656