Completed
Pull Request — master (#30)
by Matthew
23:29 queued 08:10
created

JobManager::setRedis()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 1
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobIdTrait;
9
use Dtc\QueueBundle\Manager\JobTimingManager;
10
use Dtc\QueueBundle\Manager\PriorityJobManager;
11
use Dtc\QueueBundle\Manager\RunManager;
12
use Dtc\QueueBundle\Model\BaseJob;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
 * Redis-backed job manager: job messages are stored under prefixed keys and scheduled through Redis sorted sets.
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    use JobIdTrait;
22
23
    /** @var RedisInterface */
24
    protected $redis;
25
26
    /** @var string */
27
    protected $cacheKeyPrefix;
28
29
    protected $hostname;
30
    protected $pid;
31
32
    /**
     * Records the key prefix and the identity (hostname, pid) of the current process,
     * then hands the remaining arguments to the parent manager.
     *
     * @param RunManager       $runManager       forwarded to the parent constructor
     * @param JobTimingManager $jobTimingManager forwarded to the parent constructor
     * @param mixed            $jobClass         forwarded to the parent constructor
     * @param string           $cacheKeyPrefix   prefix prepended to every Redis key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $detectedHostname = gethostname();

        $this->cacheKeyPrefix = $cacheKeyPrefix;
        // Any falsy hostname (including a failed lookup) is normalised to an empty string.
        $this->hostname = $detectedHostname ? $detectedHostname : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
43
44
    /**
     * Injects the Redis client that every queue operation of this manager goes through.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
48
49 18
    /**
     * Builds the Redis key under which the message of a single job is stored.
     *
     * @param mixed $jobId identifier of the job
     *
     * @return string "<prefix>_job_<jobId>"
     */
    protected function getJobCacheKey($jobId)
    {
        return "{$this->cacheKeyPrefix}_job_{$jobId}";
    }
53
54
    /**
     * Builds the Redis key of the list that groups the ids of jobs sharing one CRC hash.
     *
     * @param string $jobCrc CRC hash of the job
     *
     * @return string "<prefix>_job_crc_<jobCrc>"
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return "{$this->cacheKeyPrefix}_job_crc_{$jobCrc}";
    }
61
62 14
    /**
     * Builds the Redis key of the sorted set that orders job ids by priority.
     *
     * @return string "<prefix>_priority"
     */
    protected function getPriorityQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_priority";
    }
66
67 18
    /**
     * Builds the Redis key of the sorted set that orders job ids by their scheduled time.
     *
     * @return string "<prefix>_when"
     */
    protected function getWhenQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_when";
    }
71
72 14
    /**
     * Drains every job that is already due from the "when" queue into the priority queue.
     *
     * Ids are popped from the "when" sorted set while their score does not exceed the
     * current microtime. An id whose stored message can no longer be read is dropped
     * without being re-queued.
     */
    protected function transferQueues()
    {
        $sourceQueue = $this->getWhenQueueCacheKey();
        $targetQueue = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while (true) {
            $dueJobId = $this->redis->zPopByMaxScore($sourceQueue, $now);
            if (!$dueJobId) {
                // Nothing (more) is due.
                return;
            }

            $payload = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$payload) {
                continue;
            }

            $dueJob = new Job();
            $dueJob->fromMessage($payload);
            $this->redis->zAdd($targetQueue, $dueJob->getPriority(), $dueJob->getId());
        }
    }
