Completed
Pull Request — master (#30)
by Matthew
14:29
created

JobManager::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 4
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Util\Util;
14
15
/**
16
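 * Redis-backed job manager: job payloads are stored under prefixed keys, and job ids are scheduled through a "when" sorted set and a "priority" sorted set.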
17
 */
18
class JobManager extends PriorityJobManager
19
{
20
    /** @var RedisInterface */
21
    protected $redis;
22
23
    /** @var string */
24
    protected $cacheKeyPrefix;
25
26
    protected $hostname;
27
    protected $pid;
28
29 2
    /**
     * Records the key prefix and the identity of this process, then delegates to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         forwarded unchanged to the parent constructor
     * @param string           $cacheKeyPrefix   prefix used for every Redis key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        // gethostname() can return false; any falsy result is normalised to an empty string.
        $host = gethostname();
        if (!$host) {
            $host = '';
        }
        $this->hostname = $host;
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
37
38
    /**
     * Injects the Redis client used by every queue operation of this manager.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
42
43 18
    /**
     * Builds the Redis key under which the payload of a single job is stored.
     *
     * @param string $jobId
     *
     * @return string
     */
    protected function getJobCacheKey($jobId)
    {
        return sprintf('%s_job_%s', $this->cacheKeyPrefix, $jobId);
    }
47
48 18
    /**
     * Builds the Redis key of the list that groups job ids sharing one CRC hash.
     *
     * @param string $jobCrc
     *
     * @return string
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return sprintf('%s_job_crc_%s', $this->cacheKeyPrefix, $jobCrc);
    }
52
53 14
    /**
     * Returns the Redis key of the sorted set scored by job priority.
     *
     * @return string
     */
    protected function getPriorityQueueCacheKey()
    {
        $suffix = '_priority';

        return $this->cacheKeyPrefix.$suffix;
    }
57
58 18
    /**
     * Returns the Redis key of the sorted set scored by each job's "when" time.
     *
     * @return string
     */
    protected function getWhenQueueCacheKey()
    {
        $suffix = '_when';

        return $this->cacheKeyPrefix.$suffix;
    }
62
63 14
    /**
     * Drains every due job id from the "when" queue into the priority queue.
     *
     * Ids are popped while their score is at most the microtime captured at the start of the call.
     * An id whose payload can no longer be read is dropped without being re-queued.
     */
    protected function transferQueues()
    {
        $sourceKey = $this->getWhenQueueCacheKey();
        $targetKey = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        for (;;) {
            $dueJobId = $this->redis->zPopByMaxScore($sourceKey, $now);
            if (!$dueJobId) {
                break;
            }

            $payload = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$payload) {
                // Payload is gone; there is nothing to move for this id.
                continue;
            }

            $job = new \Dtc\QueueBundle\Redis\Job();
            $job->fromMessage($payload);
            $this->redis->zAdd($targetKey, $job->getPriority(), $job->getId());
        }
    }
78
79
    /**
     * Tries to fold a batch job into an already-queued job that has the same CRC hash.
     *
     * Walks up to the first 1001 ids in the CRC list of $job. Ids whose payload can no longer be
     * read are pruned from the list; the first id that batchFoundJob() accepts wins.
     *
     * @param Job $job the job being saved
     *
     * @return Job|null the existing job that absorbed $job, or null when no match was found
     */
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $candidateIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($candidateIds)) {
            return null;
        }

        foreach ($candidateIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list holds job ids (saveJob() pushes $jobId), so the stale entry must be
                // removed by id; removing by cache key never matches anything.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
109
110 2
    /**
     * Compares an incoming batch job with a stored job of the same CRC hash and works out
     * whether the stored job's "when" time or priority should be replaced by the incoming values.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job              the incoming job
     * @param string                     $foundJobCacheKey cache key of the stored job
     * @param string                     $foundJobMessage  serialized payload of the stored job
     *
     * @return mixed whatever finishBatchFoundJob() returns for the stored job
     */
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $when = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $foundJob = new \Dtc\QueueBundle\Redis\Job();
        $foundJob->fromMessage($foundJobMessage);
        $foundWhen = $foundJob->getWhenUs();

        // Replace the stored time only when it is still in the future and later than the incoming time.
        $curtimeU = Util::getMicrotimeDecimal();
        $newFoundWhen = (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) ? $when : null;

        // Replace the stored priority only when it is numerically greater than the incoming one.
        $incomingPriority = $job->getPriority();
        $newFoundPriority = $foundJob->getPriority() > $incomingPriority ? $incomingPriority : null;

        return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
    }
134
135 2
    /**
     * Applies the new "when" time and/or priority to the stored job and re-queues it when anything changed.
     *
     * @param Job         $foundJob         the stored job
     * @param string      $foundJobCacheKey cache key of the stored job
     * @param string      $crcCacheKey      key of the CRC list the stored job belongs to
     * @param string|null $newFoundWhen     replacement "when" value, or null to keep the current one
     * @param int|null    $newFoundPriority replacement priority, or null to keep the current one
     *
     * @return mixed the untouched stored job when nothing changed, otherwise the result of addFoundJob()
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $hasNewWhen = null !== $newFoundWhen;
        $hasNewPriority = null !== $newFoundPriority;

        // Nothing to adjust: the stored job already covers the incoming one.
        if (!$hasNewWhen && !$hasNewPriority) {
            return $foundJob;
        }

        if ($hasNewWhen) {
            $foundJob->setWhenUs($newFoundWhen);
        }
        if ($hasNewPriority) {
            $foundJob->setPriority($newFoundPriority);
        }

        return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
153
154 2
    /**
     * Re-queues an adjusted job, trying the "when" queue first and then the priority queue.
     *
     * The priority queue is only tried when priorities are enabled (maxPriority is not null).
     *
     * @param bool   $adjust           whether the job was modified and must be re-queued
     * @param Job    $foundJob         the adjusted job
     * @param string $foundJobCacheKey cache key of the job
     * @param string $crcCacheKey      key of the CRC list the job belongs to
     *
     * @return Job|false the re-queued job, or false when it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob(
            $adjust,
            $this->getWhenQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getWhenUs()
        );
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        if (null === $this->maxPriority) {
            return false;
        }

        $fromPriorityQueue = $this->adjustJob(
            $adjust,
            $this->getPriorityQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getPriority()
        );
        if (!$fromPriorityQueue) {
            return false;
        }

        return $fromPriorityQueue;
    }
170
171 2
    /**
     * Removes a job from the given sorted set, rewrites its payload and adds it back with a new score.
     *
     * @param bool   $adjust           when false nothing is attempted
     * @param string $queue            key of the sorted set to update
     * @param Job    $foundJob         the job carrying its adjusted values
     * @param string $foundJobCacheKey cache key of the job; kept for signature compatibility, not used
     * @param string $crcCacheKey      key of the CRC list the job belongs to
     * @param mixed  $zScore           score to store the job under in $queue
     *
     * @return Job|false|null the job when re-queued, false when it has expired,
     *                        null when it was not in $queue (or $adjust is false)
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if (!$adjust) {
            return null;
        }

        $foundJobId = $foundJob->getId();
        if (!($this->redis->zRem($queue, $foundJobId) > 0)) {
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired. The CRC list holds job ids (saveJob() pushes the id), so remove the
            // id; removing the cache key would never match an entry.
            $this->redis->lRem($crcCacheKey, 1, $foundJobId);

            return false;
        }

        $this->redis->zAdd($queue, $zScore, $foundJobId);

        return $foundJob;
    }
187
188
    /**
     * Validates a job, assigns it an id and a "when" time if missing, and stores it in Redis.
     *
     * A job flagged as batch is first offered to batchSave(); if an equivalent queued job absorbs it,
     * that existing job is returned instead of saving a new one.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the saved (or absorbing) job; null when saveJob() rejects an expired job
     *
     * @throws \InvalidArgumentException when $job is not a Redis job
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without an explicit "when" becomes runnable as of now.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            // Is there already a queued job with the same CRC hash?
            $oldJob = $this->batchSave($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        return $this->saveJob($job);
    }
220
221 18
    /**
     * Writes the job payload, registers the job id in its CRC list and schedules it in the "when" queue.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null when it is already expired
     */
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $scheduleKey = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        $stored = $this->insertJob($job);
        if (!$stored) {
            // Job is expired; nothing is queued.
            return null;
        }

        $id = $job->getId();
        $runAt = $job->getWhenUs();

        // Track the id under its CRC hash, then schedule it by its "when" time.
        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($scheduleKey, $runAt, $id);

        return $job;
    }
239
240
    /**
     * Stores the serialized job under its cache key, with a TTL when the job has an expiry.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired, true otherwise
     */
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $jobCacheKey = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            // No expiry: store the payload without a TTL.
            $this->redis->set($jobCacheKey, $job->toMessage());

            return true;
        }

        // Remaining lifetime in whole seconds, used as the TTL of the key.
        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            return false;
        }

        $this->redis->setEx($jobCacheKey, $secondsLeft, $job->toMessage());

        return true;
    }
262
263
    /**
     * Gives the job a unique id when it does not have one yet.
     *
     * The id is produced by uniqid() with more entropy, prefixed with this process's hostname and pid.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $prefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($prefix, true));
    }
274
275
    /**
     * Returns the priority in DESCENDING order, except if maxPriority is null, then the priority is 0.
     *
     * The value normalised by the parent is subtracted from maxPriority; when the parent yields null,
     * maxPriority itself is returned.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $normalized = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        // Redis priority should be in DESC order
        return null === $normalized ? $this->maxPriority : $this->maxPriority - $normalized;
    }
292
293
    /**
     * Checks that the job can be stored by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but maxPriority is null
     * @throws ClassNotSubclassException when the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $prioritiesDisabled = null === $this->maxPriority;
        if ($prioritiesDisabled && null !== $job->getPriority()) {
            throw new PriorityException('This queue does not support priorities');
        }

        $isRetryable = $job instanceof RetryableJob;
        if (!$isRetryable) {
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
        }
    }
309
310 16
    /**
     * Rejects getJob() argument combinations this manager cannot honour.
     *
     * Filtering by worker or method name is never supported, and when priorities are enabled
     * $prioritize must be exactly true.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $filterRequested = null !== $workerName || null !== $methodName;
        $unprioritizedRequested = null !== $this->maxPriority && true !== $prioritize;

        if ($filterRequested || $unprioritizedRequested) {
            throw new UnsupportedException('Unsupported');
        }
    }
316
317 2
    /**
     * Removes a queued job: its queue entry, its stored payload and its CRC list entry.
     *
     * The payload and CRC entry are only removed when the id was actually found in the
     * priority queue or, failing that, in the "when" queue.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();

        // The "when" queue is only consulted if the priority queue did not hold the id.
        $wasQueued = $this->redis->zRem($this->getPriorityQueueCacheKey(), $jobId)
            || $this->redis->zRem($this->getWhenQueueCacheKey(), $jobId);

        if (!$wasQueued) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
335
336
    /**
     * Pops the next runnable job and removes its stored payload and CRC list entry.
     *
     * When priorities are enabled (maxPriority is not null), due jobs are first moved from the
     * "when" queue to the priority queue and the job is popped from the priority queue. Otherwise
     * the job is popped directly from the "when" queue, limited to scores up to the current microtime.
     *
     * Ids whose payload can no longer be read are skipped rather than returned as an empty job.
     *
     * @param string|null $workerName must be null, see verifyGetJobArgs()
     * @param string|null $methodName must be null, see verifyGetJobArgs()
     * @param bool        $prioritize must be true when priorities are enabled
     * @param mixed       $runId      not used by this implementation
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the popped job, or null when no job is ready
     *
     * @throws UnsupportedException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        $usePriorityQueue = null !== $this->maxPriority;
        if ($usePriorityQueue) {
            // First migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
            $microtime = null;
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
        }

        while (true) {
            $jobId = $usePriorityQueue
                ? $this->redis->zPop($queue)
                : $this->redis->zPopByMaxScore($queue, $microtime);
            if (!$jobId) {
                return null;
            }

            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$jobMessage) {
                // The payload is gone, so there is nothing to build a job from. Skip this id and
                // try the next one instead of handing back a job built from a false payload.
                continue;
            }

            $job = new \Dtc\QueueBundle\Redis\Job();
            $job->fromMessage($jobMessage);

            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);

            return $job;
        }
    }
367
368
    /**
     * Returns the current time as an integer number of microseconds.
     *
     * @return int
     */
    protected function getCurTime()
    {
        return (int) (microtime(true) * 1000000);
    }
374
375 4 View Code Duplication
    /**
     * Puts a job back into the queue as new, counting one more retry.
     *
     * Status, message and start time are cleared, the update time is refreshed, and the job is saved again.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a Redis job
     */
    public function resetJob(RetryableJob $job)
    {
        $isRedisJob = $job instanceof \Dtc\QueueBundle\Redis\Job;
        if (!$isRedisJob) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);

        $retriesSoFar = $job->getRetries();
        $job->setRetries($retriesSoFar + 1);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        $this->saveJob($job);

        return true;
    }
389
390 6
    /**
     * Intentionally does nothing: this manager records no job history.
     *
     * @param RetryableJob $job
     * @param mixed        $retry
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        // No-op.
    }
393
}
394