Completed
Pull Request — master (#44)
by Matthew
16:45
created

JobManager::getStatus()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 21
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 21
ccs 0
cts 0
cp 0
rs 9.0534
cc 4
eloc 14
nc 6
nop 0
crap 20
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Manager\JobTimingManager;
10
use Dtc\QueueBundle\Manager\PriorityJobManager;
11
use Dtc\QueueBundle\Manager\RunManager;
12
use Dtc\QueueBundle\Model\BaseJob;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
 * Redis-backed job manager: stores each job payload under its own key and schedules job ids through Redis sorted sets.
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    use JobIdTrait;
22
23
    /** @var RedisInterface */
24
    protected $redis;
25
26
    /** @var string */
27
    protected $cacheKeyPrefix;
28
29
    protected $hostname;
30
    protected $pid;
31
32
    /**
     * Records the cache key prefix together with this process's host name and
     * PID, then hands the remaining arguments to the parent constructor.
     *
     * @param RunManager       $runManager       forwarded to the parent constructor
     * @param JobTimingManager $jobTimingManager forwarded to the parent constructor
     * @param mixed            $jobClass         forwarded to the parent constructor (presumably a class name — confirm against the parent)
     * @param string           $cacheKeyPrefix   prefix prepended to every Redis key this manager builds
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $detectedHost = gethostname();

        $this->cacheKeyPrefix = $cacheKeyPrefix;
        // gethostname() returns false on failure; fall back to an empty string.
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
43
44
    /**
     * Injects the Redis client that every queue and job storage call in this
     * class goes through.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
48
49 18
    /**
     * Builds the Redis key under which a single job's payload is stored.
     *
     * @param mixed $jobId job identifier appended to the key
     *
     * @return string
     */
    protected function getJobCacheKey($jobId)
    {
        return "{$this->cacheKeyPrefix}_job_{$jobId}";
    }
53
54
    /**
     * Builds the Redis key of the list that groups jobs sharing one CRC hash.
     *
     * @param string $jobCrc
     *
     * @return string
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return "{$this->cacheKeyPrefix}_job_crc_{$jobCrc}";
    }
61
62 16
    /**
     * Builds the Redis key of the priority sorted set.
     *
     * @return string
     */
    protected function getPriorityQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_priority";
    }
66
67 20
    /**
     * Builds the Redis key of the "when" sorted set (jobs scored by run time).
     *
     * @return string
     */
    protected function getWhenQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_when";
    }
71
72 14
    /**
     * Builds the Redis key of the hash that holds per-status job counters.
     *
     * @return string
     */
    protected function getStatusCacheKey()
    {
        return "{$this->cacheKeyPrefix}_status";
    }
76 14
77 14
    /**
     * Drains the "when" queue into the priority queue.
     *
     * Ids are popped from the "when" sorted set with zPopByMaxScore() using the
     * current microtime as the maximum score. Each id whose payload can still
     * be read is added to the priority sorted set, scored by the stored job's
     * priority. Ids without a readable payload are popped and dropped.
     */
    protected function transferQueues()
    {
        $source = $this->getWhenQueueCacheKey();
        $target = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while (true) {
            $dueJobId = $this->redis->zPopByMaxScore($source, $now);
            if (!$dueJobId) {
                break;
            }

            $payload = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$payload) {
                // No payload stored for this id any more; nothing to transfer.
                continue;
            }

            $job = new Job();
            $job->fromMessage($payload);
            $this->redis->zAdd($target, $job->getPriority(), $job->getId());
        }
    }
