Completed
Push — dev ( 1fb22a...d32538 )
by Vlad
01:20
created

Queue::iterator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
c 0
b 0
f 0
ccs 3
cts 3
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Simplario\Quedis;
4
5
use Simplario\Quedis\Exceptions\FlowException;
6
use Simplario\Quedis\Exceptions\QueueException;
7
use Simplario\Quedis\Interfaces\MessageInterface;
8
use Simplario\Quedis\Interfaces\QueueInterface;
9
10
/**
11
 * Class Queue
12
 *
13
 * @package Simplario\Quedis
14
 *
15
 *
16
 *
17
 *   Message flows (modeled on Beanstalk, see http://beanstalkc.readthedocs.io/en/latest/tutorial.html )
18
 *   ------------------------------------------------------------------------------------------------
19
 *
20
 *   1)   put            pop
21
 *       -----> [READY] --------> *poof*
22
 *
23
 *
24
 *   2)   put            reserve               delete
25
 *       -----> [READY] ---------> [RESERVED] --------> *poof*
26
 *
27
 *
28
 *   3)   put with delay               release with delay
29
 *       ----------------> [DELAYED] <------------.
30
 *                             |                   |
31
 *                             | (time passes)     |
32
 *                             |                   |
33
 *        put                  v     reserve       |       delete
34
 *       -----------------> [READY] ---------> [RESERVED] --------> *poof*
35
 *                            ^  ^                |  |
36
 *                            |   \  release      |  |
37
 *                            |    ``-------------'   |
38
 *                            |                      |
39
 *                            | kick                 |
40
 *                            |                      |
41
 *                            |       bury           |
42
 *                         [BURIED] <---------------'
43
 *                            |
44
 *                            |  delete
45
 *                             ``--------> *poof*
46
 *
47
 */
48
class Queue implements QueueInterface
49
{
50
51
    const PRIORITY_HIGH = 'high';
52
    const PRIORITY_LOW = 'low';
53
54
    const NS_QUEUE = 'queue';
55
    const NS_QUEUE_STOP = 'queue2stop';
56
    const NS_MESSAGE = 'message';
57
    const NS_MESSAGE_TO_QUEUE = 'message2queue';
58
    const NS_MESSAGE_TO_STATE = 'message2state';
59
60
    // State
61
    const STATE_READY = 'ready';
62
    const STATE_DELAYED = 'delayed';
63
    const STATE_RESERVED = 'reserved';
64
    const STATE_BURIED = 'buried';
65
66
    // Stats
67
    const STATS_QUEUES_LIST = 'queues';
68
    const STATS_QUEUES = 'queues';
69
    const STATS_MESSAGE_TOTAL = 'total';
70
    const STATS_MESSAGE_READY = 'ready';
71
    const STATS_MESSAGE_RESERVED = 'reserved';
72
    const STATS_MESSAGE_DELAYED = 'delayed';
73
    const STATS_MESSAGE_BURIED = 'buried';
74
    const STATS_QUEUE_STOP = 'stop';
75
76
    /**
77
     * @var \Predis\Client
78
     */
79
    protected $redis;
80
81
    /**
82
     * @var string
83
     */
84
    protected $namespace;
85
86
    /**
     * Stores the Redis client and the key namespace used by this queue.
     *
     * @param mixed  $redis     Redis client; kept as-is and handed back by getRedis()
     * @param string $namespace First segment of every Redis key built by ns()
     */
    public function __construct($redis, $namespace = 'Quedis')
    {
        $this->namespace = $namespace;
        $this->redis = $redis;
    }
97
98
    /**
     * Gives access to the Redis client passed to the constructor.
     *
     * @return \Predis\Client
     */
    public function getRedis()
    {
        return $this->redis;
    }
105
106
    /**
     * Gives the namespace passed to the constructor (default 'Quedis').
     *
     * @return string
     */
    public function getNamespace()
    {
        return $this->namespace;
    }
113
114
    /**
     * Stores a message and enqueues its token.
     *
     * Inside one Redis transaction the encoded message, its queue name and its
     * state are written to the three bookkeeping hashes. The token is then
     * either pushed onto the ready list (front for PRIORITY_HIGH, back
     * otherwise) or, when a delay is given, added to the delayed sorted set
     * scored with the due timestamp.
     *
     * @param string        $queue    Queue name
     * @param mixed         $data     Payload, or a ready-made MessageInterface
     * @param int|\DateTime $delay    Seconds to wait (or a due date); 0 means ready now
     * @param string        $priority self::PRIORITY_HIGH or self::PRIORITY_LOW
     *
     * @return MessageInterface
     * @throws QueueException When any transaction reply is strictly false
     */
    public function put($queue, $data, $delay = 0, $priority = self::PRIORITY_LOW)
    {
        $message = $this->createMessage($data);
        $delay = $this->parseDelay($delay);

        $replies = $this->getRedis()->transaction(function ($tx) use ($queue, $message, $delay, $priority) {
            /** @var $tx \Predis\Client */
            $token = $message->getToken();
            $isReady = ($delay == 0);
            $state = $isReady ? self::STATE_READY : self::STATE_DELAYED;

            $tx->hset($this->ns(self::NS_MESSAGE), $token, $message->encode());
            $tx->hset($this->ns(self::NS_MESSAGE_TO_QUEUE), $token, $queue);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, $state);

            if (!$isReady) {
                // Score is the moment the message becomes due; migrate() reads it.
                $tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $token);

                return;
            }

            $readyKey = $this->getKey($queue, self::STATE_READY);
            if (self::PRIORITY_HIGH === $priority) {
                // reserveToken() pops from the left, so a left push jumps the line.
                $tx->lpush($readyKey, $token);
            } else {
                $tx->rpush($readyKey, $token);
            }
        });

        if (in_array(false, $replies, true)) {
            throw new QueueException('Can not put message to queue');
        }

        return $message;
    }
