Queue::release()   A
last analyzed

Complexity

Conditions 2
Paths 1

Size

Total Lines 23
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 23
ccs 14
cts 14
cp 1
rs 9.0856
c 0
b 0
f 0
cc 2
eloc 14
nc 1
nop 2
crap 2
1
<?php
2
3
namespace Simplario\Quedis;
4
5
use Simplario\Quedis\Exceptions\FlowException;
6
use Simplario\Quedis\Exceptions\QueueException;
7
use Simplario\Quedis\Interfaces\MessageInterface;
8
use Simplario\Quedis\Interfaces\QueueInterface;
9
10
/**
11
 * Class Queue
12
 *
13
 * @package Simplario\Quedis
14
 *
15
 *
16
 *
17
 *   Message flows (like in the Beanstalk: http://beanstalkc.readthedocs.io/en/latest/tutorial.html )
18
 *   ------------------------------------------------------------------------------------------------
19
 *
20
 *   1)   put            pop
21
 *       -----> [READY] --------> *poof*
22
 *
23
 *
24
 *   2)   put            reserve               delete
25
 *       -----> [READY] ---------> [RESERVED] --------> *poof*
26
 *
27
 *
28
 *   3)   put with delay               release with delay
29
 *       ----------------> [DELAYED] <------------.
30
 *                             |                   |
31
 *                             | (time passes)     |
32
 *                             |                   |
33
 *        put                  v     reserve       |       delete
34
 *       -----------------> [READY] ---------> [RESERVED] --------> *poof*
35
 *                            ^  ^                |  |
36
 *                            |   \  release      |  |
37
 *                            |    ``-------------'   |
38
 *                            |                      |
39
 *                            | kick                 |
40
 *                            |                      |
41
 *                            |       bury           |
42
 *                         [BURIED] <---------------'
43
 *                            |
44
 *                            |  delete
45
 *                             ``--------> *poof*
46
 *
47
 */
48
class Queue implements QueueInterface
49
{
50
51
    const PRIORITY_HIGH = 'high';
52
    const PRIORITY_LOW = 'low';
53
54
    const NS_QUEUE = 'queue';
55
    const NS_QUEUE_STOP = 'queue2stop';
56
    const NS_MESSAGE = 'message';
57
    const NS_MESSAGE_TO_QUEUE = 'message2queue';
58
    const NS_MESSAGE_TO_STATE = 'message2state';
59
60
    // State
61
    const STATE_READY = 'ready';
62
    const STATE_DELAYED = 'delayed';
63
    const STATE_RESERVED = 'reserved';
64
    const STATE_BURIED = 'buried';
65
66
    // Stats
67
    const STATS_QUEUES_LIST = 'queues-list';
68
    const STATS_QUEUES = 'queues';
69
    const STATS_MESSAGE_TOTAL = 'total';
70
    const STATS_MESSAGE_READY = 'ready';
71
    const STATS_MESSAGE_RESERVED = 'reserved';
72
    const STATS_MESSAGE_DELAYED = 'delayed';
73
    const STATS_MESSAGE_BURIED = 'buried';
74
    const STATS_QUEUE_STOP = 'stop';
75
76
    /**
77
     * @var \Predis\Client
78
     */
79
    protected $redis;
80
81
    /**
82
     * @var string
83
     */
84
    protected $namespace;
85
86
    /**
87
     * Queue constructor.
88
     *
89
     * @param mixed  $redis
90
     * @param string $namespace
91
     */
92 33
    public function __construct($redis, $namespace = 'Quedis')
93
    {
94 33
        $this->redis = $redis;
95 33
        $this->namespace = $namespace;
96 33
    }
97
98
    /**
99
     * @return \Predis\Client
100
     */
101 33
    public function getRedis()
102
    {
103 33
        return $this->redis;
104
    }
105
106
    /**
107
     * @return string
108
     */
109 1
    public function getNamespace()
110
    {
111 1
        return $this->namespace;
112
    }
113
114
    /**
115
     * @param string $queue
116
     * @param mixed  $data
117
     * @param int    $delay
118
     * @param string $priority
119
     *
120
     * @return MessageInterface
121
     * @throws \Exception
122
     */
123 26
    public function put($queue, $data, $delay = 0, $priority = self::PRIORITY_LOW)
124
    {
125 26
        $message = $this->createMessage($data);
126 26
        $delay = $this->parseDelay($delay);
127
128 26
        $result = $this->getRedis()->transaction(function ($tx) use ($queue, $message, $delay, $priority) {
129
            /** @var $tx \Predis\Client */
130
131 26
            $state = $delay == 0 ? self::STATE_READY : self::STATE_DELAYED;
132
133 26
            $tx->hset($this->ns(self::NS_MESSAGE), $message->getToken(), $message->encode());
134 26
            $tx->hset($this->ns(self::NS_MESSAGE_TO_QUEUE), $message->getToken(), $queue);
135 26
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $message->getToken(), $state);
136
137 26
            if ($state === self::STATE_READY) {
138 22
                if (self::PRIORITY_HIGH === $priority) {
139 1
                    $tx->lpush($this->getKey($queue, self::STATE_READY), $message->getToken());
140
                } else {
141 22
                    $tx->rpush($this->getKey($queue, self::STATE_READY), $message->getToken());
142
                }
143
            } else {
144 6
                $tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $message->getToken());
145
            }
146 26
        });
