Completed
Pull Request — master (#30)
by Matthew
48:21 queued 02:09
created

JobManager::addFoundJob()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 16
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 16
ccs 5
cts 10
cp 0.5
rs 9.2
cc 4
eloc 10
nc 4
nop 4
crap 6
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Manager\JobTimingManager;
10
use Dtc\QueueBundle\Manager\PriorityJobManager;
11
use Dtc\QueueBundle\Manager\RunManager;
12
use Dtc\QueueBundle\Model\BaseJob;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
 * For future implementation.
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    use JobIdTrait;
22
23
    /** @var RedisInterface */
24
    protected $redis;
25
26
    /** @var string */
27
    protected $cacheKeyPrefix;
28
29
    protected $hostname;
30
    protected $pid;
31
32
    /**
33
     * @param string $cacheKeyPrefix
34
     */
35 2
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
36
    {
37 2
        $this->cacheKeyPrefix = $cacheKeyPrefix;
38 2
        $this->hostname = gethostname() ?: '';
39 2
        $this->pid = getmypid();
40
41 2
        parent::__construct($runManager, $jobTimingManager, $jobClass);
42 2
    }
43
44
    public function setRedis(RedisInterface $redis)
45
    {
46
        $this->redis = $redis;
47
    }
48
49 18
    protected function getJobCacheKey($jobId)
50
    {
51 18
        return $this->cacheKeyPrefix.'_job_'.$jobId;
52
    }
53
54
    /**
55
     * @param string $jobCrc
56
     */
57 18
    protected function getJobCrcHashKey($jobCrc)
58
    {
59 18
        return $this->cacheKeyPrefix.'_job_crc_'.$jobCrc;
60
    }
61
62 14
    protected function getPriorityQueueCacheKey()
63
    {
64 14
        return $this->cacheKeyPrefix.'_priority';
65
    }
66
67 18
    protected function getWhenQueueCacheKey()
68
    {
69 18
        return $this->cacheKeyPrefix.'_when';
70
    }
71
72 14
    protected function transferQueues()
73
    {
74
        // Drains from WhenAt queue into Prioirty Queue
75 14
        $whenQueue = $this->getWhenQueueCacheKey();
76 14
        $priorityQueue = $this->getPriorityQueueCacheKey();
77 14
        $microtime = Util::getMicrotimeDecimal();
78 14
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
79 12
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
80 12
            if ($jobMessage) {
81 12
                $job = new Job();
82 12
                $job->fromMessage($jobMessage);
0 ignored issues
show
Documentation introduced by
$jobMessage is of type boolean, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
83 12
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
84
            }
85
        }
86 14
    }
87
88
    /**
89
     * @param Job $job
90
     *
91
     * @return Job|null
92
     */
93 2
    protected function batchSave(Job $job)
94
    {
95 2
        $crcHash = $job->getCrcHash();
96 2
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
97 2
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
98 2
        if (is_array($result)) {
99 2
            foreach ($result as $jobId) {
100 2
                $jobCacheKey1 = $this->getJobCacheKey($jobId);
101 2
                if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
102
                    $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1);
103
                    continue;
104
                }
105
106
                /// There is one?
107 2
                if ($foundJobMessage) {
108 2
                    $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
109 2
                    if ($foundJob) {
110 2
                        return $foundJob;
111
                    }
112
                }
113
            }
114
        }
115
116
        return null;
117
    }
118
119
    /**
120
     * @param string $foundJobCacheKey
121
     * @param bool   $foundJobMessage
122
     */
123 2
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
124
    {
125 2
        $when = $job->getWhenUs();
126 2
        $crcHash = $job->getCrcHash();
127 2
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
128
129 2
        $foundJob = new Job();
130 2
        $foundJob->fromMessage($foundJobMessage);
0 ignored issues
show
Documentation introduced by
$foundJobMessage is of type boolean, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
131 2
        $foundWhen = $foundJob->getWhenUs();
132
133
        // Fix this using bcmath
134 2
        $curtimeU = Util::getMicrotimeDecimal();
135 2
        $newFoundWhen = null;
136 2
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) {
137 2
            $newFoundWhen = $when;
138
        }