154
155
    /**
     * Reserves the next message of a queue and deletes it right away.
     *
     * @param string $queue   Queue name
     * @param int    $timeout Passed through to reserve()
     *
     * @return MessageInterface|null The message, or null when reserve() yields nothing
     */
    public function pop($queue, $timeout = 0)
    {
        $message = $this->reserve($queue, $timeout);

        if ($message === null) {
            return null;
        }

        $this->delete($message);

        return $message;
    }
171
172
    /**
     * Takes the next ready message and marks it as reserved.
     *
     * A stopped queue yields null immediately. Otherwise due delayed messages
     * are migrated first, a token is popped from the ready list, the message
     * is loaded, and the token is recorded in the reserved sorted set (scored
     * with the current time) with its state switched to reserved.
     *
     * @param string $queue   Queue name
     * @param int    $timeout Seconds to block waiting for a token; 0 does not block
     *
     * @return MessageInterface|null
     */
    public function reserve($queue, $timeout = 0)
    {
        if ($this->isStop($queue)) {
            return null;
        }

        $this->migrate($queue);

        $token = $this->reserveToken($queue, $timeout);
        $message = $this->restoreMessage($token);

        if (null === $message) {
            return null;
        }

        $reservedKey = $this->getKey($queue, self::STATE_RESERVED);
        $stateHash = $this->ns(self::NS_MESSAGE_TO_STATE);

        $this->getRedis()->transaction(function ($tx) use ($reservedKey, $stateHash, $token) {
            /** @var $tx \Predis\Client */
            $tx->zadd($reservedKey, time(), $token);
            $tx->hset($stateHash, $token, self::STATE_RESERVED);
        });

        return $message;
    }
201
202
    /**
     * Pops one token from the head of the queue's ready list.
     *
     * With a positive timeout the blocking BLPOP is used, otherwise a plain LPOP.
     *
     * @param string $queue   Queue name
     * @param int    $timeout Seconds to block; values that are not > 0 do not block
     *
     * @return null|string The token, or null when nothing was popped
     */
    protected function reserveToken($queue, $timeout = 0)
    {
        $redis = $this->getRedis();
        $readyKey = $this->getKey($queue, self::STATE_READY);

        if ($timeout > 0) {
            // BLPOP replies with [ 0 => list key, 1 => popped value ]; a non-array reply means no token.
            $reply = $redis->blpop([$readyKey], $timeout);

            return is_array($reply) ? $reply[1] : null;
        }

        return $redis->lpop($readyKey);
    }