147
148 26
        $this->checkTransactionResult($result, 'put');
149
150 26
        return $message;
151
    }
152
153
    /**
154
     * @param string        $queue
155
     * @param int           $timeout
156
     *
157
     * @return mixed|null|Queue
158
     */
159 9
    public function pop($queue, $timeout = 0)
160
    {
161 9
        $message = $this->reserve($queue, $timeout);
162
163 9
        if ($message !== null) {
164 9
            $this->delete($message);
165
        }
166
167 9
        return $message;
168
    }
169
170
    /**
171
     * @param string        $queue
172
     * @param int           $timeout
173
     *
174
     * @return MessageInterface|null
175
     */
176 20
    public function reserve($queue, $timeout = 0)
177
    {
178 20
        if ($this->isStop($queue)) {
179 1
            return null;
180
        }
181
182 19
        $this->migrate($queue);
183
184 19
        $token = $this->reserveToken($queue, $timeout);
185 19
        $message = $this->restoreMessage($token);
186
187 19
        if($message === null){
188 6
            return null;
189
        }
190
191 19
        $this->getRedis()->transaction(function ($tx) use ($queue, $token) {
192
            /** @var $tx \Predis\Client */
193 19
            $tx->zadd($this->getKey($queue, self::STATE_RESERVED), time(), $token);
194 19
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_RESERVED);
195 19
        });
196
197 19
        return $message;
198
    }
199
200
    /**
201
     * @param string $queue
202
     * @param int    $timeout
203
     *
204
     * @return null|string
205
     */
206 19
    protected function reserveToken($queue, $timeout = 0)
207
    {
208
209 19
        $redis = $this->getRedis();
210
211 19
        if ($timeout > 0) {
212
            // example return [ '0' => queue name , '1' => jobId ]
213 2
            $token = $redis->blpop([$this->getKey($queue, self::STATE_READY)], $timeout);
214 2
            $token = (is_array($token)) ? $token[1] : null;
215
        } else {
216 17
            $token = $redis->lpop($this->getKey($queue, self::STATE_READY));
217
        }
218
219 19
        return $token;
220
    }
221
222
    /**
223
     * @param string|null $token
224
     *
225
     * @return null|MessageInterface
226
     */
227 19
    protected function restoreMessage($token)
228
    {
229 19
        if (is_null($token)) {
230 6
            return null;
231
        }
232
233 19
        $encoded = $this->getRedis()->hget($this->ns(self::NS_MESSAGE), $token);
234 19
        $message = Message::decode($encoded);
235
236 19
        return $message;
237
    }
238
239
    /**
240
     * @param string|MessageInterface $mixed
241
     *
242
     * @return $this
243
     * @throws QueueException
244
     */
245 18
    public function delete($mixed)
246
    {
247 18
        $payload = $this->payload($mixed);
248 18
        $this->checkMessageFlow($payload->getState(), 'delete');
249
250 17
        $this->getRedis()->transaction(function ($tx) use ($payload) {
251
            /** @var $tx \Predis\Client */
252 17
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken());
253 17
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken());
254 17
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_DELAYED), $payload->getToken()); // ?
255 17
            $tx->hdel($this->ns(self::NS_MESSAGE), $payload->getToken());
256 17
            $tx->hdel($this->ns(self::NS_MESSAGE_TO_QUEUE), $payload->getToken());
