Completed
Push — dev ( dc03b0...0aea99 )
by Vlad
01:50
created

Queue::reserveToken()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 7
cts 7
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 8
nc 3
nop 2
crap 3
1
<?php
2
3
namespace Simplario\Quedis;
4
5
use Simplario\Quedis\Exceptions\FlowException;
6
use Simplario\Quedis\Exceptions\QueueException;
7
use Simplario\Quedis\Interfaces\QueueInterface;
8
9
/**
10
 * Class Queue
11
 *
12
 * @package Simplario\Quedis
13
 *
14
 *
15
 *
16
 *   Message flows (like in the Beanstalk: http://beanstalkc.readthedocs.io/en/latest/tutorial.html )
17
 *   ------------------------------------------------------------------------------------------------
18
 *
19
 *   1)   put            pop
20
 *       -----> [READY] --------> *poof*
21
 *
22
 *
23
 *   2)   put            reserve               delete
24
 *       -----> [READY] ---------> [RESERVED] --------> *poof*
25
 *
26
 *
27
 *   3)   put with delay               release with delay
28
 *       ----------------> [DELAYED] <------------.
29
 *                             |                   |
30
 *                             | (time passes)     |
31
 *                             |                   |
32
 *        put                  v     reserve       |       delete
33
 *       -----------------> [READY] ---------> [RESERVED] --------> *poof*
34
 *                            ^  ^                |  |
35
 *                            |   \  release      |  |
36
 *                            |    ``-------------'   |
37
 *                            |                      |
38
 *                            | kick                 |
39
 *                            |                      |
40
 *                            |       bury           |
41
 *                         [BURIED] <---------------'
42
 *                            |
43
 *                            |  delete
44
 *                             ``--------> *poof*
45
 *
46
 */
47
class Queue implements QueueInterface
{

    const PRIORITY_HIGH = 'high';
    const PRIORITY_LOW = 'low';

    // Key namespaces (joined to the instance namespace by ns()).
    const NS_QUEUE = 'queue';
    const NS_QUEUE_STOP = 'queue2stop';
    const NS_MESSAGE = 'message';
    const NS_MESSAGE_TO_QUEUE = 'message2queue';
    const NS_MESSAGE_TO_STATE = 'message2state';

    // State
    const STATE_READY = 'ready';
    const STATE_DELAYED = 'delayed';
    const STATE_RESERVED = 'reserved';
    const STATE_BURIED = 'buried';

    // Stats
    // NOTE(review): STATS_QUEUES_LIST and STATS_QUEUES share the value 'queues', so stats()
    // stores both the queue names and the per-queue stats under the same array key.
    // Confirm whether that is intended before changing either constant.
    const STATS_QUEUES_LIST = 'queues';
    const STATS_QUEUES = 'queues';
    const STATS_MESSAGE_TOTAL = 'total';
    const STATS_MESSAGE_READY = 'ready';
    const STATS_MESSAGE_RESERVED = 'reserved';
    const STATS_MESSAGE_DELAYED = 'delayed';
    const STATS_MESSAGE_BURIED = 'buried';
    const STATS_QUEUE_STOP = 'stop';

    /**
     * @var \Predis\Client
     */
    protected $redis;

    /**
     * Prefix of every Redis key this instance builds.
     *
     * @var string
     */
    protected $namespace;

    /**
     * Queue constructor.
     *
     * @param mixed  $redis     Redis client (documented elsewhere in this class as \Predis\Client)
     * @param string $namespace Key prefix for everything stored by this instance
     */
    public function __construct($redis, $namespace = 'Quedis')
    {
        $this->redis = $redis;
        $this->namespace = $namespace;
    }

    /**
     * @return \Predis\Client
     */
    public function getRedis()
    {
        return $this->redis;
    }

    /**
     * @return string
     */
    public function getNamespace()
    {
        return $this->namespace;
    }