92
93 2
    /**
     * Looks for an already-stored job with the same CRC hash as $job and, if
     * one is found, lets batchFoundJob() merge $job into it instead of queuing
     * a duplicate.
     *
     * Only the first entries of the CRC list (indexes 0 through 1000) are
     * inspected. Entries whose payload can no longer be read are removed from
     * the list on the way.
     *
     * @param Job $job
     *
     * @return Job|null the stored job that absorbed $job, or null when no usable match exists
     */
    protected function batchSave(Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $candidateIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($candidateIds)) {
            return null;
        }

        foreach ($candidateIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list holds job ids (that is what saveJob() pushes),
                // so the stale entry has to be removed by id. Removing by the
                // job's cache key would never match anything.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
123 2
124
    /**
     * Compares the job being saved with a stored job that has the same CRC
     * hash and decides whether the stored job's run time and/or priority value
     * should be replaced by the incoming job's.
     *
     * @param Job    $job              the job being saved
     * @param string $foundJobCacheKey cache key of the stored job
     * @param string $foundJobMessage  serialized payload of the stored job
     *
     * @return Job|false|null whatever finishBatchFoundJob() returns
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $existing = new Job();
        $existing->fromMessage($foundJobMessage);

        $requestedWhen = $job->getWhenUs();
        $existingWhen = $existing->getWhenUs();
        $now = Util::getMicrotimeDecimal();

        // Take over the incoming run time only when the stored job is still in
        // the future and is scheduled later than the incoming job.
        $newFoundWhen = (bccomp($existingWhen, $now) > 0 && bccomp($existingWhen, $requestedWhen) >= 1)
            ? $requestedWhen
            : null;

        // Take over the incoming priority value when the stored one is greater.
        $newFoundPriority = ($existing->getPriority() > $job->getPriority())
            ? $job->getPriority()
            : null;

        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        return $this->finishBatchFoundJob($existing, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
    }
152 2
153
    /**
     * Applies a new run time and/or priority to a stored job and re-queues it
     * when either one actually changed.
     *
     * @param Job      $foundJob         the stored job
     * @param string   $foundJobCacheKey cache key of the stored job
     * @param string   $crcCacheKey      key of the CRC list the stored job belongs to
     * @param mixed    $newFoundWhen     replacement run time, or null to keep the current one
     * @param int|null $newFoundPriority replacement priority, or null to keep the current one
     *
     * @return Job|false|null the untouched stored job when nothing changed, otherwise the result of addFoundJob()
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $hasNewWhen = null !== $newFoundWhen;
        $hasNewPriority = null !== $newFoundPriority;

        if (!$hasNewWhen && !$hasNewPriority) {
            // Nothing to adjust: the stored job already covers the new one.
            return $foundJob;
        }

        if ($hasNewWhen) {
            $foundJob->setWhenUs($newFoundWhen);
        }
        if ($hasNewPriority) {
            $foundJob->setPriority($newFoundPriority);
        }

        return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
175
176 2
    /**
     * Re-queues an adjusted stored job: first tries the "when" queue and, when
     * the job was not there and priorities are enabled, the priority queue.
     *
     * @param bool   $adjust           whether the job was modified and must be re-queued
     * @param Job    $foundJob         the stored job
     * @param string $foundJobCacheKey cache key of the stored job
     * @param string $crcCacheKey      key of the CRC list the stored job belongs to
     *
     * @return Job|false the re-queued job, or false when it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob(
            $adjust,
            $this->getWhenQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getWhenUs()
        );
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        // Without priorities there is no second queue to look in.
        if (null === $this->maxPriority) {
            return false;
        }

        $fromPriorityQueue = $this->adjustJob(
            $adjust,
            $this->getPriorityQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getPriority()
        );
        if ($fromPriorityQueue) {
            return $fromPriorityQueue;
        }

        return false;
    }
195 2
196
    /**
     * Re-queues a modified stored job inside one sorted set.
     *
     * The job id is removed from $queue; only if it was actually present is
     * the payload rewritten and the id added back with $zScore.
     *
     * @param bool   $adjust           whether an adjustment is requested at all
     * @param string $queue            key of the sorted set to adjust
     * @param Job    $foundJob         the stored job, already carrying its new values
     * @param string $foundJobCacheKey cache key of the stored job (kept for signature compatibility; not used here)
     * @param string $crcCacheKey      key of the CRC list the stored job belongs to
     * @param mixed  $zScore           score to re-add the job with
     *
     * @return Job|false|null the job when re-queued, false when it has expired, null when it was not in $queue
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if (!$adjust || !($this->redis->zRem($queue, $foundJob->getId()) > 0)) {
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired. The CRC list holds job ids (that is what
            // saveJob() pushes), so the entry has to be removed by id.
            // Removing by the job's cache key would never match anything.
            $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

            return false;
        }

        $this->redis->zAdd($queue, $zScore, $foundJob->getId());

        return $foundJob;
    }
216
217
    /**
     * Validates and stores a job, merging it into an equivalent stored job
     * when batching is requested.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return Job|null the saved (or pre-existing batched) job, or null when the job has already expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without an explicit run time is scheduled for "now".
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            // Reuse a stored job with the same CRC hash when there is one.
            $existing = $this->batchSave($job);
            if ($existing) {
                return $existing;
            }
        }

        return $this->saveJob($job);
    }
250
251 18
    /**
     * Stores the job payload, registers the job id in its CRC list and
     * schedules the id in the "when" sorted set scored by its run time.
     *
     * @param Job $job
     *
     * @return Job|null the stored job, or null when it has already expired
     */
    protected function saveJob(Job $job)
    {
        $scheduleKey = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        $stored = $this->insertJob($job);
        if (!$stored) {
            // insertJob() refuses jobs that are already expired.
            return null;
        }

        $id = $job->getId();
        $runAt = $job->getWhenUs();

        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($scheduleKey, $runAt, $id);

        return $job;
    }
269
270 18
    /**
     * Writes the job payload to Redis, with a TTL when the job has an
     * expiration time.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired, true otherwise
     */
    protected function insertJob(Job $job)
    {
        $payloadKey = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            // No expiration: store the payload without a TTL.
            $this->redis->set($payloadKey, $job->toMessage());

            return true;
        }

        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            // Already expired: nothing is written.
            return false;
        }

        $this->redis->setEx($payloadKey, $secondsLeft, $job->toMessage());

        return true;
    }