257 17
            $tx->hdel($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken());
258 17
        });
259
260 17
        return $this;
261
    }
262
263
    /**
264
     * @param MessageInterface|string $mixed
265
     * @param int $delay
266
     *
267
     * @return $this
268
     * @throws FlowException
269
     * @throws QueueException
270
     */
271 3
    public function release($mixed, $delay = 0)
272
    {
273 3
        $payload = $this->payload($mixed);
274 3
        $this->checkMessageFlow($payload->getState(), 'release');
275 2
        $delay = $this->parseDelay($delay);
276
277 2
        $result = $this->getRedis()->transaction(function ($tx) use ($payload, $delay) {
278
            /** @var $tx \Predis\Client */
279 2
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken());
280 2
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY);
281
282 2
            if ($delay == 0) {
283 1
                $tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken());
284
            } else {
285 1
                $tx->zadd($this->getKey($payload->getQueue(), self::STATE_DELAYED), time() + $delay,
286 1
                    $payload->getToken());
287
            }
288 2
        });
289
290 2
        $this->checkTransactionResult($result, 'release');
291
292 2
        return $this;
293
    }
294
295
    /**
296
     * @param MessageInterface|string $mixed
297
     *
298
     * @return $this
299
     * @throws QueueException
300
     */
301 3
    public function kick($mixed)
302
    {
303 3
        return $this->moveMessage($mixed, 'kick');
304
    }
305
306
    /**
307
     * @param string|MessageInterface $mixed
308
     *
309
     * @return $this
310
     * @throws QueueException
311
     */
312 3
    public function bury($mixed)
313
    {
314 3
        return $this->moveMessage($mixed, 'bury');
315
    }
316
317
    /**
318
     * @param MessageInterface|string $mixed
319
     * @param string                  $moveTo
320
     *
321
     * @return $this
322
     * @throws QueueException
323
     */
324 5
    protected function moveMessage($mixed, $moveTo)
325
    {
326 5
        $payload = $this->payload($mixed);
327
328 5
        $this->checkMessageFlow($payload->getState(), $moveTo);
329
330 2
        $result = $this->getRedis()->transaction(function ($tx) use ($payload, $moveTo) {
331
            /** @var $tx \Predis\Client */
332 2
            if ($moveTo === 'bury') {
333 2
                $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken());
334 2
                $tx->zadd($this->getKey($payload->getQueue(), self::STATE_BURIED), time(), $payload->getToken());
335 2
                $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_BURIED);
336 1
            } elseif ($moveTo === 'kick') {
337 1
                $tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken());
338 1
                $tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken());
339 1
                $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY);
340
            }
341 2
        });
342
343 2
        $this->checkTransactionResult($result, $moveTo);
344
345 2
        return $this;
346
    }
347
348
    /**
349
     * @return array
350
     */
351 4
    public function getQueueList()
352
    {
353 4
        $redis = $this->getRedis();
354 4
        $queueList = $redis->hgetall($this->ns(self::NS_MESSAGE_TO_QUEUE));
355 4
        $queueList = array_values($queueList);
356 4
        $queueList = array_unique($queueList);
357
358 4
        return $queueList;
359
    }
360
361
    /**
362
     * @param null|string $queue
363
     *
364
     * @return $this
365
     */
366 20
    public function migrate($queue = null)
367
    {
368 20
        if (null === $queue) {
369 1
            $queueList = $this->getQueueList();
370 1
            foreach ($queueList as $queue) {
371 1
                $this->migrate($queue);
372
            }
373
374 1
            return $this;
375
        }
376
377 20
        $keyReady = $this->getKey($queue, self::STATE_READY);
378 20
        $keyDelayed = $this->getKey($queue, self::STATE_DELAYED);
379
380 20
        $this->getRedis()->transaction(['cas' => true, 'watch' => [$keyReady, $keyDelayed], 'retry' => 10],
381 20
            function ($tx) use ($queue, $keyReady, $keyDelayed) {
382
383
                /** @var $tx \Predis\Client */
384 20
                $time = time();
385
386
                // get expired jobs from "delayed queue"
387 20
                $messageTokenSet = $tx->zrangebyscore($keyDelayed, '-inf', $time);
388
389 20
                if (count($messageTokenSet) > 0) {
390
                    // remove jobs from "delayed queue"
391 3
                    $tx->multi();
392 3
                    $tx->zremrangebyscore($keyDelayed, '-inf', $time);
393 3
                    foreach ($messageTokenSet as $token) {
394 3
                        $tx->rpush($keyReady, $token);
395 3
                        $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_READY);
396
                    }
397
                }
398 20
            });