223
224
    /**
     * Loads and decodes the stored message for a token.
     *
     * @param string|null $token Token as returned by reserveToken()
     *
     * @return null|MessageInterface Null when no token was given
     */
    protected function restoreMessage($token)
    {
        if ($token === null) {
            return null;
        }

        $raw = $this->getRedis()->hget($this->ns(self::NS_MESSAGE), $token);

        return Message::decode($raw);
    }
240
241
    /**
     * Moves a reserved message into the buried set.
     *
     * The token is removed from the reserved sorted set, added to the buried
     * sorted set (scored with the current time) and its state is set to
     * buried, all in one transaction.
     *
     * @param string|MessageInterface $mixed Message or its token
     *
     * @return $this
     * @throws FlowException  When the message is not currently reserved
     * @throws QueueException When any transaction reply is strictly false
     */
    public function bury($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'bury');

        $replies = $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();

            $tx->zrem($this->getKey($queue, self::STATE_RESERVED), $token);
            $tx->zadd($this->getKey($queue, self::STATE_BURIED), time(), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_BURIED);
        });

        if (in_array(false, $replies, true)) {
            throw new QueueException("The message bury error.");
        }

        return $this;
    }
265
266
    /**
     * Removes a message and all of its bookkeeping.
     *
     * Allowed for reserved, buried and delayed messages (see checkMessageFlow()).
     * The token is removed from the three state sorted sets and from the
     * message, message-to-queue and message-to-state hashes in one transaction.
     *
     * @param string|MessageInterface $mixed Message or its token
     *
     * @return $this
     * @throws FlowException  When the current state does not allow deletion
     * @throws QueueException When a reply is false or nothing at all was removed
     */
    public function delete($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'delete');

        $result = $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();

            $tx->zrem($this->getKey($queue, self::STATE_RESERVED), $token);
            $tx->zrem($this->getKey($queue, self::STATE_BURIED), $token);
            $tx->zrem($this->getKey($queue, self::STATE_DELAYED), $token);
            $tx->hdel($this->ns(self::NS_MESSAGE), $token);
            $tx->hdel($this->ns(self::NS_MESSAGE_TO_QUEUE), $token);
            $tx->hdel($this->ns(self::NS_MESSAGE_TO_STATE), $token);
        });

        // The previous check compared the array returned by array_count_values()
        // with an integer, which is never "less than", so it could not fire.
        // ZREM/HDEL reply with the number of removed entries: a total of zero
        // means the message was already gone when the transaction ran.
        $replies = (array) $result;
        $removed = array_sum(array_filter($replies, 'is_int'));

        if (in_array(false, $replies, true) || $removed < 1) {
            throw new QueueException("The message deleting error");
        }

        return $this;
    }
293
294
    /**
     * Puts a buried message back at the end of the ready list.
     *
     * The token is removed from the buried sorted set, pushed to the tail of
     * the ready list and its state is set to ready, all in one transaction.
     *
     * @param MessageInterface|string $mixed Message or its token
     *
     * @return $this
     * @throws FlowException  When the message is not currently buried
     * @throws QueueException When any transaction reply is strictly false
     */
    public function kick($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'kick');

        $replies = $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();

            $tx->zrem($this->getKey($queue, self::STATE_BURIED), $token);
            $tx->rpush($this->getKey($queue, self::STATE_READY), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_READY);
        });

        if (in_array(false, $replies, true)) {
            throw new QueueException("The message kicked error");
        }

        return $this;
    }
318
319
    /**
     * Gives a reserved message back to the queue, optionally after a delay.
     *
     * Without a delay the token is appended to the ready list and its state
     * becomes ready. With a delay the token goes to the delayed sorted set
     * (scored with the due timestamp) and its state becomes delayed, matching
     * what put() records for delayed messages; migrate() later flips it to ready.
     *
     * @param MessageInterface|string $mixed Message or its token
     * @param int|\DateTime           $delay Seconds to wait (or a due date); 0 means ready now
     *
     * @return $this
     * @throws FlowException  When the message is not currently reserved
     * @throws QueueException When any transaction reply is strictly false
     */
    public function release($mixed, $delay = 0)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'release');
        $delay = $this->parseDelay($delay);

        $result = $this->getRedis()->transaction(function ($tx) use ($payload, $delay) {
            /** @var $tx \Predis\Client */
            $queue = $payload->getQueue();
            $token = $payload->getToken();
            $isDelayed = ($delay != 0);

            // Record the state that matches where the token actually lands.
            // It used to be "ready" in both branches, which left delayed
            // messages mislabeled (and e.g. undeletable) until migrate() ran.
            $state = $isDelayed ? self::STATE_DELAYED : self::STATE_READY;

            $tx->zrem($this->getKey($queue, self::STATE_RESERVED), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, $state);

            if ($isDelayed) {
                $tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $token);
            } else {
                $tx->rpush($this->getKey($queue, self::STATE_READY), $token);
            }
        });

        if (in_array(false, $result, true)) {
            throw new QueueException("The message release error.");
        }

        return $this;
    }
