Completed
Pull Request — master (#44)
by Matthew
07:13 queued 01:21
created

JobManager::extractStatusHashResults()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4.128

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 14
ccs 8
cts 10
cp 0.8
rs 9.2
cc 4
eloc 9
nc 5
nop 2
crap 4.128
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Manager\JobTimingManager;
10
use Dtc\QueueBundle\Manager\PriorityJobManager;
11
use Dtc\QueueBundle\Manager\RunManager;
12
use Dtc\QueueBundle\Model\BaseJob;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
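 * Redis-backed job manager: stores job messages under prefixed cache keys and schedules them through the "when" and priority queues.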
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    use JobIdTrait;
22
23
    /** @var RedisInterface Redis client used for every queue operation; injected through setRedis() */
24
    protected $redis;
25
26
    /** @var string Prefix prepended to every cache key this manager builds */
27
    protected $cacheKeyPrefix;
28
29
    /** @var string Result of gethostname() captured in the constructor, or '' when it is falsy */
    protected $hostname;
30
    /** @var int|false Result of getmypid() captured in the constructor */
    protected $pid;
31
32
    /**
     * Records the key prefix, the current host name and the process id, then
     * hands the remaining arguments to the parent manager.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         forwarded untouched to the parent constructor
     * @param string           $cacheKeyPrefix   prefix for every cache key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        // gethostname() may return false; normalise that to an empty string.
        $detectedHost = gethostname();
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
43
44
    /**
     * Injects the Redis client that all queue operations of this manager use.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
48
49 20
    /**
     * Builds the cache key under which a single job's message is stored.
     *
     * @param mixed $jobId
     *
     * @return string
     */
    protected function getJobCacheKey($jobId)
    {
        return "{$this->cacheKeyPrefix}_job_{$jobId}";
    }
53
54
    /**
     * Builds the cache key of the list that collects the ids of all jobs
     * sharing one CRC hash.
     *
     * @param string $jobCrc
     *
     * @return string
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return "{$this->cacheKeyPrefix}_job_crc_{$jobCrc}";
    }
61
62 16
    /**
     * Returns the cache key of the priority queue.
     *
     * @return string
     */
    protected function getPriorityQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_priority";
    }
66
67 20
    /**
     * Returns the cache key of the "when" queue (jobs scored by their run time).
     *
     * @return string
     */
    protected function getWhenQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_when";
    }
71
72 8
    /**
     * Returns the cache key of the hash that holds the per-status history counters.
     *
     * @return string
     */
    protected function getStatusCacheKey()
    {
        return "{$this->cacheKeyPrefix}_status";
    }
76
77 14
    /**
     * Drains every job that is due (score up to the current microtime) from the
     * "when" queue and adds it to the priority queue, scored by its priority.
     */
    protected function transferQueues()
    {
        $sourceQueue = $this->getWhenQueueCacheKey();
        $targetQueue = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while ($dueJobId = $this->redis->zPopByMaxScore($sourceQueue, $now)) {
            $message = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$message) {
                // No message could be read for this id (static analysis notes that
                // get() is declared as returning a boolean), so there is nothing to queue.
                continue;
            }

            $dueJob = new Job();
            $dueJob->fromMessage($message);
            $this->redis->zAdd($targetQueue, $dueJob->getPriority(), $dueJob->getId());
        }
    }
92
93
    /**
     * Looks for an already stored job with the same CRC hash as $job and, when
     * one is found, lets batchFoundJob() merge $job's schedule/priority into it.
     *
     * Only the first 1001 ids of the CRC list (range 0..1000) are inspected.
     * Ids whose job message can no longer be read are removed from the CRC list.
     *
     * @param Job $job
     *
     * @return Job|null the existing job that absorbed $job, or null when there is none
     */
    protected function batchSave(Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $jobIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($jobIds)) {
            return null;
        }

        foreach ($jobIds as $jobId) {
            $foundJobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($foundJobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list stores job ids (see the lPush in saveJob()), so the
                // stale entry has to be removed by id, not by the job's cache key.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $foundJobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
123
124
    /**
     * Rebuilds the stored job from its message and works out whether the
     * incoming job should pull its run time and/or priority value down.
     *
     * The run time is replaced only when the stored job is still in the future
     * and is scheduled later than the incoming one; the priority is replaced
     * only when the stored value is greater than the incoming value.
     *
     * @param Job    $job              the incoming job
     * @param string $foundJobCacheKey cache key of the stored job
     * @param string $foundJobMessage  serialized message of the stored job
     *
     * @return Job|false|null whatever finishBatchFoundJob() returns
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $incomingWhen = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $storedJob = new Job();
        $storedJob->fromMessage($foundJobMessage);
        $storedWhen = $storedJob->getWhenUs();

        // bccomp() yields -1, 0 or 1, so "=== 1" is the same test as "> 0" / ">= 1".
        $now = Util::getMicrotimeDecimal();
        $storedIsFuture = 1 === bccomp($storedWhen, $now);
        $replacementWhen = ($storedIsFuture && 1 === bccomp($storedWhen, $incomingWhen)) ? $incomingWhen : null;

        $replacementPriority = null;
        if ($storedJob->getPriority() > $job->getPriority()) {
            $replacementPriority = $job->getPriority();
        }

        return $this->finishBatchFoundJob($storedJob, $foundJobCacheKey, $crcCacheKey, $replacementWhen, $replacementPriority);
    }
152
153
    /**
     * Applies the new run time and/or priority to the stored job and re-queues
     * it; returns the stored job untouched when neither value changes.
     *
     * @param Job         $foundJob
     * @param string      $foundJobCacheKey
     * @param string      $crcCacheKey
     * @param string|null $newFoundWhen     new run time, or null to keep the current one
     * @param int|null    $newFoundPriority new priority, or null to keep the current one
     *
     * @return Job|false
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $whenChanges = null !== $newFoundWhen;
        $priorityChanges = null !== $newFoundPriority;

        if (!$whenChanges && !$priorityChanges) {
            return $foundJob;
        }

        if ($whenChanges) {
            $foundJob->setWhenUs($newFoundWhen);
        }
        if ($priorityChanges) {
            $foundJob->setPriority($newFoundPriority);
        }

        return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
175
176
    /**
     * Re-queues an adjusted job: first tries the "when" queue and, if the job
     * was not there and priorities are enabled, the priority queue.
     *
     * @param bool   $adjust
     * @param Job    $foundJob
     * @param string $foundJobCacheKey
     * @param string $crcCacheKey
     *
     * @return Job|false the re-queued job, or false when it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $outcome = $this->adjustJob($adjust, $this->getWhenQueueCacheKey(), $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());

        // null means "not found in that queue"; only then is the priority queue worth trying.
        if (null === $outcome && null !== $this->maxPriority) {
            $outcome = $this->adjustJob($adjust, $this->getPriorityQueueCacheKey(), $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());
        }

        return $outcome ? $outcome : false;
    }
195
196
    /**
     * Moves an adjusted job within one queue: removes its id from $queue,
     * rewrites the stored message and adds the id back with the new score.
     *
     * @param bool   $adjust           nothing is done unless this is true
     * @param string $queue            cache key of the queue to update
     * @param Job    $foundJob
     * @param string $foundJobCacheKey kept for signature compatibility; not needed here
     * @param string $crcCacheKey      cache key of the CRC list the job belongs to
     * @param mixed  $zScore           score to re-add the job with
     *
     * @return Job|false|null the job when it was re-queued, false when it has
     *                        already expired, null when it was not in $queue
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if (!$adjust || !($this->redis->zRem($queue, $foundJob->getId()) > 0)) {
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired. The CRC list stores job ids (see the lPush in
            // saveJob()), so the entry must be removed by id, not by cache key.
            $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

            return false;
        }

        $this->redis->zAdd($queue, $zScore, $foundJob->getId());

        return $foundJob;
    }
216
217
    /**
     * Validates and stores a job, assigning an id and defaulting its run time
     * to the current microtime. A batch job is merged into an existing job with
     * the same CRC hash when one exists.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return Job|null the stored (or merged-into) job; null when the job had already expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without a run time is runnable immediately.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            // Reuse an already queued job with the same CRC hash, if any.
            $existingJob = $this->batchSave($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        return $this->saveJob($job);
    }
250
251 20
    /**
     * Stores the job message, pushes the job id onto its CRC list and adds the
     * id to the "when" queue scored by the job's run time.
     *
     * @param Job $job
     *
     * @return Job|null the job, or null when it has already expired
     */
    protected function saveJob(Job $job)
    {
        $whenQueueKey = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        if (!$this->insertJob($job)) {
            // Already expired: nothing was stored, so nothing gets queued.
            return null;
        }

        $id = $job->getId();
        $runAt = $job->getWhenUs();

        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($whenQueueKey, $runAt, $id);

        return $job;
    }
269
270
    /**
     * Writes the job's message under its cache key. When the job carries an
     * expiry, the message is written with the remaining number of seconds as
     * its lifetime.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired, true otherwise
     */
    protected function insertJob(Job $job)
    {
        $cacheKey = $this->getJobCacheKey($job->getId());
        $expiry = $job->getExpiresAt();

        if (!$expiry) {
            $this->redis->set($cacheKey, $job->toMessage());

            return true;
        }

        $secondsLeft = $expiry->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            // Already expired: do not store anything.
            return false;
        }

        $this->redis->setEx($cacheKey, $secondsLeft, $job->toMessage());

        return true;
    }
292
293
    /**
     * Converts a priority into the score stored in Redis, which runs in
     * DESCENDING order (maxPriority minus the parent's value). When maxPriority
     * is null the result is always 0; when the parent yields null the result is
     * maxPriority.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        // Always run the parent calculation first, exactly as before.
        $parentPriority = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        if (null === $parentPriority) {
            return $this->maxPriority;
        }

        return $this->maxPriority - $parentPriority;
    }
314
315
    /**
     * Rejects jobs this queue cannot store: a job with a priority when no
     * maxPriority is configured, or a job that is not a RetryableJob.
     *
     * NOTE(review): static analysis reports this method as duplicated elsewhere
     * in the project.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $prioritiesDisabled = null === $this->maxPriority;
        if ($prioritiesDisabled && null !== $job->getPriority()) {
            throw new PriorityException('This queue does not support priorities');
        }

        if ($job instanceof RetryableJob) {
            return;
        }

        throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
    }
331
332
    /**
     * Ensures getJob() was called only with arguments this manager supports:
     * no worker or method filter, and no un-prioritized fetch while priorities
     * are enabled.
     *
     * NOTE(review): static analysis reports this check as duplicated elsewhere
     * in the project.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $hasFilter = null !== $workerName || null !== $methodName;
        $skipsPriorities = null !== $this->maxPriority && true !== $prioritize;

        if ($hasFilter || $skipsPriorities) {
            throw new UnsupportedException('Unsupported');
        }
    }
345
346 2
    /**
     * Removes a job from the queues and deletes its stored data.
     *
     * The job id is removed from the priority queue or, when it was not there,
     * from the "when" queue. The stored message and one matching entry of the
     * job's CRC list are then deleted regardless of whether a queue held the id.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();
        $priorityQueue = $this->getPriorityQueueCacheKey();
        $whenQueue = $this->getWhenQueueCacheKey();

        // Only fall back to the "when" queue if the priority queue did not hold the id.
        if (!$this->redis->zRem($priorityQueue, $jobId)) {
            $this->redis->zRem($whenQueue, $jobId);
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
363
364
    /**
     * Pops the next runnable job and removes its stored message and its entry
     * in the CRC list.
     *
     * With priorities enabled (maxPriority is not null) all due jobs are first
     * transferred from the "when" queue and the job is popped from the priority
     * queue. Otherwise the job is popped from the "when" queue, limited to
     * scores up to the current microtime.
     *
     * A popped id whose message can no longer be read is skipped and the next
     * candidate is tried, instead of building a Job from a falsy message.
     *
     * @param string|null $workerName must be null (filtering is unsupported)
     * @param string|null $methodName must be null (filtering is unsupported)
     * @param bool        $prioritize must be true while priorities are enabled
     * @param mixed       $runId      not used by this implementation
     *
     * @throws UnsupportedException
     *
     * @return Job|null the next job, or null when none is runnable
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        $usePriorityQueue = null !== $this->maxPriority;
        $microtime = null;
        if ($usePriorityQueue) {
            // First migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
        }

        // Every iteration pops one id, so the loop ends once the queue has no candidate left.
        while (true) {
            $jobId = $usePriorityQueue
                ? $this->redis->zPop($queue)
                : $this->redis->zPopByMaxScore($queue, $microtime);
            if (!$jobId) {
                return null;
            }

            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$jobMessage) {
                // The id was queued but its message is gone; try the next candidate.
                continue;
            }

            $job = new Job();
            $job->fromMessage($jobMessage);
            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);

            return $job;
        }
    }
402
403 2
    /**
     * Counts the jobs that are ready to run: entries of the "when" queue scored
     * between 0 and the current microtime, plus (when priorities are enabled)
     * every entry of the priority queue.
     *
     * @param string|null $workerName not used by this implementation
     * @param string|null $methodName not used by this implementation
     *
     * @return int
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $now = Util::getMicrotimeDecimal();
        $waiting = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $now);

        if (null === $this->maxPriority) {
            return $waiting;
        }

        $waiting += $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');

        return $waiting;
    }
414
415 4 View Code Duplication
    /**
     * Puts a job back into the queue for another attempt: status becomes NEW,
     * message and start time are cleared, the retry counter is incremented and
     * the job is saved again.
     *
     * NOTE(review): static analysis reports this method as duplicated elsewhere
     * in the project. The result of saveJob() is not inspected, so true is
     * returned even when the job had already expired.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     */
    public function resetJob(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $nextRetryCount = $job->getRetries() + 1;

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($nextRetryCount);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        $this->saveJob($job);

        return true;
    }
429
430 2
    /**
     * Scans one queue in pages of 100, loads the messages of the jobs on each
     * page and counts them into $results.
     *
     * Scanning stops when a page comes back empty/falsy or once the cursor
     * is 0 after a scan.
     * NOTE(review): relies on zScan() updating $cursor by reference - confirm
     * against RedisInterface.
     *
     * @param array  $results  status counters, updated in place
     * @param string $cacheKey cache key of the queue to scan
     *
     * @return array the updated $results
     */
    private function collateStatusResults(array &$results, $cacheKey)
    {
        $cursor = null;
        do {
            $page = $this->redis->zScan($cacheKey, $cursor, '', 100);
            if (!$page) {
                break;
            }

            $messageKeys = [];
            foreach (array_keys($page) as $queuedJobId) {
                $messageKeys[] = $this->getJobCacheKey($queuedJobId);
            }

            $this->extractStatusResults($this->redis->mget($messageKeys), $results);
        } while (0 !== $cursor);

        return $results;
    }
445
446 2
    /**
     * Counts every readable job message as one STATUS_NEW job of its
     * "worker->method()" row in $results.
     *
     * A row is initialised from getAllStatuses() only when it does not exist
     * yet, so counters already collected for that row are never overwritten.
     *
     * @param array $jobs    job messages; falsy entries (missing messages) are skipped
     * @param array $results status counters, updated in place
     */
    private function extractStatusResults(array $jobs, array &$results)
    {
        foreach ($jobs as $jobMessage) {
            if (!$jobMessage) {
                continue;
            }

            $job = new Job();
            $job->fromMessage($jobMessage);
            $resultHashKey = $job->getWorkerName().'->'.$job->getMethod().'()';

            if (!isset($results[$resultHashKey])) {
                $results[$resultHashKey] = static::getAllStatuses();
            }
            if (!isset($results[$resultHashKey][BaseJob::STATUS_NEW])) {
                $results[$resultHashKey][BaseJob::STATUS_NEW] = 0;
            }
            ++$results[$resultHashKey][BaseJob::STATUS_NEW];
        }
    }
463
464 2
    /**
     * Adds the history counters read from the status hash to $results.
     *
     * Each hash field has the form "worker,method,status", as written by
     * retryableSaveHistory(). The field is split from the right, so a worker
     * name that itself contains commas is kept intact; fields with fewer than
     * three parts are skipped.
     *
     * @param array $hResults hash field => counter
     * @param array $results  status counters, updated in place
     */
    private function extractStatusHashResults(array $hResults, array &$results)
    {
        foreach ($hResults as $key => $value) {
            $parts = explode(',', $key);
            if (count($parts) < 3) {
                // Not a "worker,method,status" field; there is nothing to attribute it to.
                continue;
            }

            $status = array_pop($parts);
            $method = array_pop($parts);
            $workerName = implode(',', $parts);

            $resultHashKey = $workerName.'->'.$method.'()';
            if (!isset($results[$resultHashKey])) {
                $results[$resultHashKey] = static::getAllStatuses();
            }
            if (!isset($results[$resultHashKey][$status])) {
                $results[$resultHashKey][$status] = 0;
            }
            $results[$resultHashKey][$status] += $value;
        }
    }
478
479 2
    /**
     * Builds the status report: queued jobs from the "when" queue (and the
     * priority queue when priorities are enabled) plus the history counters
     * stored in the status hash, which is scanned in pages of 100.
     *
     * NOTE(review): relies on hScan() updating $cursor by reference - confirm
     * against RedisInterface.
     *
     * @return array counters keyed by "worker->method()" and then by status
     */
    public function getStatus()
    {
        $whenQueueKey = $this->getWhenQueueCacheKey();
        $priorityQueueKey = $this->getPriorityQueueCacheKey();

        $report = [];
        $this->collateStatusResults($report, $whenQueueKey);
        if (null !== $this->maxPriority) {
            $this->collateStatusResults($report, $priorityQueueKey);
        }

        $statusKey = $this->getStatusCacheKey();
        $cursor = null;
        do {
            $page = $this->redis->hScan($statusKey, $cursor, '', 100);
            if (!$page) {
                break;
            }
            $this->extractStatusHashResults($page, $report);
        } while (0 !== $cursor);

        return $report;
    }
500
501 6
    /**
     * Increments by one the history counter of the job's
     * "worker,method,status" field in the status hash.
     *
     * @param RetryableJob $job
     * @param mixed        $retry not used by this implementation
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $statusKey = $this->getStatusCacheKey();
        $field = implode(',', [$job->getWorkerName(), $job->getMethod(), $job->getStatus()]);

        $this->redis->hIncrBy($statusKey, $field, 1);
    }
511
}
512