87
88
    /**
     * Looks for an already stored job with the same CRC hash as $job and merges $job into it.
     *
     * At most the first 1001 ids (range 0..1000) of the CRC list are inspected. Ids whose
     * job message no longer exists are removed from the CRC list along the way.
     *
     * @param Job $job the job that is about to be saved in batch mode
     *
     * @return Job|null the existing job that absorbed $job, or null when none could be used
     */
    protected function batchSave(Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $candidateIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($candidateIds)) {
            return null;
        }

        foreach ($candidateIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list stores job ids (not job cache keys), so the stale entry
                // has to be removed by id; removing by cache key never matches anything.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
118
119
    /**
     * Compares an incoming batch job with a stored duplicate and works out which of the
     * duplicate's scheduling values should be replaced by those of the incoming job.
     *
     * The stored "when" is replaced only if it lies in the future and is later than the
     * incoming one; the stored priority is replaced only if it is greater than the
     * incoming one. The outcome is passed on to finishBatchFoundJob().
     *
     * @param Job    $job              the incoming job
     * @param string $foundJobCacheKey Redis key of the stored duplicate
     * @param string $foundJobMessage  message of the stored duplicate as read from Redis
     *
     * @return mixed whatever finishBatchFoundJob() returns
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $incomingWhen = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $storedJob = new Job();
        $storedJob->fromMessage($foundJobMessage);
        $storedWhen = $storedJob->getWhenUs();

        // Timestamps are compared with bcmath; bccomp() yields 1 only when the left side is greater.
        $now = Util::getMicrotimeDecimal();
        $storedIsLater = 1 === bccomp($storedWhen, $now) && 1 === bccomp($storedWhen, $incomingWhen);
        $replacementWhen = $storedIsLater ? $incomingWhen : null;

        $incomingPriority = $job->getPriority();
        $replacementPriority = $storedJob->getPriority() > $incomingPriority ? $incomingPriority : null;

        return $this->finishBatchFoundJob($storedJob, $foundJobCacheKey, $crcCacheKey, $replacementWhen, $replacementPriority);
    }
147
148
    /**
     * Applies the replacement "when" and/or priority to a stored duplicate and re-queues it.
     *
     * When neither replacement value is given the stored job is returned untouched.
     *
     * @param Job          $foundJob         the stored duplicate
     * @param string       $foundJobCacheKey Redis key of the stored duplicate
     * @param string       $crcCacheKey      Redis key of the CRC list the duplicate belongs to
     * @param mixed        $newFoundWhen     replacement scheduled time, or null to keep the current one
     * @param integer|null $newFoundPriority replacement priority, or null to keep the current one
     *
     * @return mixed the unchanged job, or whatever addFoundJob() returns
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $whenChanges = null !== $newFoundWhen;
        $priorityChanges = null !== $newFoundPriority;

        if (!$whenChanges && !$priorityChanges) {
            return $foundJob;
        }

        if ($whenChanges) {
            $foundJob->setWhenUs($newFoundWhen);
        }
        if ($priorityChanges) {
            $foundJob->setPriority($newFoundPriority);
        }

        return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
170
171
    /**
     * Re-queues an adjusted duplicate in whichever sorted set currently holds its id.
     *
     * The "when" queue is tried first. The priority queue is tried only if the id was not
     * in the "when" queue and this manager has a maximum priority configured.
     *
     * @param boolean $adjust           whether the job was modified and has to be re-queued
     * @param Job     $foundJob         the adjusted duplicate
     * @param string  $foundJobCacheKey Redis key of the duplicate
     * @param string  $crcCacheKey      Redis key of the CRC list the duplicate belongs to
     *
     * @return Job|false the re-queued job, or false when it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob($adjust, $this->getWhenQueueCacheKey(), $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        if (null !== $this->maxPriority) {
            $fromPriorityQueue = $this->adjustJob($adjust, $this->getPriorityQueueCacheKey(), $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());
            if ($fromPriorityQueue) {
                return $fromPriorityQueue;
            }
        }

        return false;
    }
190
191
    /**
     * Re-inserts an adjusted job into the given sorted set if (and only if) its id is currently in it.
     *
     * @param boolean $adjust           whether an adjustment is requested at all
     * @param string  $queue            Redis key of the sorted set to look in
     * @param Job     $foundJob         the adjusted job
     * @param string  $foundJobCacheKey Redis key of the job message; not needed here, kept for signature compatibility
     * @param string  $crcCacheKey      Redis key of the CRC list the job belongs to
     * @param mixed   $zScore           score under which the job id is re-added to $queue
     *
     * @return Job|false|null the job when it was re-queued, false when it turned out to be
     *                        expired, null when nothing was done (no adjustment requested or
     *                        id not present in $queue)
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if (!$adjust) {
            return null;
        }

        $removedCount = $this->redis->zRem($queue, $foundJob->getId());
        if (!($removedCount > 0)) {
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired. The CRC list stores job ids (not job cache keys), so the
            // entry has to be removed by id; removing by cache key never matches anything.
            $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

            return false;
        }

        $this->redis->zAdd($queue, $zScore, $foundJob->getId());

        return $foundJob;
    }
210
211
    /**
     * Validates a job, assigns it an id and stores it, merging it into an existing
     * duplicate first when the job is flagged as a batch job.
     *
     * A job without a scheduled time gets the current microtime as its "when".
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of the Redis Job class
     *
     * @return Job|null the stored (or absorbing) job, or null when saveJob() reports the job as expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // Every job goes through the "when" queue, so it needs a scheduled time.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        // Batch jobs are merged into an existing job with the same CRC hash, if there is one.
        $absorbingJob = true === $job->getBatch() ? $this->batchSave($job) : null;

        return $absorbingJob ? $absorbingJob : $this->saveJob($job);
    }
244
245 18
    /**
     * Stores the job message, registers the job id in its CRC list and schedules it in the "when" queue.
     *
     * @param Job $job
     *
     * @return Job|null the job, or null when insertJob() refused it as already expired
     */
    protected function saveJob(Job $job)
    {
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        if (!$this->insertJob($job)) {
            // Job is expired; nothing is queued.
            return null;
        }

        $id = $job->getId();
        $scheduledAt = $job->getWhenUs();

        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($this->getWhenQueueCacheKey(), $scheduledAt, $id);

        return $job;
    }
263
264
    /**
     * Writes the job message to Redis, with a time-to-live when the job has an expiry.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired (nothing is written), true otherwise
     */
    protected function insertJob(Job $job)
    {
        $key = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            // No expiry: store without a time-to-live.
            $this->redis->set($key, $job->toMessage());

            return true;
        }

        $secondsToLive = $expiresAt->getTimestamp() - time();
        if ($secondsToLive <= 0) {
            return false;
        }

        $this->redis->setEx($key, $secondsToLive, $job->toMessage());

        return true;
    }
