Completed
Pull Request — master (#44)
by Matthew
19:46 queued 28s
created

JobManager::deleteJob()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 18
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4.0218

Importance

Changes 0
Metric Value
dl 0
loc 18
ccs 8
cts 9
cp 0.8889
rs 9.2
c 0
b 0
f 0
cc 4
eloc 12
nc 6
nop 1
crap 4.0218
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Manager\JobTimingManager;
10
use Dtc\QueueBundle\Manager\PriorityJobManager;
11
use Dtc\QueueBundle\Manager\RunManager;
12
use Dtc\QueueBundle\Model\BaseJob;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
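 * Redis-backed job manager: job payloads are stored under per-job keys and scheduled through a "when" sorted set and a "priority" sorted set.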
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    use JobIdTrait;
22
23
    /** @var RedisInterface */
24
    protected $redis;
25
26
    /** @var string */
27
    protected $cacheKeyPrefix;
28
29
    protected $hostname;
30
    protected $pid;
31
32
    /**
     * Records the key prefix and the identity of the current process, then
     * delegates the remaining arguments to the parent constructor.
     *
     * @param RunManager       $runManager       passed unchanged to the parent constructor
     * @param JobTimingManager $jobTimingManager passed unchanged to the parent constructor
     * @param mixed            $jobClass         passed unchanged to the parent constructor
     * @param string           $cacheKeyPrefix   prefix prepended to every Redis key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        // gethostname() may return false; normalise any falsy result to an empty string.
        $host = gethostname();
        $this->hostname = $host ? $host : '';

        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
43
44
    /**
     * Injects the Redis client that every queue operation in this class goes through.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
48
49 18
    /**
     * Builds the Redis key under which a single job's payload is stored.
     *
     * @param mixed $jobId job identifier appended to the key
     *
     * @return string "<prefix>_job_<jobId>"
     */
    protected function getJobCacheKey($jobId)
    {
        return "{$this->cacheKeyPrefix}_job_{$jobId}";
    }
53
54
    /**
     * Builds the Redis key of the list that groups job ids by CRC hash.
     *
     * @param string $jobCrc CRC hash of a job
     *
     * @return string "<prefix>_job_crc_<jobCrc>"
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return "{$this->cacheKeyPrefix}_job_crc_{$jobCrc}";
    }
61
62 16
    /**
     * Builds the Redis key of the priority sorted set.
     *
     * @return string "<prefix>_priority"
     */
    protected function getPriorityQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_priority";
    }
66
67 20
    /**
     * Builds the Redis key of the "when" sorted set (jobs scored by their run time).
     *
     * @return string "<prefix>_when"
     */
    protected function getWhenQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_when";
    }
71
72 14
    /**
     * Builds the Redis key of the hash that holds per-worker/method/status counters.
     *
     * @return string "<prefix>_status"
     */
    protected function getStatusCacheKey()
    {
        return "{$this->cacheKeyPrefix}_status";
    }
76 14
77 14
    /**
     * Drains the "when" queue into the priority queue.
     *
     * Job ids are popped from the "when" sorted set with the current microtime
     * as the maximum score. Each id whose payload can still be loaded is added
     * to the priority sorted set, scored by the job's priority. Ids whose
     * payload is no longer stored are dropped.
     */
    protected function transferQueues()
    {
        $sourceQueue = $this->getWhenQueueCacheKey();
        $targetQueue = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while ($dueJobId = $this->redis->zPopByMaxScore($sourceQueue, $now)) {
            $payload = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$payload) {
                // Nothing stored for this id any more, so there is nothing to enqueue.
                continue;
            }

            $job = new Job();
            $job->fromMessage($payload);
            $this->redis->zAdd($targetQueue, $job->getPriority(), $job->getId());
        }
    }
92
93 2
    /**
     * Looks for an already-queued job with the same CRC hash and, if one is
     * found, merges the new job into it instead of queueing a duplicate.
     *
     * At most the first 1001 ids (list range 0..1000) of the CRC list are examined.
     * Ids whose payload no longer exists are pruned from the CRC list.
     *
     * @param Job $job the job being saved in batch mode
     *
     * @return Job|null the existing job that absorbed $job, or null when none did
     */
    protected function batchSave(Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $candidateIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($candidateIds)) {
            return null;
        }

        foreach ($candidateIds as $jobId) {
            $candidateCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($candidateCacheKey);
            if (!$foundJobMessage) {
                // The CRC list stores job ids (see saveJob), so the stale entry
                // has to be removed by id, not by the job's cache key.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $candidateCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
123 2
124
    /**
     * Compares a new batch job against an already-stored job with the same CRC
     * hash and works out which of the stored job's fields need to change.
     *
     * The stored job's run time is replaced by the new job's run time only when
     * the stored run time is still in the future and later than the new one.
     * The stored job's priority is replaced only when it is greater than the
     * new job's priority.
     *
     * @param Job    $job              the job being saved
     * @param string $foundJobCacheKey cache key of the stored job
     * @param string $foundJobMessage  serialized payload of the stored job
     *
     * @return Job|false whatever finishBatchFoundJob() returns
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $requestedWhen = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $existing = new Job();
        $existing->fromMessage($foundJobMessage);
        $existingWhen = $existing->getWhenUs();

        // Decimal-string comparison; bccomp() yields 1 when the left operand is larger.
        $now = Util::getMicrotimeDecimal();
        $pullForward = 1 === bccomp($existingWhen, $now) && 1 === bccomp($existingWhen, $requestedWhen);
        $newWhen = $pullForward ? $requestedWhen : null;

        $existingPriority = $existing->getPriority();
        $requestedPriority = $job->getPriority();
        $newPriority = $existingPriority > $requestedPriority ? $requestedPriority : null;

        return $this->finishBatchFoundJob($existing, $foundJobCacheKey, $crcCacheKey, $newWhen, $newPriority);
    }
152 2
153
    /**
     * Applies the computed run-time / priority changes to a stored job and
     * re-queues it when anything changed.
     *
     * @param Job         $foundJob         the stored job
     * @param string      $foundJobCacheKey cache key of the stored job
     * @param string      $crcCacheKey      key of the CRC list the stored job belongs to
     * @param string|null $newFoundWhen     new run time, or null to keep the current one
     * @param int|null    $newFoundPriority new priority, or null to keep the current one
     *
     * @return Job|false the untouched job when nothing changed, otherwise the result of addFoundJob()
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $changeWhen = null !== $newFoundWhen;
        $changePriority = null !== $newFoundPriority;

        if (!$changeWhen && !$changePriority) {
            return $foundJob;
        }

        if ($changeWhen) {
            $foundJob->setWhenUs($newFoundWhen);
        }
        if ($changePriority) {
            $foundJob->setPriority($newFoundPriority);
        }

        return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
175
176 2
    /**
     * Re-queues an adjusted job in whichever sorted set currently holds it.
     *
     * The "when" queue is tried first. The priority queue is tried only if the
     * job was not in the "when" queue and priorities are enabled
     * (maxPriority is not null).
     *
     * @param bool   $adjust           whether an adjustment should be attempted at all
     * @param Job    $foundJob         the job to re-queue
     * @param string $foundJobCacheKey cache key of the job
     * @param string $crcCacheKey      key of the CRC list the job belongs to
     *
     * @return Job|false the re-queued job, or false when it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob(
            $adjust,
            $this->getWhenQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getWhenUs()
        );
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        if (null === $this->maxPriority) {
            return false;
        }

        $fromPriorityQueue = $this->adjustJob(
            $adjust,
            $this->getPriorityQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getPriority()
        );
        if ($fromPriorityQueue) {
            return $fromPriorityQueue;
        }

        return false;
    }
195 2
196
    /**
     * Removes a job from the given sorted set, re-stores its payload and adds
     * it back with a new score.
     *
     * @param bool   $adjust           when false nothing is done and null is returned
     * @param string $queue            key of the sorted set to adjust
     * @param Job    $foundJob         the job to re-queue
     * @param string $foundJobCacheKey cache key of the job (kept for signature compatibility)
     * @param string $crcCacheKey      key of the CRC list the job belongs to
     * @param mixed  $zScore           score to re-add the job with
     *
     * @return Job|false|null the job when it was re-queued, false when it turned out to be
     *                        expired, null when it was not a member of $queue (or $adjust is false)
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if (!$adjust) {
            return null;
        }

        $jobId = $foundJob->getId();
        if (!($this->redis->zRem($queue, $jobId) > 0)) {
            // The job is not in this queue.
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired. The CRC list stores job ids (see saveJob), so the
            // entry must be removed by id; removing by cache key never matches.
            $this->redis->lRem($crcCacheKey, 1, $jobId);

            return false;
        }

        $this->redis->zAdd($queue, $zScore, $jobId);

        return $foundJob;
    }
216
217
    /**
     * Validates a job, assigns it an id and a run time if missing, and queues it.
     *
     * A job flagged as batch is first matched against already-queued jobs with
     * the same CRC hash. If one absorbs it, that existing job is returned and
     * no new job is stored.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of the Redis Job class
     *
     * @return Job|null the stored (or absorbing) job, or null when the job is already expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without a run time is due immediately.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            $absorbedBy = $this->batchSave($job);
            if ($absorbedBy) {
                return $absorbedBy;
            }
        }

        return $this->saveJob($job);
    }
250
251 18
    /**
     * Stores a job's payload, registers its id in the CRC list and schedules it
     * in the "when" sorted set, scored by its run time.
     *
     * @param Job $job
     *
     * @return Job|null the job, or null when it is already expired and was not stored
     */
    protected function saveJob(Job $job)
    {
        $scheduleKey = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        if (!$this->insertJob($job)) {
            // Already expired: nothing was written.
            return null;
        }

        $id = $job->getId();
        $runAt = $job->getWhenUs();

        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($scheduleKey, $runAt, $id);

        return $job;
    }
269
270 18
    /**
     * Writes a job's serialized payload under its cache key.
     *
     * When the job has an expiry, the payload is written with a TTL equal to
     * the seconds remaining until that expiry; otherwise it is written without one.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired (nothing is written), true otherwise
     */
    protected function insertJob(Job $job)
    {
        $jobCacheKey = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            $this->redis->set($jobCacheKey, $job->toMessage());

            return true;
        }

        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            return false;
        }

        $this->redis->setEx($jobCacheKey, $secondsLeft, $job->toMessage());

        return true;
    }
292
293
    /**
     * Converts a priority into the value stored in Redis.
     *
     * With priorities disabled (maxPriority is null) the result is always 0.
     * Otherwise the parent's priority is inverted against maxPriority, and a
     * null parent priority maps to maxPriority itself.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $parentPriority = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        if (null === $parentPriority) {
            return $this->maxPriority;
        }

        // Redis priority should be in DESC order
        return $this->maxPriority - $parentPriority;
    }
314
315
    /**
     * Rejects jobs this manager cannot store.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job carries a priority but priorities are disabled
     * @throws ClassNotSubclassException when the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $prioritiesDisabled = null === $this->maxPriority;
        if ($prioritiesDisabled && null !== $job->getPriority()) {
            throw new PriorityException('This queue does not support priorities');
        }

        if ($job instanceof RetryableJob) {
            return;
        }

        throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
    }
331
332
    /**
     * Ensures getJob() was called with a supported argument combination.
     *
     * Filtering by worker or method is not supported, and when priorities are
     * enabled the caller may not turn prioritisation off.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $filtersRequested = null !== $workerName || null !== $methodName;
        $prioritizationDisabled = null !== $this->maxPriority && true !== $prioritize;

        if ($filtersRequested || $prioritizationDisabled) {
            throw new UnsupportedException('Unsupported');
        }
    }
345 2
346
    /**
     * Removes a queued job.
     *
     * The job id is removed from the priority queue or, failing that, from the
     * "when" queue. Only when one of those removals succeeds are the payload
     * and the CRC-list entry deleted as well.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();

        $dequeued = $this->redis->zRem($this->getPriorityQueueCacheKey(), $jobId)
            || $this->redis->zRem($this->getWhenQueueCacheKey(), $jobId);
        if (!$dequeued) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
364
365
    /**
     * Pops the next runnable job and removes its payload and CRC-list entry from Redis.
     *
     * With priorities enabled, due jobs are first moved from the "when" queue
     * into the priority queue and the job is popped from the priority queue.
     * Otherwise it is popped from the "when" queue using the current microtime
     * as the maximum score.
     *
     * A popped id whose payload is no longer stored (for example because its
     * TTL lapsed) is skipped and the next id is tried, instead of building a
     * Job from a missing payload.
     *
     * @param string|null $workerName must be null (filtering is unsupported)
     * @param string|null $methodName must be null (filtering is unsupported)
     * @param bool        $prioritize must be true when priorities are enabled
     * @param mixed       $runId      not used by this implementation
     *
     * @throws UnsupportedException
     *
     * @return Job|null the next job, or null when nothing is runnable
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        $usePriorityQueue = null !== $this->maxPriority;
        $microtime = null;
        if ($usePriorityQueue) {
            // First thing migrate any jobs from When queue to Priority queue
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
        }

        // Every iteration pops one id, so the loop ends once the queue has no runnable id left.
        while (true) {
            $jobId = $usePriorityQueue
                ? $this->redis->zPop($queue)
                : $this->redis->zPopByMaxScore($queue, $microtime);
            if (!$jobId) {
                return null;
            }

            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$jobMessage) {
                // Payload is gone; the id was already popped, so just move on.
                continue;
            }

            $job = new Job();
            $job->fromMessage($jobMessage);

            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);

            return $job;
        }
    }
403
404 4
    /**
     * Counts the jobs that are waiting to run.
     *
     * This is the number of "when"-queue members scored between 0 and the
     * current microtime, plus, when priorities are enabled, every member of
     * the priority queue.
     *
     * @param string|null $workerName not used by this implementation
     * @param string|null $methodName not used by this implementation
     *
     * @return int
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $now = Util::getMicrotimeDecimal();
        $dueCount = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $now);

        if (null === $this->maxPriority) {
            return $dueCount;
        }

        return $dueCount + $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
    }
415
416 4 View Code Duplication
    /**
     * Resets a job to the "new" state, bumps its retry counter and re-queues it.
     *
     * @param RetryableJob $job must be an instance of the Redis Job class
     *
     * @return bool true when the job was re-queued, false when saveJob() refused it
     *              because it is already expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     */
    public function resetJob(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($job->getRetries() + 1);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        // saveJob() returns null for an expired job; report that instead of claiming success.
        return null !== $this->saveJob($job);
    }
430
431
    /**
     * Adds the jobs queued in one sorted set to the per-worker/method status tally.
     *
     * The set is scanned 100 entries at a time. Every member whose payload can
     * be loaded increments the STATUS_NEW counter of its "Worker->method()" entry.
     *
     * @param array  $results  tally keyed by "Worker->method()", updated in place
     * @param string $cacheKey key of the sorted set to scan
     *
     * @return array the updated tally
     */
    protected function collateStatusResults(array &$results, $cacheKey)
    {
        $cursor = null;
        while ($jobs = $this->redis->zScan($cacheKey, $cursor, '', 100)) {
            // zScan results are keyed by member (the job id); map them to payload keys.
            $jobCacheKeys = array_map(function ($jobId) {
                return $this->getJobCacheKey($jobId);
            }, array_keys($jobs));

            foreach ($this->redis->mget($jobCacheKeys) as $jobMessage) {
                if (!$jobMessage) {
                    continue;
                }

                $job = new Job();
                $job->fromMessage($jobMessage);
                $resultHashKey = $job->getWorkerName().'->'.$job->getMethod().'()';

                // Initialise the entry only when it is missing, so counters that
                // are already tallied for this worker/method are never overwritten.
                if (!isset($results[$resultHashKey])) {
                    $results[$resultHashKey] = static::getAllStatuses();
                }
                if (!isset($results[$resultHashKey][BaseJob::STATUS_NEW])) {
                    $results[$resultHashKey][BaseJob::STATUS_NEW] = 0;
                }
                ++$results[$resultHashKey][BaseJob::STATUS_NEW];
            }

            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }
459
460
    /**
     * Builds the status tally for every worker/method known to this queue.
     *
     * Queued jobs are counted from the "when" queue and, when priorities are
     * enabled, from the priority queue. The counters stored in the status hash
     * (fields of the form "worker,method,status") are then added on top.
     *
     * @return array tally keyed by "Worker->method()", then by status
     */
    public function getStatus()
    {
        $results = [];

        $this->collateStatusResults($results, $this->getWhenQueueCacheKey());
        if (null !== $this->maxPriority) {
            $this->collateStatusResults($results, $this->getPriorityQueueCacheKey());
        }

        $statusHashKey = $this->getStatusCacheKey();
        $cursor = null;
        while ($page = $this->redis->hScan($statusHashKey, $cursor, '', 100)) {
            foreach ($page as $field => $count) {
                list($workerName, $method, $status) = explode(',', $field);
                $resultHashKey = $workerName.'->'.$method.'()';

                if (!isset($results[$resultHashKey])) {
                    $results[$resultHashKey] = static::getAllStatuses();
                }

                $current = isset($results[$resultHashKey][$status]) ? $results[$resultHashKey][$status] : 0;
                $results[$resultHashKey][$status] = $current + $count;
            }

            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }
491
492
    /**
     * Increments the status-hash counter for the job's worker, method and status.
     *
     * The hash field is "worker,method,status".
     *
     * @param RetryableJob $job
     * @param mixed        $retry not used by this implementation
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $field = implode(',', [
            $job->getWorkerName(),
            $job->getMethod(),
            $job->getStatus(),
        ]);

        $this->redis->hIncrBy($this->getStatusCacheKey(), $field, 1);
    }
502
}
503