292
293
    /**
     * Returns the priority in DESCENDING order, except if maxPriority is null,
     * in which case the priority is 0.
     *
     * The parent's calculated value is subtracted from maxPriority; a null
     * parent value maps to maxPriority itself.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        // Queues without priorities always use 0.
        if (null === $this->maxPriority) {
            return 0;
        }

        if (null === $calculated) {
            return $this->maxPriority;
        }

        // Redis priority should be in DESC order
        return $this->maxPriority - $calculated;
    }
314
315
    /**
     * Checks that a job can be stored by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job carries a priority but this queue has no maxPriority
     * @throws ClassNotSubclassException when the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobHasPriority = null !== $job->getPriority();
        $queueHasPriorities = null !== $this->maxPriority;

        if ($jobHasPriority && !$queueHasPriorities) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!($job instanceof RetryableJob)) {
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
        }
    }
331
332
    /**
     * Rejects getJob() argument combinations this manager cannot honour:
     * filtering by worker or method name, and non-prioritized fetching while
     * priorities are enabled.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $filtersRequested = null !== $workerName || null !== $methodName;
        $unprioritizedRequested = null !== $this->maxPriority && true !== $prioritize;

        if ($filtersRequested || $unprioritizedRequested) {
            throw new UnsupportedException('Unsupported');
        }
    }
345 2
346
    /**
     * Removes a job from the queues, deletes its payload and drops its id from
     * its CRC list.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();

        // Try the priority queue first; the "when" queue is only touched when
        // nothing was removed there.
        if (!$this->redis->zRem($this->getPriorityQueueCacheKey(), $jobId)) {
            $this->redis->zRem($this->getWhenQueueCacheKey(), $jobId);
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
363
364
    /**
     * Pops the next runnable job.
     *
     * With priorities enabled, due jobs are first transferred from the "when"
     * queue to the priority queue and the job is popped from the latter.
     * Otherwise the job is popped straight from the "when" queue, using the
     * current microtime as the maximum score.
     *
     * A popped id whose payload can no longer be read is skipped and the next
     * id is tried, so a payload-less Job is never returned. This can happen
     * because insertJob() stores payloads with a TTL.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId
     *
     * @throws UnsupportedException
     *
     * @return Job|null the next job, or null when no job is available
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        $usePriorityQueue = null !== $this->maxPriority;
        $microtime = null;
        if ($usePriorityQueue) {
            // First thing: migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
        }

        while (true) {
            $jobId = $usePriorityQueue
                ? $this->redis->zPop($queue)
                : $this->redis->zPopByMaxScore($queue, $microtime);
            if (!$jobId) {
                return null;
            }

            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$jobMessage) {
                // Payload is gone (e.g. its TTL elapsed); the id is already
                // popped, so just move on to the next one.
                continue;
            }

            $job = new Job();
            $job->fromMessage($jobMessage);

            // The job is being handed out: drop its CRC list entry and payload.
            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);

            return $job;
        }
    }
402 4
403
    /**
     * Counts the jobs waiting to run: entries of the "when" queue scored
     * between 0 and the current microtime, plus, when priorities are enabled,
     * every entry of the priority queue.
     *
     * @param string|null $workerName not used by this implementation
     * @param string|null $methodName not used by this implementation
     *
     * @return mixed the sum of the zCount() results
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $now = Util::getMicrotimeDecimal();
        $waiting = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $now);

        if (null === $this->maxPriority) {
            return $waiting;
        }

        return $waiting + $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
    }
414
415 View Code Duplication
    /**
     * Puts a job back into its initial state, bumps its retry counter and
     * saves it again.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     */
    public function resetJob(RetryableJob $job)
    {
        if (!($job instanceof Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        // Clear the outcome of the previous run.
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);

        $nextRetryCount = $job->getRetries() + 1;
        $job->setRetries($nextRetryCount);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        $this->saveJob($job);

        return true;
    }
429
430
    /**
     * Scans one sorted set page by page, loads the payload of every job found
     * and folds them into $results via extractStatusResults().
     *
     * @param array  $results  status table, updated in place
     * @param string $cacheKey key of the sorted set to scan
     *
     * @return array the updated status table
     */
    private function collateStatusResults(array &$results, $cacheKey)
    {
        // NOTE(review): zScan() presumably advances $cursor by reference — confirm against RedisInterface.
        $cursor = null;

        while (true) {
            $page = $this->redis->zScan($cacheKey, $cursor, '', 100);
            if (!$page) {
                break;
            }

            // The page is keyed by job id; translate the ids into payload keys.
            $payloadKeys = [];
            foreach (array_keys($page) as $jobId) {
                $payloadKeys[] = $this->getJobCacheKey($jobId);
            }

            $payloads = $this->redis->mget($payloadKeys);
            $this->extractStatusResults($payloads, $results);

            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }
445
446
    /**
     * Counts every readable job payload as one STATUS_NEW job of its
     * "worker->method()" row in the status table.
     *
     * @param array $jobs    job payloads; falsy entries are ignored
     * @param array $results status table, updated in place
     */
    private function extractStatusResults(array $jobs, array &$results)
    {
        foreach ($jobs as $jobMessage) {
            if (!$jobMessage) {
                continue;
            }

            $job = new Job();
            $job->fromMessage($jobMessage);
            $resultHashKey = $job->getWorkerName().'->'.$job->getMethod().'()';

            // Initialise a row only when it does not exist yet, the same guard
            // extractStatusHashResults() uses. Testing the STATUS_NEW cell
            // instead would overwrite an existing row that lacks that cell and
            // lose its other counters.
            if (!isset($results[$resultHashKey])) {
                $results[$resultHashKey] = static::getAllStatuses();
            }
            if (!isset($results[$resultHashKey][BaseJob::STATUS_NEW])) {
                $results[$resultHashKey][BaseJob::STATUS_NEW] = 0;
            }
            ++$results[$resultHashKey][BaseJob::STATUS_NEW];
        }
    }
463
464
    /**
     * Adds the counters of one status-hash page to the status table.
     *
     * Each hash field is expected to look like "worker,method,status", the
     * format retryableSaveHistory() writes.
     *
     * @param array $hResults hash fields mapped to their counters
     * @param array $results  status table, updated in place
     */
    private function extractStatusHashResults(array $hResults, array &$results)
    {
        foreach ($hResults as $field => $counter) {
            $parts = explode(',', $field);
            list($workerName, $method, $status) = $parts;

            $row = $workerName.'->'.$method.'()';

            if (!isset($results[$row])) {
                $results[$row] = static::getAllStatuses();
            }
            if (!isset($results[$row][$status])) {
                $results[$row][$status] = 0;
            }

            $results[$row][$status] += $counter;
        }
    }
478
479
    /**
     * Builds the status table: queued jobs are counted from the "when" queue
     * (and the priority queue when priorities are enabled), then the counters
     * kept in the status hash are added on top.
     *
     * @return array status counters keyed by "worker->method()"
     */
    public function getStatus()
    {
        $results = [];

        $this->collateStatusResults($results, $this->getWhenQueueCacheKey());
        if (null !== $this->maxPriority) {
            $this->collateStatusResults($results, $this->getPriorityQueueCacheKey());
        }

        $statusKey = $this->getStatusCacheKey();
        // NOTE(review): hScan() presumably advances $cursor by reference — confirm against RedisInterface.
        $cursor = null;

        while (true) {
            $page = $this->redis->hScan($statusKey, $cursor, '', 100);
            if (!$page) {
                break;
            }

            $this->extractStatusHashResults($page, $results);

            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }
500
501
    /**
     * Increments the status-hash counter of the job's
     * "worker,method,status" field by one.
     *
     * @param RetryableJob $job
     * @param mixed        $retry not used by this implementation
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $field = implode(',', [
            $job->getWorkerName(),
            $job->getMethod(),
            $job->getStatus(),
        ]);

        $this->redis->hIncrBy($this->getStatusCacheKey(), $field, 1);
    }
511
}
512