352
353
    /**
     * Lists the distinct queue names that currently own at least one message.
     *
     * The names are the values of the message-to-queue hash, so the whole hash
     * is read on every call.
     *
     * @return array Zero-indexed list of queue names, in order of first appearance
     */
    public function getQueueList()
    {
        $tokenToQueue = $this->getRedis()->hgetall($this->ns(self::NS_MESSAGE_TO_QUEUE));

        // array_unique() keeps the key of each first occurrence, so re-index
        // afterwards to hand callers a gap-free list.
        return array_values(array_unique($tokenToQueue));
    }
365
366
    /**
     * Moves delayed messages whose due time has passed onto the ready list.
     *
     * Called without a queue name, it runs once for every queue returned by
     * getQueueList(). For a single queue it runs a check-and-set transaction
     * that watches the ready and delayed keys (up to 10 retries): due tokens
     * are read from the delayed sorted set and, if there are any, removed from
     * it, appended to the ready list and marked ready.
     *
     * @param null|string $queue Queue name, or null for all known queues
     *
     * @return $this
     */
    public function migrate($queue = null)
    {
        if (null === $queue) {
            foreach ($this->getQueueList() as $name) {
                $this->migrate($name);
            }

            return $this;
        }

        $keyReady = $this->getKey($queue, self::STATE_READY);
        $keyDelayed = $this->getKey($queue, self::STATE_DELAYED);
        $stateHash = $this->ns(self::NS_MESSAGE_TO_STATE);

        $options = [
            'cas'   => true,
            'watch' => [$keyReady, $keyDelayed],
            'retry' => 10,
        ];

        $this->getRedis()->transaction($options, function ($tx) use ($keyReady, $keyDelayed, $stateHash) {
            /** @var $tx \Predis\Client */
            $now = time();

            // Tokens scored at or before "now" are due.
            $dueTokens = $tx->zrangebyscore($keyDelayed, '-inf', $now);

            if (count($dueTokens) === 0) {
                return;
            }

            // Everything after multi() is queued for the transaction.
            $tx->multi();
            $tx->zremrangebyscore($keyDelayed, '-inf', $now);

            foreach ($dueTokens as $token) {
                $tx->rpush($keyReady, $token);
                $tx->hset($stateHash, $token, self::STATE_READY);
            }
        });

        return $this;
    }
407
408
409
    /**
     * Creates an Iterator bound to this queue instance.
     *
     * @param string $queue   Queue name; stored under the 'queue' option, replacing any given value
     * @param array  $options Further options handed to the Iterator
     *
     * @return Iterator
     */
    public function iterator($queue, array $options = [])
    {
        return new Iterator($this, array_replace($options, ['queue' => $queue]));
    }
421
422
    /**
     * Clears the stop flag of a queue.
     *
     * @param string $queue Queue name
     *
     * @return $this
     */
    public function start($queue)
    {
        $stopHash = $this->ns(self::NS_QUEUE_STOP);
        $this->getRedis()->hdel($stopHash, [$queue]);

        return $this;
    }
433
434
    /**
     * Sets the stop flag of a queue; reserve() returns null while it is set.
     *
     * @param string $queue Queue name
     *
     * @return $this
     */
    public function stop($queue)
    {
        $stopHash = $this->ns(self::NS_QUEUE_STOP);
        $this->getRedis()->hset($stopHash, $queue, true);

        return $this;
    }