    /**
     * Store a message and enqueue it as ready (no delay) or delayed.
     *
     * Ready messages are appended to the tail of the ready list, or pushed to its
     * head when $priority is PRIORITY_HIGH. Delayed messages go to the delayed
     * sorted set scored with the time they become due.
     *
     * @param string        $queue
     * @param mixed         $data     Payload, or an existing Message instance (used as is)
     * @param int|\DateTime $delay    Seconds to wait, or the moment the message becomes ready
     * @param string        $priority One of the PRIORITY_* constants
     *
     * @return Message
     * @throws QueueException When a command inside the transaction replies with false
     * @throws \Exception
     */
    public function put($queue, $data, $delay = 0, $priority = self::PRIORITY_LOW)
    {
        $message = $this->createMessage($data);
        $delay = $this->parseDelay($delay);

        $result = $this->getRedis()->transaction(function ($tx) use ($queue, $message, $delay, $priority) {
            /** @var $tx \Predis\Client */

            // parseDelay() always returns an int, so a strict comparison is safe.
            $state = $delay === 0 ? self::STATE_READY : self::STATE_DELAYED;

            $tx->hset($this->ns(self::NS_MESSAGE), $message->getToken(), $message->encode());
            $tx->hset($this->ns(self::NS_MESSAGE_TO_QUEUE), $message->getToken(), $queue);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $message->getToken(), $state);

            if ($state === self::STATE_READY) {
                if (self::PRIORITY_HIGH === $priority) {
                    $tx->lpush($this->getKey($queue, self::STATE_READY), $message->getToken());
                } else {
                    $tx->rpush($this->getKey($queue, self::STATE_READY), $message->getToken());
                }
            } else {
                $tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $message->getToken());
            }
        });

        if (in_array(false, $result, true)) {
            throw new QueueException('Can not put message to queue');
        }

        return $message;
    }

    /**
     * Reserve a message and delete it immediately.
     *
     * Without a callback the reserved message (or null) is returned after it has
     * been deleted. With a callback, every reserved message is deleted and then
     * handed to the callback as ($message, $queueInstance).
     *
     * @param string        $queue
     * @param int           $timeout  Seconds to block while waiting; 0 means do not block
     * @param callable|null $callback
     *
     * @return Message|null
     */
    public function pop($queue, $timeout = 0, callable $callback = null)
    {
        if (is_callable($callback)) {
            // reserveResult() only invokes the callback with a restored message, so the
            // message parameter is required here: delete() must never receive null.
            $handler = function (Message $message, Queue $instance) use ($callback) {
                $this->delete($message);
                $callback($message, $instance);
            };

            return $this->reserve($queue, $timeout, $handler);
        }

        $message = $this->reserve($queue, $timeout);

        if ($message !== null) {
            $this->delete($message);
        }

        return $message;
    }

    /**
     * Take the next ready message of a queue and mark it as reserved.
     *
     * Returns null when the queue is stopped or no message could be taken.
     * Due delayed messages are migrated to the ready list first.
     *
     * @param string        $queue
     * @param int           $timeout  Seconds to block while waiting; 0 means do not block
     * @param callable|null $callback When given, it is called per message and reserving continues
     *
     * @return Message|null
     */
    public function reserve($queue, $timeout = 0, callable $callback = null)
    {
        if ($this->isStop($queue)) {
            return null;
        }

        $this->migrate($queue);

        $token = $this->reserveToken($queue, $timeout);
        $message = $this->restoreMessage($token);

        if ($message === null) {
            return null;
        }

        $this->getRedis()->transaction(function ($tx) use ($queue, $token) {
            /** @var $tx \Predis\Client */
            $tx->zadd($this->getKey($queue, self::STATE_RESERVED), time(), $token);
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_RESERVED);
        });

        return $this->reserveResult($queue, $message, $timeout, $callback);
    }

    /**
     * Finish a reserve() call: return the message, or run the callback and keep reserving.
     *
     * NOTE(review): in callback mode this calls reserve() again, which calls back into
     * this method, so the call stack grows with every processed message.
     *
     * @param string        $queue
     * @param Message       $message
     * @param int           $timeout
     * @param callable|null $callback
     *
     * @return Message|null
     */
    protected function reserveResult($queue, $message, $timeout = 0, $callback = null)
    {
        if (is_callable($callback)) {
            $callback($message, $this);

            return $this->reserve($queue, $timeout, $callback);
        }

        return $message;
    }

    /**
     * Pop the next token from the head of the ready list.
     *
     * @param string $queue
     * @param int    $timeout When greater than 0 a blocking pop with that timeout is used
     *
     * @return null|string
     */
    protected function reserveToken($queue, $timeout = 0)
    {
        $redis = $this->getRedis();
        $readyKey = $this->getKey($queue, self::STATE_READY);

        if ($timeout > 0) {
            // example return [ '0' => queue name , '1' => jobId ]
            $reply = $redis->blpop([$readyKey], $timeout);

            return is_array($reply) ? $reply[1] : null;
        }

        return $redis->lpop($readyKey);
    }

    /**
     * Load and decode the stored message for a token.
     *
     * @param string|null $token
     *
     * @return null|Message Null when $token is null
     */
    protected function restoreMessage($token)
    {
        if (is_null($token)) {
            return null;
        }

        $encoded = $this->getRedis()->hget($this->ns(self::NS_MESSAGE), $token);

        return Message::decode($encoded);
    }

    /**
     * Move a reserved message to the buried set.
     *
     * @param Message|string $mixed Message instance or its token
     *
     * @return $this
     * @throws FlowException  When the message is not in the reserved state
     * @throws QueueException When a command inside the transaction replies with false
     */
    public function bury($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'bury');

        $result = $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken());
            $tx->zadd($this->getKey($payload->getQueue(), self::STATE_BURIED), time(), $payload->getToken());
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_BURIED);
        });

        if (in_array(false, $result, true)) {
            throw new QueueException("The message bury error.");
        }

        return $this;
    }

    /**
     * Remove a reserved, buried or delayed message together with its stored data.
     *
     * @param Message|string $mixed Message instance or its token
     *
     * @return $this
     * @throws FlowException  When the message state does not allow deletion
     * @throws QueueException When a command inside the transaction replies with false
     */
    public function delete($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'delete');

        $result = $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken());
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken());
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_DELAYED), $payload->getToken()); // ?
            $tx->hdel($this->ns(self::NS_MESSAGE), $payload->getToken());
            $tx->hdel($this->ns(self::NS_MESSAGE_TO_QUEUE), $payload->getToken());
            $tx->hdel($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken());
        });

        // The previous check compared the array returned by array_count_values() with an
        // integer, which can never be "less than 1"; use the same failure test as the
        // other state transitions instead.
        if (in_array(false, $result, true)) {
            throw new QueueException("The message deleting error");
        }

        return $this;
    }

    /**
     * Move a buried message back to the tail of the ready list.
     *
     * @param Message|string $mixed Message instance or its token
     *
     * @return $this
     * @throws FlowException  When the message is not in the buried state
     * @throws QueueException When a command inside the transaction replies with false
     */
    public function kick($mixed)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'kick');

        $result = $this->getRedis()->transaction(function ($tx) use ($payload) {
            /** @var $tx \Predis\Client */
            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken());
            $tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken());
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY);
        });

        if (in_array(false, $result, true)) {
            throw new QueueException("The message kicked error");
        }

        return $this;
    }

    /**
     * Give a reserved message back to the queue, optionally after a delay.
     *
     * @param Message|string $mixed Message instance or its token
     * @param int|\DateTime  $delay Seconds to wait, or the moment the message becomes ready
     *
     * @return $this
     * @throws FlowException  When the message is not in the reserved state
     * @throws QueueException When a command inside the transaction replies with false
     */
    public function release($mixed, $delay = 0)
    {
        $payload = $this->payload($mixed);
        $this->checkMessageFlow($payload->getState(), 'release');
        $delay = $this->parseDelay($delay);

        $result = $this->getRedis()->transaction(function ($tx) use ($payload, $delay) {
            /** @var $tx \Predis\Client */

            // Record the state that matches where the token is actually stored; a message
            // parked in the delayed set must be 'delayed' (as in put()), otherwise it
            // cannot be deleted until migrate() has moved it to the ready list.
            $state = $delay === 0 ? self::STATE_READY : self::STATE_DELAYED;

            $tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken());
            $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), $state);

            if ($state === self::STATE_READY) {
                $tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken());
            } else {
                $tx->zadd(
                    $this->getKey($payload->getQueue(), self::STATE_DELAYED),
                    time() + $delay,
                    $payload->getToken()
                );
            }
        });

        if (in_array(false, $result, true)) {
            throw new QueueException("The message release error.");
        }

        return $this;
    }

    /**
     * List the distinct queue names referenced by stored messages.
     *
     * @return array Queue names; keys are not re-indexed after de-duplication
     */
    public function getQueueList()
    {
        $redis = $this->getRedis();
        $queueList = $redis->hgetall($this->ns(self::NS_MESSAGE_TO_QUEUE));
        $queueList = array_values($queueList);
        $queueList = array_unique($queueList);

        return $queueList;
    }

    /**
     * Move due delayed messages to the ready list.
     *
     * @param null|string $queue Queue to migrate; null migrates every queue from getQueueList()
     *
     * @return $this
     */
    public function migrate($queue = null)
    {
        if (null === $queue) {
            $queueList = $this->getQueueList();
            foreach ($queueList as $queue) {
                $this->migrate($queue);
            }

            return $this;
        }

        $keyReady = $this->getKey($queue, self::STATE_READY);
        $keyDelayed = $this->getKey($queue, self::STATE_DELAYED);

        // Check-and-set transaction: both keys are watched and the block is retried
        // (up to the configured retry count) when they change underneath it.
        $this->getRedis()->transaction(['cas' => true, 'watch' => [$keyReady, $keyDelayed], 'retry' => 10],
            function ($tx) use ($queue, $keyReady, $keyDelayed) {

                /** @var $tx \Predis\Client */
                $time = time();

                // get expired jobs from "delayed queue"
                $messageTokenSet = $tx->zrangebyscore($keyDelayed, '-inf', $time);

                if (count($messageTokenSet) > 0) {
                    // remove jobs from "delayed queue"
                    $tx->multi();
                    $tx->zremrangebyscore($keyDelayed, '-inf', $time);
                    foreach ($messageTokenSet as $token) {
                        $tx->rpush($keyReady, $token);
                        $tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_READY);
                    }
                }
            });

        return $this;
    }

    /**
     * Clear the stop flag of a queue.
     *
     * @param string $queue
     *
     * @return $this
     */
    public function start($queue)
    {
        $this->getRedis()->hdel($this->ns(self::NS_QUEUE_STOP), [$queue]);

        return $this;
    }

    /**
     * Set the stop flag of a queue; reserve() returns null while it is set.
     *
     * @param string $queue
     *
     * @return $this
     */
    public function stop($queue)
    {
        $this->getRedis()->hset($this->ns(self::NS_QUEUE_STOP), $queue, true);

        return $this;
    }

    /**
     * @param string $queue
     *
     * @return bool True when the stop flag exists for the queue
     */
    public function isStop($queue)
    {
        return (bool)$this->getRedis()->hexists($this->ns(self::NS_QUEUE_STOP), $queue);
    }

    /**
     * Collect per-state message counts, their total and the stop flag for one queue.
     *
     * @param string $queue
     *
     * @return array
     */
    protected function queueStats($queue)
    {
        $result = [
            self::STATS_MESSAGE_READY    => $this->size($queue, self::STATE_READY),
            self::STATS_MESSAGE_RESERVED => $this->size($queue, self::STATE_RESERVED),
            self::STATS_MESSAGE_DELAYED  => $this->size($queue, self::STATE_DELAYED),
            self::STATS_MESSAGE_BURIED   => $this->size($queue, self::STATE_BURIED),
        ];

        // The total must be computed before the (non-numeric) stop flag is added.
        $result[self::STATS_MESSAGE_TOTAL] = array_sum($result);
        $result[self::STATS_QUEUE_STOP] = $this->isStop($queue);

        return $result;
    }

    /**
     * Statistics for one queue, or aggregated over all known queues.
     *
     * @param null|string $queue Queue name; null aggregates over getQueueList()
     *
     * @return array
     */
    public function stats($queue = null)
    {
        if ($queue !== null) {
            return $this->queueStats($queue);
        }

        $result = [
            self::STATS_QUEUES_LIST      => [],
            self::STATS_MESSAGE_TOTAL    => 0,
            self::STATS_MESSAGE_READY    => 0,
            self::STATS_MESSAGE_RESERVED => 0,
            self::STATS_MESSAGE_DELAYED  => 0,
            self::STATS_MESSAGE_BURIED   => 0,
        ];

        $queueList = $this->getQueueList();

        if (count($queueList) === 0) {
            return $result;
        }

        $result[self::STATS_QUEUES_LIST] = $queueList;

        foreach ($queueList as $queue) {
            $itemStats = $this->queueStats($queue);
            $result[self::STATS_MESSAGE_READY] += $itemStats[self::STATS_MESSAGE_READY];
            $result[self::STATS_MESSAGE_RESERVED] += $itemStats[self::STATS_MESSAGE_RESERVED];
            $result[self::STATS_MESSAGE_DELAYED] += $itemStats[self::STATS_MESSAGE_DELAYED];
            $result[self::STATS_MESSAGE_BURIED] += $itemStats[self::STATS_MESSAGE_BURIED];
            $result[self::STATS_MESSAGE_TOTAL] += $itemStats[self::STATS_MESSAGE_TOTAL];
            // NOTE(review): STATS_QUEUES equals STATS_QUEUES_LIST, so this adds a
            // name-keyed entry next to the queue names stored above.
            $result[self::STATS_QUEUES][$queue] = $itemStats;
        }

        return $result;
    }

    /**
     * Count the messages of a queue that are in the given state.
     *
     * @param string $queue
     * @param string $state One of the STATE_* constants
     *
     * @return int|string
     * @throws QueueException When $state is not one of the STATE_* constants
     */
    public function size($queue, $state = self::STATE_READY)
    {
        $supported = [self::STATE_READY, self::STATE_DELAYED, self::STATE_BURIED, self::STATE_RESERVED];

        // Strict comparison: loosely-equal values (e.g. 0 or true) must not pass validation.
        if (!in_array($state, $supported, true)) {
            throw new QueueException('Unsupported state for size calculation.');
        }

        $redis = $this->getRedis();

        // Ready messages live in a list; every other state is kept in a sorted set.
        return ($state === self::STATE_READY)
            ? $redis->llen($this->getKey($queue, self::STATE_READY))
            : $redis->zcount($this->getKey($queue, $state), '-inf', '+inf');
    }

    /**
     * Delete every key stored under this instance's namespace.
     *
     * @param null|string $queue Currently unused: all queues of the namespace are removed
     *
     * @return $this
     */
    public function clean($queue = null)
    {
        $redis = $this->getRedis();

        // Every key built by ns()/getKey() starts with "<namespace>:"; matching on that
        // prefix keeps other namespaces that merely share the leading characters intact.
        $keys = $redis->keys($this->namespace . ':*');

        foreach ($keys as $name) {
            $redis->del($name);
        }

        return $this;
    }

    /**
     * Wrap arbitrary data in a Message; an existing Message is returned unchanged.
     *
     * @param mixed $mixed
     *
     * @return Message
     */
    protected function createMessage($mixed)
    {
        if ($mixed instanceof Message) {
            return $mixed;
        }

        return new Message($mixed);
    }

    /**
     * Build a namespaced key: "<namespace>:<type>".
     *
     * @param string $type
     *
     * @return string
     */
    protected function ns($type)
    {
        return implode(':', [$this->namespace, $type]);
    }

    /**
     * Build the key of a queue's state container: "<namespace>:queue:<queue>:<state>".
     *
     * @param string $queue
     * @param string $state
     *
     * @return string
     */
    protected function getKey($queue, $state)
    {
        return implode(':', [$this->ns(self::NS_QUEUE), $queue, $state]);
    }

    /**
     * Normalise a delay to a non-negative number of seconds.
     *
     * @param int|\DateTime $mixed Seconds, or a point in time (converted relative to now)
     *
     * @return int Never negative
     */
    protected function parseDelay($mixed)
    {
        if ($mixed instanceof \DateTime) {
            $delay = $mixed->getTimestamp() - time();
        } else {
            $delay = (int)$mixed;
        }

        return $delay < 0 ? 0 : $delay;
    }

    /**
     * Verify that an action is allowed for a message in the given state.
     *
     * @param string $currentState
     * @param string $action One of 'bury', 'delete', 'kick', 'release'
     *
     * @return $this
     * @throws FlowException When the state does not permit the action
     */
    protected function checkMessageFlow($currentState, $action)
    {
        $mapping = [
            'bury'    => [self::STATE_RESERVED],
            'delete'  => [self::STATE_RESERVED, self::STATE_BURIED, self::STATE_DELAYED],
            'kick'    => [self::STATE_BURIED],
            'release' => [self::STATE_RESERVED],
        ];

        if (!in_array($currentState, $mapping[$action], true)) {
            throw new FlowException("Flow error, the message state '{$currentState}' cannot be '{$action}'.");
        }

        return $this;
    }

    /**
     * Look up the queue and state recorded for a message.
     *
     * @param Message|string $mixed Message instance or its token
     *
     * @return Payload
     * @throws QueueException
     */
    protected function payload($mixed)
    {
        $redis = $this->getRedis();
        $token = $mixed instanceof Message ? $mixed->getToken() : $mixed;
        $queue = $redis->hget($this->ns(self::NS_MESSAGE_TO_QUEUE), $token);
        $state = $redis->hget($this->ns(self::NS_MESSAGE_TO_STATE), $token);

        return new Payload($token, $queue, $state);
    }

}
666