139 2
        $foundPriority = $foundJob->getPriority();
140 2
        $newFoundPriority = null;
141 2
        if ($foundPriority > $job->getPriority()) {
142 2
            $newFoundPriority = $job->getPriority();
143
        }
144
145 2
        return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
146
    }
147
148
    /**
149
     * @param string   $crcCacheKey
150
     * @param int|null $newFoundPriority
151
     */
152 2
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
153
    {
154
        // Now how do we adjust this job's priority or time?
155 2
        $adjust = false;
156 2
        if (isset($newFoundWhen)) {
157 2
            $foundJob->setWhenUs($newFoundWhen);
158 2
            $adjust = true;
159
        }
160 2
        if (isset($newFoundPriority)) {
161 2
            $foundJob->setPriority($newFoundPriority);
162 2
            $adjust = true;
163
        }
164 2
        if (!$adjust) {
165 2
            return $foundJob;
166
        }
167
168 2
        return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey);
169
    }
170
171
    /**
172
     * @param bool $adjust
173
     */
174 2
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
175
    {
176 2
        $whenQueue = $this->getWhenQueueCacheKey();
177 2
        $result = $this->adjustJob($adjust, $whenQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());
178 2
        if (null !== $result) {
179 2
            return $result;
180
        }
181
        if (null === $this->maxPriority) {
182
            return false;
183
        }
184
185
        $priorityQueue = $this->getPriorityQueueCacheKey();
186
        $result = $this->adjustJob($adjust, $priorityQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());
187
188
        return $result ?: false;
189
    }
190
191
    /**
192
     * @param string $queue
193
     * @param boolean $adjust
194
     */
195 2
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
196
    {
197 2
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
198 2
            if (!$this->insertJob($foundJob)) {
199
                // Job is expired
200
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
201
202
                return false;
203
            }
204 2
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());
205
206 2
            return $foundJob;
207
        }
208
209
        return null;
210
    }
211
212
    /**
213
     * @param \Dtc\QueueBundle\Model\Job $job
214
     *
215
     * @return Job|null
216
     *
217
     * @throws ClassNotSubclassException
218
     * @throws PriorityException
219
     */
220 18
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
221
    {
222 18
        if (!$job instanceof Job) {
223
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
224
        }
225
226 18
        $this->validateSaveable($job);
227 18
        $this->setJobId($job);
228
229
        // Add to whenAt or priority queue?  /// optimizaiton...
230 18
        $whenUs = $job->getWhenUs();
231 18
        if (!$whenUs) {
232
            $whenUs = Util::getMicrotimeDecimal();
233
            $job->setWhenUs($whenUs);
234
        }
235
236 18
        if (true === $job->getBatch()) {
237
            // is there a CRC Hash already for this job
238 2
            if ($oldJob = $this->batchSave($job)) {
239 2
                return $oldJob;
240
            }
241
        }
242
243 18
        return $this->saveJob($job);
244
    }
245
246 18
    protected function saveJob(Job $job)
247
    {
248 18
        $whenQueue = $this->getWhenQueueCacheKey();
249 18
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
250
        // Save Job
251 18
        if (!$this->insertJob($job)) {
252
            // job is expired
253 2
            return null;
254
        }
255 18
        $jobId = $job->getId();
256 18
        $when = $job->getWhenUs();
257
        // Add Job to CRC list
258 18
        $this->redis->lPush($crcCacheKey, [$jobId]);
259
260 18
        $this->redis->zAdd($whenQueue, $when, $jobId);
261
262 18
        return $job;
263
    }
264
265
    /**
266
     * @param Job $job
267
     *
268
     * @return bool false if the job is already expired, true otherwise
269
     */
270 18
    protected function insertJob(Job $job)
271
    {
272
        // Save Job
273 18
        $jobCacheKey = $this->getJobCacheKey($job->getId());
274 18
        if ($expiresAt = $job->getExpiresAt()) {
275 2
            $expiresAtTime = $expiresAt->getTimestamp() - time();
276 2
            if ($expiresAtTime <= 0) {
277 2
                return false; /// ??? job is already expired
278
            }
279
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());
280
281
            return true;
282
        }