286
287
    /**
     * Converts a priority into the inverted form used as Redis score (maxPriority - priority).
     *
     * Without a configured maximum priority the result is always 0; with one, a null
     * priority maps to the maximum priority itself.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $normalised = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        // Redis priority should be in DESC order.
        return null === $normalised ? $this->maxPriority : $this->maxPriority - $normalised;
    }
308
309
    /**
     * Checks that a job may be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job carries a priority but no maximum priority is configured
     * @throws ClassNotSubclassException when the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobHasPriority = null !== $job->getPriority();
        if ($jobHasPriority && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if ($job instanceof RetryableJob) {
            return;
        }

        throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
    }
325
326
    /**
     * Rejects getJob() argument combinations this manager cannot honour.
     *
     * Filtering by worker or method name is not supported, and neither is turning
     * prioritisation off while a maximum priority is configured.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $noFilters = null === $workerName && null === $methodName;
        $prioritizeAcceptable = null === $this->maxPriority || true === $prioritize;

        if ($noFilters && $prioritizeAcceptable) {
            return;
        }

        throw new UnsupportedException('Unsupported');
    }
339
340 2
    /**
     * Removes a job from whichever queue holds it, then deletes its message and its CRC list entry.
     *
     * The priority queue is tried first and the "when" queue only if the id was not in it.
     * When the id is in neither queue nothing else is touched.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();
        $priorityQueue = $this->getPriorityQueueCacheKey();
        $whenQueue = $this->getWhenQueueCacheKey();

        $wasQueued = $this->redis->zRem($priorityQueue, $jobId) || $this->redis->zRem($whenQueue, $jobId);
        if (!$wasQueued) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
358
359
    /**
     * Pops the next runnable job and removes all of its bookkeeping from Redis.
     *
     * With a maximum priority configured, due jobs are first moved from the "when" queue
     * to the priority queue and the job is popped from the latter; otherwise the job is
     * popped straight from the "when" queue (only ids whose score does not exceed the
     * current microtime).
     *
     * A popped id whose message can no longer be read (messages may be written with a
     * time-to-live) is skipped and the next id is popped instead, so a blank job is never
     * returned.
     *
     * @param string|null $workerName must be null for this manager
     * @param string|null $methodName must be null for this manager
     * @param bool        $prioritize must be true when a maximum priority is configured
     * @param mixed       $runId      not used by this implementation
     *
     * @throws UnsupportedException
     *
     * @return \Dtc\QueueBundle\Model\Job|null the job, or null when no job is available
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        $usePriorityQueue = null !== $this->maxPriority;
        $microtime = null;
        if ($usePriorityQueue) {
            // First thing: migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
        }

        while ($jobId = $usePriorityQueue ? $this->redis->zPop($queue) : $this->redis->zPopByMaxScore($queue, $microtime)) {
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$jobMessage) {
                // The id was queued but its message is gone; there is nothing to run, try the next id.
                continue;
            }

            $job = new Job();
            $job->fromMessage($jobMessage);

            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);

            return $job;
        }

        return null;
    }
397
398
    /**
     * Returns the current Unix time in microseconds, truncated to an integer.
     *
     * @return int
     */
    protected function getCurTime()
    {
        return (int) (microtime(true) * 1000000);
    }
404
405 4 View Code Duplication
    /**
     * Puts a job back into the queue as a new job and counts the attempt as a retry.
     *
     * Status, message and start time are cleared, the retry counter is incremented and
     * the update time is refreshed before the job is saved again.
     *
     * @param RetryableJob $job must be an instance of the Redis Job class
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     */
    public function resetJob(RetryableJob $job)
    {
        if ($job instanceof Job) {
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setMessage(null);
            $job->setStartedAt(null);
            $job->setRetries($job->getRetries() + 1);
            $job->setUpdatedAt(Util::getMicrotimeDateTime());
            $this->saveJob($job);

            return true;
        }

        throw new \InvalidArgumentException('$job must be instance of '.Job::class);
    }
419
420 6
    /**
     * Does nothing: this manager records no job history, so the body is empty.
     *
     * @param RetryableJob $job
     * @param mixed        $retry
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
    }
423
}
424