399
400 20
        return $this;
401
    }
402
403
404
    /**
405
     * @param string $queue
406
     * @param string $strategy
407
     * @param int    $timeout
408
     *
409
     * @return Iterator
410
     */
411 1
    public function iterator($queue, $strategy = 'pop', $timeout = 0)
412
    {
413 1
        return new Iterator($this, $queue, $strategy, $timeout);
414
    }
415
416
    /**
417
     * @param string $queue
418
     *
419
     * @return $this
420
     */
421 1
    public function start($queue)
422
    {
423 1
        $this->getRedis()->hdel($this->ns(self::NS_QUEUE_STOP), [$queue]);
424
425 1
        return $this;
426
    }
427
428
    /**
429
     * @param string $queue
430
     *
431
     * @return $this
432
     */
433 2
    public function stop($queue)
434
    {
435 2
        $this->getRedis()->hset($this->ns(self::NS_QUEUE_STOP), $queue, true);
436
437 2
        return $this;
438
    }
439
440
    /**
441
     * @param string $queue
442
     *
443
     * @return bool
444
     */
445 23
    public function isStop($queue)
446
    {
447 23
        return (bool)$this->getRedis()->hexists($this->ns(self::NS_QUEUE_STOP), $queue);
448
    }
449
450
    /**
451
     * @param string $queue
452
     *
453
     * @return array
454
     */
455 4
    protected function queueStats($queue)
456
    {
457
        $result = [
458 4
            self::STATS_MESSAGE_READY    => $this->size($queue, self::STATE_READY),
459 4
            self::STATS_MESSAGE_RESERVED => $this->size($queue, self::STATE_RESERVED),
460 4
            self::STATS_MESSAGE_DELAYED  => $this->size($queue, self::STATE_DELAYED),
461 4
            self::STATS_MESSAGE_BURIED   => $this->size($queue, self::STATE_BURIED),
462
        ];
463
464 4
        $result[self::STATS_MESSAGE_TOTAL] = array_sum($result);
465 4
        $result[self::STATS_QUEUE_STOP] = $this->isStop($queue);
466
467 4
        return $result;
468
    }
469
470
    /**
471
     * @param null|string $queue
472
     *
473
     * @return array
474
     */
475 6
    public function stats($queue = null)
476
    {
477 6
        if ($queue !== null) {
478 3
            return $this->queueStats($queue);
479
        }
480
481
        $result = [
482 4
            self::STATS_QUEUES_LIST      => [],
483 4
            self::STATS_MESSAGE_TOTAL    => 0,
484 4
            self::STATS_MESSAGE_READY    => 0,
485 4
            self::STATS_MESSAGE_RESERVED => 0,
486 4
            self::STATS_MESSAGE_DELAYED  => 0,
487 4
            self::STATS_MESSAGE_BURIED   => 0,
488
        ];
489
490 4
        $queueList = $this->getQueueList();
491
492 4
        if (count($queueList) == 0) {
493 4
            return $result;
494
        }
495
496 1
        $result[self::STATS_QUEUES_LIST] = $queueList;
497
498 1
        foreach ($queueList as $queue) {
499 1
            $itemStats = $this->queueStats($queue);
500 1
            $result[self::STATS_MESSAGE_READY] += $itemStats[self::STATS_MESSAGE_READY];
501 1
            $result[self::STATS_MESSAGE_RESERVED] += $itemStats[self::STATS_MESSAGE_RESERVED];
502 1
            $result[self::STATS_MESSAGE_DELAYED] += $itemStats[self::STATS_MESSAGE_DELAYED];
503 1
            $result[self::STATS_MESSAGE_BURIED] += $itemStats[self::STATS_MESSAGE_BURIED];
504 1
            $result[self::STATS_MESSAGE_TOTAL] += $itemStats[self::STATS_MESSAGE_TOTAL];
505 1
            $result[self::STATS_QUEUES][$queue] = $itemStats;
506
        }
507
508 1
        return $result;
509
    }
510
511
    /**
512
     * @param string $queue
513
     * @param string $state
514
     *
515
     * @return int|string
516
     * @throws QueueException
517
     */