283 18
        $this->redis->set($jobCacheKey, $job->toMessage());
284
285 18
        return true;
286
    }
287
288
    /**
289
     * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0.
290
     *
291
     * @param int|null $priority
292
     *
293
     * @return int
294
     */
295 18
    protected function calculatePriority($priority)
296
    {
297 18
        $priority = parent::calculatePriority($priority);
298 18
        if (null === $priority) {
299 16
            return null === $this->maxPriority ? 0 : $this->maxPriority;
300
        }
301
302 8
        if (null === $this->maxPriority) {
303
            return 0;
304
        }
305
306
        // Redis priority should be in DESC order
307 8
        return $this->maxPriority - $priority;
308
    }
309
310
    /**
311
     * @param \Dtc\QueueBundle\Model\Job $job
312
     *
313
     * @throws PriorityException
314
     * @throws ClassNotSubclassException
315
     */
316 18 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
317
    {
318 18
        if (null !== $job->getPriority() && null === $this->maxPriority) {
319
            throw new PriorityException('This queue does not support priorities');
320
        }
321
322 18
        if (!$job instanceof RetryableJob) {
323
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
324
        }
325 18
    }
326
327
    /**
328
     * @param string|null $workerName
329
     * @param string|null $methodName
330
     * @param bool        $prioritize
331
     *
332
     * @throws UnsupportedException
333
     */
334 16
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
335
    {
336 16 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
337 4
            throw new UnsupportedException('Unsupported');
338
        }
339 14
    }
340
341 2
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
342
    {
343 2
        $jobId = $job->getId();
344 2
        $priorityQueue = $this->getPriorityQueueCacheKey();
345 2
        $whenQueue = $this->getWhenQueueCacheKey();
346
347 2
        $deleted = false;
348 2
        if ($this->redis->zRem($priorityQueue, $jobId)) {
349
            $deleted = true;
350 2
        } elseif ($this->redis->zRem($whenQueue, $jobId)) {
351 2
            $deleted = true;
352
        }
353
354 2
        if ($deleted) {
355 2
            $this->redis->del([$this->getJobCacheKey($jobId)]);
356 2
            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
357
        }
358 2
    }
359
360
    /**
361
     * @param string|null $workerName
362
     * @param string|null $methodName
363
     * @param bool        $prioritize
364
     * @param mixed       $runId
365
     *
366
     * @throws UnsupportedException
367
     *
368
     * @return Job|null
369
     */
370 16
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
371
    {
372
        // First thing migrate any jobs from When queue to Prioirty queue
373
374 16
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
375 14
        if (null !== $this->maxPriority) {
376 14
            $this->transferQueues();
377 14
            $queue = $this->getPriorityQueueCacheKey();
378 14
            $jobId = $this->redis->zPop($queue);
379
        } else {
380
            $queue = $this->getWhenQueueCacheKey();
381
            $microtime = Util::getMicrotimeDecimal();
382
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
383
        }
384
385 14
        if ($jobId) {
386 12
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
387 12
            $job = new Job();
388 12
            $job->fromMessage($jobMessage);
0 ignored issues
show
Documentation introduced by
$jobMessage is of type boolean, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
389 12
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
390 12
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
391 12
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
392
393 12
            return $job;
394
        }
395
396 10
        return null;
397
    }
398
399
    protected function getCurTime()
400
    {
401
        $time = intval(microtime(true) * 1000000);
402
403
        return $time;
404
    }
405
406 4 View Code Duplication
    public function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
407
    {
408 4
        if (!$job instanceof Job) {
409
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
410
        }
411 4
        $job->setStatus(BaseJob::STATUS_NEW);
412 4
        $job->setMessage(null);
413 4
        $job->setStartedAt(null);
414 4
        $job->setRetries($job->getRetries() + 1);
415 4
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
416 4
        $this->saveJob($job);
417
418 4
        return true;
419
    }
420
421 6
    public function retryableSaveHistory(RetryableJob $job, $retry)
422
    {
423 6
    }
424
}
425