445
446
    /**
     * Tells whether the stop flag is set for a queue.
     *
     * @param string $queue Queue name
     *
     * @return bool
     */
    public function isStop($queue)
    {
        $flagged = $this->getRedis()->hexists($this->ns(self::NS_QUEUE_STOP), $queue);

        return (bool)$flagged;
    }
455
456
    /**
     * Collects per-state message counts for one queue.
     *
     * @param string $queue Queue name
     *
     * @return array Counts keyed by the STATS_MESSAGE_* names, their sum under
     *               STATS_MESSAGE_TOTAL and the stop flag under STATS_QUEUE_STOP
     */
    protected function queueStats($queue)
    {
        $stateByStat = [
            self::STATS_MESSAGE_READY    => self::STATE_READY,
            self::STATS_MESSAGE_RESERVED => self::STATE_RESERVED,
            self::STATS_MESSAGE_DELAYED  => self::STATE_DELAYED,
            self::STATS_MESSAGE_BURIED   => self::STATE_BURIED,
        ];

        $stats = [];
        foreach ($stateByStat as $stat => $state) {
            $stats[$stat] = $this->size($queue, $state);
        }

        // The total is taken before the non-numeric stop flag is added.
        $stats[self::STATS_MESSAGE_TOTAL] = array_sum($stats);
        $stats[self::STATS_QUEUE_STOP] = $this->isStop($queue);

        return $stats;
    }
475
476
    /**
     * Returns statistics for one queue or a summary over all known queues.
     *
     * With a queue name this is simply queueStats() for that queue. Without
     * one, the counters of every queue from getQueueList() are added up and
     * each queue's own stats are attached.
     *
     * NOTE(review): STATS_QUEUES_LIST and STATS_QUEUES are both 'queues', so
     * the name list and the per-queue stats map end up in the same array
     * (numeric keys hold names, name keys hold stats). Kept as-is because
     * callers may rely on that shape — confirm before separating them.
     *
     * @param null|string $queue Queue name, or null for the summary
     *
     * @return array
     */
    public function stats($queue = null)
    {
        if ($queue !== null) {
            return $this->queueStats($queue);
        }

        $counters = [
            self::STATS_MESSAGE_TOTAL,
            self::STATS_MESSAGE_READY,
            self::STATS_MESSAGE_RESERVED,
            self::STATS_MESSAGE_DELAYED,
            self::STATS_MESSAGE_BURIED,
        ];

        $summary = [self::STATS_QUEUES_LIST => []] + array_fill_keys($counters, 0);

        $names = $this->getQueueList();

        if (count($names) === 0) {
            return $summary;
        }

        $summary[self::STATS_QUEUES_LIST] = $names;

        foreach ($names as $name) {
            $perQueue = $this->queueStats($name);

            foreach ($counters as $counter) {
                $summary[$counter] += $perQueue[$counter];
            }

            $summary[self::STATS_QUEUES][$name] = $perQueue;
        }

        return $summary;
    }
516
517
    /**
     * Counts the messages of a queue that are in the given state.
     *
     * The ready state is a Redis list (LLEN); the other states are sorted
     * sets, counted over the full score range.
     *
     * @param string $queue Queue name
     * @param string $state One of the STATE_* constants
     *
     * @return int|string
     * @throws QueueException When $state is not one of the four known states
     */
    public function size($queue, $state = self::STATE_READY)
    {
        $known = [self::STATE_READY, self::STATE_DELAYED, self::STATE_BURIED, self::STATE_RESERVED];

        // Strict comparison: a loose in_array() lets values such as 0 or true
        // pass as a "known" state and then builds a key for a bogus state.
        if (!in_array($state, $known, true)) {
            throw new QueueException('Unsupported state for size calculation.');
        }

        $redis = $this->getRedis();

        return ($state === self::STATE_READY)
            ? $redis->llen($this->getKey($queue, self::STATE_READY))
            : $redis->zcount($this->getKey($queue, $state), '-inf', '+inf');
    }
