Queue   F

Complexity

Total Complexity 61

Size/Duplication

Total Lines 671
Duplicated Lines 11.77 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 83.33%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 61
c 2
b 0
f 0
lcom 1
cbo 3
dl 79
loc 671
ccs 200
cts 240
cp 0.8333
rs 3.0069
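
Two of the percentages above appear to follow directly from the raw counters. A minimal sketch, assuming that ccs and cts stand for covered and total countable statements and that dl stands for duplicated lines (the report does not spell these abbreviations out):

```php
<?php
// Raw counters copied from the metric table above.
$ccs = 200; // assumed: covered statements
$cts = 240; // assumed: total countable statements
$dl  = 79;  // assumed: duplicated lines
$loc = 671; // total lines

// Coverage as a percentage of statements.
$coveragePercent = sprintf('%.2f', $ccs / $cts * 100);

// Duplication as a percentage of all lines.
$duplicationPercent = sprintf('%.2f', $dl / $loc * 100);

echo $coveragePercent, "\n";    // 83.33
echo $duplicationPercent, "\n"; // 11.77
```

Both results match the "Coverage" and "Duplicated Lines" figures shown earlier, which supports the assumed meaning of the abbreviations.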

34 Methods

Rating   Name   Duplication   Size   Complexity  
A setProducer() 0 10 2
A getProducer() 0 8 2
A getRedis() 0 4 1
A getAppId() 0 4 1
A getRedisQueueNamespace() 0 4 1
A getRedisJobNamespace() 0 4 1
A getRedisQueueStopNamespace() 0 4 1
A isStop() 0 4 1
A stop() 0 4 1
A start() 0 4 1
A getQueueNames() 0 18 3
A getJobReserved() 0 18 4
A getJobCount() 0 8 2
B getStats() 0 30 3
A getQueueInfo() 0 14 1
A getPayload() 0 6 1
A buildPayload() 0 7 1
A parsePayload() 0 4 1
A getTime() 0 4 1
A getQueueNameFromKey() 0 4 1
A getKey() 0 4 1
A getKeyReady() 0 4 1
A getKeyDelayed() 0 4 1
A getKeyReserved() 0 4 1
A getKeyBuried() 0 4 1
A existsJob() 0 4 1
B putJob() 15 27 4
B reserveJob() 0 33 5
A deleteJob() 0 23 2
B buryJob() 24 24 2
B kickJob() 24 24 2
B releaseJob() 16 34 4
B migrate() 0 38 5
A clean() 0 8 1


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
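
As an illustration of that rule, buryJob() and kickJob() in the listing further down differ only in the states they move a job between. The following is a minimal sketch of pulling the shared steps into one private method. It uses a plain in-memory array instead of Redis, and the class name JobStateSketch and the method transition() are hypothetical names chosen for this example, not part of the analyzed code.

```php
<?php
// Hypothetical in-memory stand-in for the job hash kept in Redis.
final class JobStateSketch
{
    /** @var array<string, string> job id => state */
    private $states = [];

    public function put(string $id, string $state): void
    {
        $this->states[$id] = $state;
    }

    public function stateOf(string $id): ?string
    {
        return $this->states[$id] ?? null;
    }

    // The steps both public methods used to repeat: check the current
    // state, then record the new one.
    private function transition(string $id, string $from, string $to): bool
    {
        $current = $this->stateOf($id);
        if ($current !== $from) {
            throw new RuntimeException('Unsupported state: ' . var_export($current, true));
        }
        $this->states[$id] = $to;

        return true;
    }

    public function bury(string $id): bool
    {
        return $this->transition($id, 'reserved', 'buried');
    }

    public function kick(string $id): bool
    {
        return $this->transition($id, 'buried', 'ready');
    }
}

$sketch = new JobStateSketch();
$sketch->put('job-1', 'reserved');
$buried = $sketch->bury('job-1');
$kicked = $sketch->kick('job-1');
```

In the real class the shared method would also have to receive the Redis keys to remove from and add to, so the sketch only shows the shape of the refactoring.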


Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like Queue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes. In Queue, for example, getKey(), getKeyReady(), getKeyDelayed(), getKeyReserved(), and getKeyBuried() all share the getKey prefix. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Queue, and based on these observations, apply Extract Interface, too.
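
A minimal sketch of what Extract Class could look like for the getKey*() group. The class name QueueKeys and its method names are hypothetical; the key layout follows getKey() and getRedisQueueNamespace() in the listing below.

```php
<?php
// Hypothetical class extracted from Queue: it only builds Redis key names.
final class QueueKeys
{
    const NS_QUEUE = 'cronario@queue';

    /** @var string */
    private $appId;

    public function __construct(string $appId)
    {
        $this->appId = $appId;
    }

    // Same layout as Queue::getKey(): <namespace>:<appId>:<queue>:<state>
    public function key(string $queue, string $state = 'ready'): string
    {
        return implode(':', [self::NS_QUEUE, $this->appId, $queue, $state]);
    }

    public function ready(string $queue): string
    {
        return $this->key($queue, 'ready');
    }

    public function delayed(string $queue): string
    {
        return $this->key($queue, 'delayed');
    }

    public function reserved(string $queue): string
    {
        return $this->key($queue, 'reserved');
    }

    public function buried(string $queue): string
    {
        return $this->key($queue, 'buried');
    }
}

$keys = new QueueKeys('app');
$readyKey = $keys->ready('mail');
$buriedKey = $keys->buried('mail');
```

Queue would then hold a QueueKeys instance and delegate to it, which removes five methods from the class without changing any key names.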

<?php

/**
 * https://hackage.haskell.org/package/hbeanstalk-0.1/docs/Network-Beanstalk.html#1
 * https://github.com/nrk/predis
 * https://laravel.ru/docs/v4/queues
 * http://laravel.com/docs/4.2/queues
 * https://github.com/laravel/framework/blob/5.0/src/Illuminate/Queue/RedisQueue.php
 */

namespace Cronario;

use Cronario\Exception\QueueException;

class Queue
{

    // region MAIN *************************************************************

    // Namespaces
    const REDIS_NS_JOB = 'cronario@queue-job';
    const REDIS_NS_QUEUE = 'cronario@queue';
    const REDIS_NS_QUEUE_STOP = 'cronario@queueStop';

    // State
    const STATE_READY = 'ready';
    const STATE_DELAYED = 'delayed';
    const STATE_RESERVED = 'reserved';
    const STATE_BURIED = 'buried';

    // Priority
    const PRIORITY_HIGH = 'high';
    const PRIORITY_LOW = 'low';

    // Stats
    const STATS_QUEUES_LIST = 'queues-list';
    const STATS_QUEUES = 'queues';
    const STATS_JOBS_TOTAL = 'jobs-total';
    const STATS_JOBS_READY = 'jobs-ready';
    const STATS_JOBS_RESERVED = 'jobs-reserved';
    const STATS_JOBS_DELAYED = 'jobs-delayed';
    const STATS_JOBS_BURIED = 'jobs-buried';
    const STATS_QUEUE_STOP = 'stop';

    // Job payload
    const JOB_PAYLOAD_QUEUE = 'queue';
    const JOB_PAYLOAD_STATE = 'state';

    // endregion ***************************************************************


    // region STATIC ********************************************************

    /** @var  Producer */
    protected $producer;

    /**
     * @param Producer $producer
     *
     * @return $this
     * @throws QueueException
     */
    public function setProducer(Producer $producer)
    {
        // Strict check: the producer may only be assigned once.
        if (null !== $this->producer) {
            throw new QueueException('Queue - producer is already set!');
        }

        $this->producer = $producer;

        return $this;
    }

    /**
     * @return Producer
     * @throws QueueException
     */
    public function getProducer()
    {
        if (null === $this->producer) {
            throw new QueueException('Queue - producer is undefined');
        }

        return $this->producer;
    }

    /**
     * @return \Predis\Client
     */
    public function getRedis()
    {
        return $this->producer->getRedis();
    }

    /**
     * @return string
     */
    public function getAppId()
    {
        return $this->producer->getAppId();
    }

    /**
     * @return string
     */
    protected function getRedisQueueNamespace()
    {
        return implode(':', [self::REDIS_NS_QUEUE, $this->getAppId()]);
    }

    /**
     * @return string
     */
    protected function getRedisJobNamespace()
    {
        return implode(':', [self::REDIS_NS_JOB, $this->getAppId()]);
    }

    /**
     * @return string
     */
    protected function getRedisQueueStopNamespace()
    {
        return implode(':', [self::REDIS_NS_QUEUE_STOP, $this->getAppId()]);
    }

    // endregion ***************************************************************

    // region QUEUE Stopping ************************************************************

    /**
     * @param $queueName
     *
     * @return bool
     */
    public function isStop($queueName)
    {
        return (bool)$this->getRedis()->hexists($this->getRedisQueueStopNamespace(), $queueName);
    }

    /**
     * @param $queueName
     *
     * @return bool
     */
    public function stop($queueName)
    {
        return (bool)$this->getRedis()->hset($this->getRedisQueueStopNamespace(), $queueName, true);
    }

    /**
     * @param $queueName
     *
     * @return bool
     */
    public function start($queueName)
    {
        return (bool)$this->getRedis()->hdel($this->getRedisQueueStopNamespace(), $queueName);
    }


    // endregion ***************************************************************

    // region STATS ************************************************************

    /**
     * @return array
     */
    public function getQueueNames()
    {
        $result = [];

        $jobSet = $this->getRedis()->hgetall($this->getRedisJobNamespace());
        $payloadSet = array_unique(array_values($jobSet));

        if (count($payloadSet) == 0) {
            return $result;
        }

        foreach ($payloadSet as $payload) {
            $payload = self::parsePayload($payload);
            $result[$payload[self::JOB_PAYLOAD_QUEUE]] = true;
        }

        return array_keys($result);
    }

    /**
     * @return array
     */
    public function getJobReserved()
    {
        $result = [];
        $queueKeys = $this->getRedis()->keys($this->getKeyReserved('*'));

        if (count($queueKeys) == 0) {
            return $result;
        }

        foreach ($queueKeys as $queueKey) {
            $jobIds = $this->getRedis()->zrangebyscore($queueKey, '-inf', '+inf');
            foreach ($jobIds as $id) {
                $result[$id] = $this->getQueueNameFromKey($queueKey);
            }
        }

        return $result;
    }

    /**
     * @param $queue
     * @param $state
     *
     * @return int|string
     */
    public function getJobCount($queue, $state)
    {
        if ($state === self::STATE_READY) {
            return $this->getRedis()->llen($this->getKeyReady($queue));
        }

        return $this->getRedis()->zcount($this->getKey($queue, $state), '-inf', '+inf');
    }

    /**
     * Return statistical information about the server, across all clients.
     * Keys that can be expected to be returned are the following:
     *
     *  - self::STATS_QUEUES_LIST: names of all known queues
     *  - self::STATS_JOBS_TOTAL, self::STATS_JOBS_READY, self::STATS_JOBS_RESERVED,
     *    self::STATS_JOBS_DELAYED, self::STATS_JOBS_BURIED: job counts summed over all queues
     *  - self::STATS_QUEUES: result of getQueueInfo() per queue name
     *    (only present when at least one queue exists)
     *
     * @return array
     */
    public function getStats()
    {
        $result = [
            self::STATS_QUEUES_LIST   => [],
            self::STATS_JOBS_TOTAL    => 0,
            self::STATS_JOBS_READY    => 0,
            self::STATS_JOBS_RESERVED => 0,
            self::STATS_JOBS_DELAYED  => 0,
            self::STATS_JOBS_BURIED   => 0,
        ];

        $queueList = $this->getQueueNames();
        if (count($queueList) == 0) {
            return $result;
        }

        $result[self::STATS_QUEUES_LIST] = $queueList;

        foreach ($queueList as $queueName) {
            $itemStats = $this->getQueueInfo($queueName);
            $result[self::STATS_JOBS_READY] += $itemStats[self::STATS_JOBS_READY];
            $result[self::STATS_JOBS_RESERVED] += $itemStats[self::STATS_JOBS_RESERVED];
            $result[self::STATS_JOBS_DELAYED] += $itemStats[self::STATS_JOBS_DELAYED];
            $result[self::STATS_JOBS_BURIED] += $itemStats[self::STATS_JOBS_BURIED];
            $result[self::STATS_JOBS_TOTAL] += $itemStats[self::STATS_JOBS_TOTAL];
            $result[self::STATS_QUEUES][$queueName] = $itemStats;
        }

        return $result;
    }


    /**
     * @param $queueName
     *
     * @return array
     */
    public function getQueueInfo($queueName)
    {
        $result = [
            self::STATS_JOBS_READY    => $this->getJobCount($queueName, self::STATE_READY),
            self::STATS_JOBS_RESERVED => $this->getJobCount($queueName, self::STATE_RESERVED),
            self::STATS_JOBS_DELAYED  => $this->getJobCount($queueName, self::STATE_DELAYED),
            self::STATS_JOBS_BURIED   => $this->getJobCount($queueName, self::STATE_BURIED),
        ];

        $result[self::STATS_JOBS_TOTAL] = array_sum($result);
        $result[self::STATS_QUEUE_STOP] = $this->isStop($queueName);

        return $result;
    }


    // endregion ***************************************************************

    // region JobPayload ************************************************************

    /**
     * @param $id
     *
     * @return mixed
     */
    protected function getPayload($id)
    {
        $rawPayload = $this->getRedis()->hget($this->getRedisJobNamespace(), $id);

        return static::parsePayload($rawPayload);
    }

    /**
     * @param $queueName string
     * @param $jobState  string
     *
     * @return string
     */
    protected static function buildPayload($queueName, $jobState)
    {
        return serialize([
            self::JOB_PAYLOAD_QUEUE => $queueName,
            self::JOB_PAYLOAD_STATE => $jobState,
        ]);
    }

    /**
     * @param $rawPayload
     *
     * @return mixed
     */
    protected static function parsePayload($rawPayload)
    {
        return unserialize($rawPayload);
    }

    // endregion ***************************************************************

    // region HELPER ************************************************************


    /**
     * @return int
     */
    protected function getTime()
    {
        return time();
    }

    /**
     * @param string $key
     *
     * @return string
     */
    protected function getQueueNameFromKey($key)
    {
        // Keys built by getKey() look like <namespace>:<appId>:<queue>:<state>,
        // so the queue name is the third segment (index 1 would be the app id).
        // This assumes that neither the app id nor the queue name contains ':'.
        return explode(':', $key)[2];
    }

    /**
     * @param string $queue
     * @param string $state
     *
     * @return string
     */
    protected function getKey($queue, $state = self::STATE_READY)
    {
        return implode(':', [$this->getRedisQueueNamespace(), $queue, $state]);
    }

    /**
     * @param $queue
     *
     * @return string
     */
    protected function getKeyReady($queue)
    {
        return $this->getKey($queue, self::STATE_READY);
    }

    /**
     * @param $queue
     *
     * @return string
     */
    protected function getKeyDelayed($queue)
    {
        return $this->getKey($queue, self::STATE_DELAYED);
    }

    /**
     * @param $queue
     *
     * @return string
     */
    protected function getKeyReserved($queue)
    {
        return $this->getKey($queue, self::STATE_RESERVED);
    }

    /**
     * @param $queue
     *
     * @return string
     */
    protected function getKeyBuried($queue)
    {
        return $this->getKey($queue, self::STATE_BURIED);
    }


    // endregion ***************************************************************

    // region JOB ************************************************************

    /**
     * @param $id
     *
     * @return bool
     */
    public function existsJob($id)
    {
        return !empty($this->getPayload($id));
    }

    /**
     * @param        $queue
     * @param        $id
     * @param int    $delay
     * @param string $priority
     *
     * @return bool
     */
    public function putJob($queue, $id, $delay = 0, $priority = self::PRIORITY_LOW)
    {
        $delay = intval($delay);
        $keyReady = $this->getKeyReady($queue);
        $keyDelayed = $this->getKeyDelayed($queue);
        $redisJobNamespace = $this->getRedisJobNamespace();

        $result = $this->getRedis()->transaction(
            // Flagged by the analysis as duplicated code.
            function ($tx) use ($queue, $id, $delay, $keyDelayed, $priority, $keyReady, $redisJobNamespace) {
                /** @var $tx \Predis\Client */
                // Only a positive delay marks the job as delayed, matching the branch below.
                $payload = self::buildPayload($queue, (($delay > 0) ? self::STATE_DELAYED : self::STATE_READY));
                $tx->hset($redisJobNamespace, $id, $payload);

                if ($delay <= 0) {
                    if (self::PRIORITY_HIGH === $priority) {
                        $tx->lpush($keyReady, $id);
                    } else {
                        $tx->rpush($keyReady, $id);
                    }
                } else {
                    $tx->zadd($keyDelayed, $this->getTime() + $delay, $id);
                }
            }
        );

        return !in_array(false, $result);
    }

    /**
     * @param      $queue
     * @param null $timeout
     *
     * @return null|string
     */
    public function reserveJob($queue, $timeout = null)
    {
        $this->migrate($queue);
        $keyReady = $this->getKeyReady($queue);
        $keyReserved = $this->getKeyReserved($queue);

        if ($this->isStop($queue)) {
            return null;
        }

        if ($timeout > 0) {
            $id = $this->getRedis()->blpop($keyReady, $timeout);
            /**
             *  return [
             *      '0' => queueName ,
             *      '1' => jobId
             *  ]
             */
            $id = (is_array($id)) ? $id[1] : null;
        } else {
            $id = $this->getRedis()->lpop($keyReady);
        }

        if (!is_null($id)) {
            $payload = self::buildPayload($queue, self::STATE_RESERVED);
            $this->getRedis()->hset($this->getRedisJobNamespace(), $id, $payload);
            $this->getRedis()->zadd($keyReserved, $this->getTime(), $id);

            return $id;
        }

        return null;
    }

    /**
     * @param $id
     *
     * @return bool
     */
    public function deleteJob($id)
    {
        $payload = $this->getPayload($id);

        if (!in_array($payload[self::JOB_PAYLOAD_STATE], [self::STATE_RESERVED, self::STATE_BURIED])) {
            return true;
        }

        $keyBuried = $this->getKeyBuried($payload[self::JOB_PAYLOAD_QUEUE]);
        $keyReserved = $this->getKeyReserved($payload[self::JOB_PAYLOAD_QUEUE]);
        $redisJobNamespace = $this->getRedisJobNamespace();

        $result = $this->getRedis()->transaction(
            function ($tx) use ($id, $keyBuried, $keyReserved, $redisJobNamespace) {
                /** @var $tx \Predis\Client */
                $tx->zrem($keyReserved, $id);
                $tx->zrem($keyBuried, $id);
                $tx->hdel($redisJobNamespace, $id);
            }
        );

        // array_count_values() returns an array, and an array compared with an
        // integer is always "greater", so the original check was always true.
        // Assumed intent: at least one command actually removed something.
        return array_sum($result) >= 1;
    }

    /**
     * Flagged by the analysis as a duplicated method.
     *
     * @param $id
     *
     * @return bool
     * @throws QueueException
     */
    public function buryJob($id)
    {
        $payload = $this->getPayload($id);

        if ($payload[self::JOB_PAYLOAD_STATE] !== self::STATE_RESERVED) {
            throw new QueueException('Cannot bury job, unsupported state: ' . $payload[self::JOB_PAYLOAD_STATE]);
        }

        $redisJobNamespace = $this->getRedisJobNamespace();
        $keyBuried = $this->getKeyBuried($payload[self::JOB_PAYLOAD_QUEUE]);
        $keyReserved = $this->getKeyReserved($payload[self::JOB_PAYLOAD_QUEUE]);

        $result = $this->getRedis()->transaction(
            function ($tx) use ($id, $payload, $keyBuried, $keyReserved, $redisJobNamespace) {
                /** @var $tx \Predis\Client */
                $tx->zrem($keyReserved, $id);
                // $payload is imported by value, so assigning to it here would not be
                // visible in the outer scope. The analysis flagged that reassignment;
                // a separate variable name avoids the ambiguity.
                $buriedPayload = self::buildPayload($payload[self::JOB_PAYLOAD_QUEUE], self::STATE_BURIED);
                $tx->hset($redisJobNamespace, $id, $buriedPayload);
                $tx->zadd($keyBuried, $this->getTime(), $id);
            }
        );

        return !in_array(false, $result);
    }

    /**
     * Flagged by the analysis as a duplicated method.
     *
     * @param $id
     *
     * @return bool
     * @throws QueueException
     */
    public function kickJob($id)
    {
        $payload = $this->getPayload($id);

        if ($payload[self::JOB_PAYLOAD_STATE] !== self::STATE_BURIED) {
            throw new QueueException('Cannot kick job, unsupported state: ' . $payload[self::JOB_PAYLOAD_STATE]);
        }

        $keyReady = $this->getKeyReady($payload[self::JOB_PAYLOAD_QUEUE]);
        $keyBuried = $this->getKeyBuried($payload[self::JOB_PAYLOAD_QUEUE]);
        $redisJobNamespace = $this->getRedisJobNamespace();

        $result = $this->getRedis()->transaction(
            function ($tx) use ($id, $payload, $keyBuried, $keyReady, $redisJobNamespace) {
                /** @var $tx \Predis\Client */
                $tx->zrem($keyBuried, $id);
                // Separate name instead of reassigning the by-value import $payload,
                // which the analysis flagged.
                $readyPayload = self::buildPayload($payload[self::JOB_PAYLOAD_QUEUE], self::STATE_READY);
                $tx->hset($redisJobNamespace, $id, $readyPayload);
                $tx->rpush($keyReady, $id);
            }
        );

        return !in_array(false, $result);
    }

    /**
     * @param     $id
     * @param int $delay
     *
     * @return bool
     * @throws QueueException
     */
    public function releaseJob($id, $delay = 0)
    {
        $payload = $this->getPayload($id);

        if ($payload[self::JOB_PAYLOAD_STATE] !== self::STATE_RESERVED) {
            throw new QueueException('Cannot release job, unsupported state: ' . $payload[self::JOB_PAYLOAD_STATE]);
        }

        $keyReady = $this->getKeyReady($payload[self::JOB_PAYLOAD_QUEUE]);
        $keyDelayed = $this->getKeyDelayed($payload[self::JOB_PAYLOAD_QUEUE]);
        $keyReserved = $this->getKeyReserved($payload[self::JOB_PAYLOAD_QUEUE]);
        $redisJobNamespace = $this->getRedisJobNamespace();

        $result = $this->getRedis()->transaction(
            // Flagged by the analysis as duplicated code.
            function ($tx) use ($id, $payload, $delay, $keyReady, $keyDelayed, $keyReserved, $redisJobNamespace) {
                /** @var $tx \Predis\Client */
                $tx->zrem($keyReserved, $id);

                // Separate name instead of reassigning the by-value import $payload,
                // which the analysis flagged.
                $releasedPayload = self::buildPayload($payload[self::JOB_PAYLOAD_QUEUE], (($delay)
                    ? self::STATE_DELAYED
                    : self::STATE_READY
                ));
                $tx->hset($redisJobNamespace, $id, $releasedPayload);

                if ($delay == 0) {
                    $tx->rpush($keyReady, $id);
                } else {
                    $tx->zadd($keyDelayed, $this->getTime() + $delay, $id);
                }
            }
        );

        return !in_array(false, $result);
    }

    // endregion ***************************************************************

    // region MIGRATIONS *************************************************************

    /**
     * @param null $queue
     *
     * @return bool
     */
    public function migrate($queue = null)
    {
        if (null === $queue) {
            $list = $this->getQueueNames();
            foreach ($list as $item) {
                $this->migrate($item);
            }

            return true;
        }

        $keyReady = $this->getKeyReady($queue);
        $keyDelayed = $this->getKeyDelayed($queue);
        $redisJobNamespace = $this->getRedisJobNamespace();

        $this->getRedis()->transaction(['cas' => true, 'watch' => [$keyReady, $keyDelayed], 'retry' => 10],
            function ($tx) use ($queue, $keyReady, $keyDelayed, $redisJobNamespace) {

                /** @var $tx \Predis\Client */
                $time = $this->getTime();

                // get expired jobs from "delayed queue"
                $jobIds = $tx->zrangebyscore($keyDelayed, '-inf', $time);

                if (count($jobIds) > 0) {
                    // remove jobs from "delayed queue"
                    $tx->multi();
                    $tx->zremrangebyscore($keyDelayed, '-inf', $time);
                    foreach ($jobIds as $id) {
                        $tx->hset($redisJobNamespace, $id, self::buildPayload($queue, self::STATE_READY));
                        $tx->rpush($keyReady, $id);
                    }
                }
            }
        );

        return true;
    }

    // endregion ***************************************************************

    /**
     * @return $this
     */
    public function clean()
    {
        $this->getRedis()->del($this->getRedisQueueNamespace());
        $this->getRedis()->del($this->getRedisJobNamespace());
        $this->getRedis()->del($this->getRedisQueueStopNamespace());

        return $this;
    }


}
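
The analysis notes on buryJob(), kickJob(), and releaseJob() all concern a variable that a closure imports through use and then reassigns. A minimal standalone sketch of the difference between importing by value and by reference; the variable names and values are made up for this illustration:

```php
<?php
$payload = ['queue' => 'mail', 'state' => 'reserved'];

// Imported by value: the closure works on its own copy of $payload.
$byValue = function () use ($payload) {
    $payload = 'buried';

    return $payload;
};

// Imported by reference: the change is visible in the outer scope.
$counter = 0;
$byReference = function () use (&$counter) {
    $counter++;
};

$inner = $byValue();
$byReference();

var_dump($payload['state']); // string(8) "reserved"
var_dump($counter);          // int(1)
```

Because the outer $payload is never changed by the by-value closure, giving the new value its own name inside the closure changes nothing at runtime and only makes the intent easier to read.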