518 5
    public function size($queue, $state = self::STATE_READY)
519
    {
520 5
        if (!in_array($state, [self::STATE_READY, self::STATE_DELAYED, self::STATE_BURIED, self::STATE_RESERVED])) {
521 1
            throw new QueueException('Unsupported state for size calculation.');
522
        }
523
524 4
        $redis = $this->getRedis();
525
526 4
        return ($state === self::STATE_READY)
527 4
            ? $redis->llen($this->getKey($queue, self::STATE_READY))
528 4
            : $redis->zcount($this->getKey($queue, $state), '-inf', '+inf');
529
    }
530
531
532
    /**
533
     * @param null|string $queue
534
     *
535
     * @return $this
536
     */
537 33
    public function clean($queue = null)
538
    {
539 33
        $redis = $this->getRedis();
540
541 33
        $keys = $redis->keys($this->namespace . '*');
542
543 33
        if (count($keys) == 0) {
544 23
            return $this;
545
        }
546
547 10
        foreach ($keys as $index => $name) {
548 10
            $redis->del($name);
549
        }
550
551 10
        return $this;
552
    }
553
554
    /**
555
     * @param mixed $mixed
556
     *
557
     * @return MessageInterface
558
     */
559 26
    protected function createMessage($mixed)
560
    {
561 26
        if ($mixed instanceof MessageInterface) {
562 1
            return $mixed;
563
        }
564
565 26
        return new Message($mixed);
566
    }
567
568
    /**
569
     * @param string $type
570
     *
571
     * @return string
572
     */
573 27
    protected function ns($type)
574
    {
575 27
        return implode(':', [$this->namespace, $type]);
576
    }
577
578
579
    /**
580
     * @param string $queue
581
     * @param string $state
582
     *
583
     * @return string
584
     */
585 26
    protected function getKey($queue, $state)
586
    {
587 26
        return implode(':', [$this->ns(self::NS_QUEUE), $queue, $state]);
588
    }
589
590
    /**
591
     * @param int|\DateTime $mixed
592
     *
593
     * @return int
594
     */
595 26
    protected function parseDelay($mixed)
596
    {
597 26
        if ($mixed instanceof \DateTime) {
598 1
            $delay = $mixed->getTimestamp() - time();
599
        } else {
600 26
            $delay = (int)$mixed;
601
        }
602
603 26
        return $delay < 0 ? 0 : $delay;
604
    }
605
606
    /**
607
     * @param string $currentState
608
     * @param string $action
609
     *
610
     * @return $this
611
     * @throws FlowException
612
     */
613 23
    protected function checkMessageFlow($currentState, $action)
614
    {
615
        $mapping = [
616 23
            'bury'    => [self::STATE_RESERVED],
617 23
            'delete'  => [self::STATE_RESERVED, self::STATE_BURIED, self::STATE_DELAYED],
618 23
            'kick'    => [self::STATE_BURIED],
619 23
            'release' => [self::STATE_RESERVED],
620
        ];
621
622 23
        if (!in_array($currentState, $mapping[$action], true)) {
623 5
            throw new FlowException("Flow error, the message state '{$currentState}' cannot be '{$action}'.");
624
        }
625
626 18
        return $this;
627
    }
628
629
    /**
630
     * @param mixed  $result
631
     * @param string $action
632
     *
633
     * @return $this
634
     * @throws QueueException
635
     */
636 27
    protected function checkTransactionResult($result, $action)
637
    {
638 27
        if (in_array(false, (array) $result, true)) {
639 1
            throw new QueueException("Transaction '{$action}' error.");
640
        }
641
642 27
        return $this;
643
    }
644
645
    /**
646
     * @param string|MessageInterface $mixed
647
     *
648
     * @return Payload
649
     * @throws QueueException
650
     */
651 23
    protected function payload($mixed)
652
    {
653 23
        $redis = $this->getRedis();
654 23
        $token = $mixed instanceof MessageInterface ? $mixed->getToken() : $mixed;
655 23
        $queue = $redis->hget($this->ns(self::NS_MESSAGE_TO_QUEUE), $token);
656 23
        $state = $redis->hget($this->ns(self::NS_MESSAGE_TO_STATE), $token);
657
658 23
        return new Payload($token, $queue, $state);
659
    }
660
661
}
662