536
537
538
    /**
     * Deletes every Redis key that belongs to this namespace.
     *
     * NOTE(review): $queue is accepted but not used — the whole namespace is
     * wiped regardless of its value. Left unchanged because callers may rely
     * on it; confirm the intended per-queue behaviour before implementing it.
     *
     * NOTE(review): the keys are found with the KEYS command, which scans the
     * entire keyspace of the Redis server.
     *
     * @param null|string $queue Currently ignored
     *
     * @return $this
     */
    public function clean($queue = null)
    {
        $redis = $this->getRedis();

        // Every key of this class is built by ns() as "<namespace>:<type>...".
        // Matching on "<namespace>:" instead of the bare namespace keeps a
        // namespace such as "Quedis" from also wiping "Quedis2:*".
        $keys = $redis->keys($this->namespace . ':*');

        foreach ($keys as $name) {
            $redis->del($name);
        }

        return $this;
    }
559
560
    /**
     * Wraps arbitrary data in a Message unless it already is one.
     *
     * @param mixed $mixed Payload or a ready-made MessageInterface
     *
     * @return MessageInterface
     */
    protected function createMessage($mixed)
    {
        return ($mixed instanceof MessageInterface) ? $mixed : new Message($mixed);
    }
573
574
    /**
     * Builds a namespaced key of the form "<namespace>:<type>".
     *
     * @param string $type One of the NS_* constants
     *
     * @return string
     */
    protected function ns($type)
    {
        return $this->namespace . ':' . $type;
    }
583
584
585
    /**
     * Builds the key that holds a queue's tokens for one state:
     * "<namespace>:queue:<queue>:<state>".
     *
     * @param string $queue Queue name
     * @param string $state One of the STATE_* constants
     *
     * @return string
     */
    protected function getKey($queue, $state)
    {
        $prefix = $this->ns(self::NS_QUEUE);

        return $prefix . ':' . $queue . ':' . $state;
    }
595
596
    /**
     * Normalizes a delay to a non-negative number of seconds.
     *
     * A date object is converted to the number of seconds from now until that
     * moment; anything else is cast to int. Negative results (dates in the
     * past, negative numbers) become 0.
     *
     * @param int|\DateTimeInterface $mixed Seconds, or the moment the delay ends
     *
     * @return int
     */
    protected function parseDelay($mixed)
    {
        // \DateTimeInterface covers \DateTime as before and also
        // \DateTimeImmutable, which would otherwise hit the (int) cast below.
        if ($mixed instanceof \DateTimeInterface) {
            $delay = $mixed->getTimestamp() - time();
        } else {
            $delay = (int)$mixed;
        }

        return max(0, $delay);
    }
611
612
    /**
     * Verifies that an action is allowed for a message in the given state.
     *
     * Allowed transitions: bury and release need a reserved message, kick
     * needs a buried one, delete accepts reserved, buried or delayed.
     *
     * @param string $currentState State recorded for the message
     * @param string $action       'bury', 'delete', 'kick' or 'release'
     *
     * @return $this
     * @throws FlowException When the state does not allow the action, or the action is unknown
     */
    protected function checkMessageFlow($currentState, $action)
    {
        $mapping = [
            'bury'    => [self::STATE_RESERVED],
            'delete'  => [self::STATE_RESERVED, self::STATE_BURIED, self::STATE_DELAYED],
            'kick'    => [self::STATE_BURIED],
            'release' => [self::STATE_RESERVED],
        ];

        // An unknown action allows no state at all; this avoids indexing a
        // missing key and handing a non-array to in_array().
        $allowedStates = isset($mapping[$action]) ? $mapping[$action] : [];

        if (!in_array($currentState, $allowedStates, true)) {
            throw new FlowException("Flow error, the message state '{$currentState}' cannot be '{$action}'.");
        }

        return $this;
    }
634
635
    /**
     * Looks up the queue and state recorded for a message and bundles them
     * with its token.
     *
     * @param string|MessageInterface $mixed Message or its token
     *
     * @return Payload
     * @throws QueueException NOTE(review): nothing in this method throws it
     *                        directly; presumably raised by Payload — confirm
     */
    protected function payload($mixed)
    {
        $redis = $this->getRedis();

        if ($mixed instanceof MessageInterface) {
            $token = $mixed->getToken();
        } else {
            $token = $mixed;
        }

        $queue = $redis->hget($this->ns(self::NS_MESSAGE_TO_QUEUE), $token);
        $state = $redis->hget($this->ns(self::NS_MESSAGE_TO_STATE), $token);

        return new Payload($token, $queue, $state);
    }
650
651
}
652