Completed
Pull Request — master (#30)
by Matthew
06:35
created

JobManager::batchSave()   B

Complexity

Conditions 6
Paths 3

Size

Total Lines 25
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 6.288

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 25
ccs 12
cts 15
cp 0.8
rs 8.439
cc 6
eloc 15
nc 3
nop 1
crap 6.288
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Util\Util;
14
15
/**
16
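 * Redis-backed priority job manager: job payloads are stored under per-job keys and scheduled through a "when" sorted set and a priority sorted set.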
17
 */
18
class JobManager extends PriorityJobManager
19
{
20
    /** @var RedisInterface */
21
    protected $redis;
22
23
    /** @var string */
24
    protected $cacheKeyPrefix;
25
26
    protected $hostname;
27
    protected $pid;
28
29 2
    /**
     * Stores the Redis key prefix, captures the host name and process id that
     * are later used to build job ids, then hands the remaining arguments to
     * the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         forwarded unchanged to the parent constructor
     * @param string           $cacheKeyPrefix   prefix prepended to every Redis key this manager builds
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        // gethostname() can return false; fall back to an empty string in that case.
        $detectedHost = gethostname();
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
37
38
    /**
     * Injects the Redis client used for every storage and queue operation
     * of this manager.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
42
43 18
    /**
     * Builds the Redis key under which a single job's payload is stored.
     *
     * @param mixed $jobId
     *
     * @return string "<prefix>_job_<jobId>"
     */
    protected function getJobCacheKey($jobId)
    {
        return sprintf('%s_job_%s', $this->cacheKeyPrefix, $jobId);
    }
47
48 18
    /**
     * Builds the Redis key of the list that groups job ids sharing one CRC hash.
     *
     * @param mixed $jobCrc
     *
     * @return string "<prefix>_job_crc_<jobCrc>"
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return sprintf('%s_job_crc_%s', $this->cacheKeyPrefix, $jobCrc);
    }
52
53 14
    /**
     * Returns the Redis key of the sorted set that orders jobs by priority.
     *
     * @return string "<prefix>_priority"
     */
    protected function getPriorityQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_priority";
    }
57
58 18
    /**
     * Returns the Redis key of the sorted set that orders jobs by their
     * scheduled ("when") time.
     *
     * @return string "<prefix>_when"
     */
    protected function getWhenQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_when";
    }
62
63 14
    /**
     * Drains the "when" queue into the priority queue.
     *
     * Job ids are popped from the "when" queue with the current microtime as
     * the maximum score (presumably yielding only jobs that are already due -
     * confirm against RedisInterface::zPopByMaxScore). Each job whose payload
     * is still stored is added to the priority queue, scored by its priority.
     * Ids whose payload can no longer be read are dropped.
     */
    protected function transferQueues()
    {
        $sourceQueue = $this->getWhenQueueCacheKey();
        $targetQueue = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while ($dueJobId = $this->redis->zPopByMaxScore($sourceQueue, $now)) {
            $payload = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$payload) {
                // No payload stored for this id any more; nothing to enqueue.
                continue;
            }

            $dueJob = new Job();
            $dueJob->fromMessage($payload);
            $this->redis->zAdd($targetQueue, $dueJob->getPriority(), $dueJob->getId());
        }
    }
78
79 2
    /**
     * Looks for an already stored job with the same CRC hash that the given
     * batch job can be merged into.
     *
     * The CRC list (at most the first 1001 entries, indexes 0..1000) is
     * scanned. Entries whose payload is no longer stored are removed from the
     * list; the first entry for which batchFoundJob() yields a truthy result
     * is returned.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job the job being saved in batch mode
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the existing job the new one was merged into, or null if there is none
     */
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $jobIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($jobIds)) {
            return null;
        }

        foreach ($jobIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list holds job ids (see the lPush in saveJob()), so the
                // stale entry must be removed by id; removing by cache key never matches.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
104
105 2
    /**
     * Compares a stored job with the incoming batch job and works out which
     * of the stored job's schedule time and priority need to change.
     *
     * The stored job's "when" is replaced by the incoming one only if the
     * stored job is still scheduled after the current time and strictly later
     * than the incoming job. The stored priority value is replaced only if it
     * is greater than the incoming priority value.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job              the incoming batch job
     * @param string                     $foundJobCacheKey Redis key of the stored job's payload
     * @param string                     $foundJobMessage  serialized payload of the stored job
     *
     * @return mixed whatever finishBatchFoundJob() returns
     */
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $incomingWhen = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $storedJob = new Job();
        $storedJob->fromMessage($foundJobMessage);
        $storedWhen = $storedJob->getWhenUs();

        // bccomp() is used because the values are decimal strings, not native numbers.
        $now = Util::getMicrotimeDecimal();
        $storedIsPending = bccomp($storedWhen, $now) > 0;
        $newFoundWhen = ($storedIsPending && bccomp($storedWhen, $incomingWhen) >= 1) ? $incomingWhen : null;

        $storedPriority = $storedJob->getPriority();
        $newFoundPriority = null;
        if ($storedPriority > $job->getPriority()) {
            $newFoundPriority = $job->getPriority();
        }

        return $this->finishBatchFoundJob($storedJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
    }
129
130 2
    /**
     * Applies the new schedule time and/or priority to the stored job and,
     * if anything changed, re-queues it through addFoundJob().
     *
     * @param Job    $foundJob         the stored job
     * @param string $foundJobCacheKey Redis key of the stored job's payload
     * @param string $crcCacheKey      Redis key of the CRC list the job belongs to
     * @param mixed  $newFoundWhen     new "when" value, or null to leave it unchanged
     * @param mixed  $newFoundPriority new priority, or null to leave it unchanged
     *
     * @return mixed the untouched job when nothing changed, otherwise the result of addFoundJob()
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $whenChanged = null !== $newFoundWhen;
        $priorityChanged = null !== $newFoundPriority;

        if ($whenChanged) {
            $foundJob->setWhenUs($newFoundWhen);
        }
        if ($priorityChanged) {
            $foundJob->setPriority($newFoundPriority);
        }

        if ($whenChanged || $priorityChanged) {
            return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
        }

        return $foundJob;
    }
148
149 2
    /**
     * Re-queues an adjusted job: first tries the "when" queue and, if the job
     * was not found there and priorities are enabled, the priority queue.
     *
     * @param bool   $adjust           whether the job was actually modified
     * @param Job    $foundJob         the modified job
     * @param string $foundJobCacheKey Redis key of the job's payload
     * @param string $crcCacheKey      Redis key of the CRC list the job belongs to
     *
     * @return Job|false the re-queued job, or false if it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob(
            $adjust,
            $this->getWhenQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getWhenUs()
        );
        // null means "not in this queue"; any other value is a final answer.
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        if (null === $this->maxPriority) {
            return false;
        }

        $fromPriorityQueue = $this->adjustJob(
            $adjust,
            $this->getPriorityQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getPriority()
        );

        return $fromPriorityQueue ? $fromPriorityQueue : false;
    }
165
166 2
    /**
     * Moves a modified job to its new position inside one sorted-set queue.
     *
     * The job id is removed from the queue; if it was present, the payload is
     * re-stored and the id is re-added with the new score. If the payload
     * cannot be stored because the job has already expired, the job id is
     * removed from its CRC list instead.
     *
     * @param bool   $adjust           whether the job was actually modified
     * @param string $queue            Redis key of the sorted set to adjust
     * @param Job    $foundJob         the modified job
     * @param string $foundJobCacheKey Redis key of the job's payload (kept for signature compatibility; not used)
     * @param string $crcCacheKey      Redis key of the CRC list the job belongs to
     * @param mixed  $zScore           new score for the job inside $queue
     *
     * @return Job|false|null the job when re-queued, false when it is expired, null when it was not in $queue
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if (!$adjust) {
            return null;
        }

        $jobId = $foundJob->getId();
        if (!($this->redis->zRem($queue, $jobId) > 0)) {
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired. The CRC list stores job ids (see the lPush in
            // saveJob()), so it has to be cleaned up by id, not by cache key.
            $this->redis->lRem($crcCacheKey, 1, $jobId);

            return false;
        }

        $this->redis->zAdd($queue, $zScore, $jobId);

        return $foundJob;
    }
182
183
    /**
     * Validates a job, assigns it an id and a schedule time if it has none,
     * and stores it. A batch job is merged into an existing job with the same
     * CRC hash when one is found.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be a \Dtc\QueueBundle\Redis\Job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the saved (or merged-into) job; null if saveJob() reports the job as expired
     *
     * @throws \InvalidArgumentException if $job is not a Redis job
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        $isRedisJob = $job instanceof \Dtc\QueueBundle\Redis\Job;
        if (!$isRedisJob) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without a schedule time is due immediately.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            // Reuse an already queued job with the same CRC hash when there is one.
            $existingJob = $this->batchSave($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        return $this->saveJob($job);
    }
215
216 18
    /**
     * Stores a job's payload, records its id in the CRC list for its hash and
     * schedules it in the "when" queue.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null if it is already expired and was not stored
     */
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $scheduleQueue = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        $stored = $this->insertJob($job);
        if (!$stored) {
            // insertJob() refuses jobs whose expiry time has already passed.
            return null;
        }

        $id = $job->getId();
        $scheduledAt = $job->getWhenUs();

        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($scheduleQueue, $scheduledAt, $id);

        return $job;
    }
234
235
    /**
     * Writes the job's serialized payload to Redis. A job with an expiry time
     * is written with a matching time-to-live in seconds.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return bool false if the job is already expired (nothing is written), true otherwise
     */
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $payloadKey = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            // No expiry: store without a time-to-live.
            $this->redis->set($payloadKey, $job->toMessage());

            return true;
        }

        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            return false;
        }

        $this->redis->setEx($payloadKey, $secondsLeft, $job->toMessage());

        return true;
    }
257
258
    /**
     * Gives the job a unique id when it does not have one yet. The id is
     * built with uniqid() from this manager's host name and process id.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $idPrefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($idPrefix, true));
    }
269
270
    /**
     * Converts a priority into the value used by this manager, where the
     * direction is inverted relative to the parent's result.
     *
     * Without a configured maximum priority the result is always 0. With one,
     * a null parent result maps to the maximum priority and any other value
     * maps to (maximum priority - parent result).
     *
     * @param mixed $priority
     *
     * @return int|float
     */
    protected function calculatePriority($priority)
    {
        $basePriority = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        // Redis priority should be in DESC order
        return null === $basePriority ? $this->maxPriority : $this->maxPriority - $basePriority;
    }
287
288
    /**
     * Checks that a job can be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         if the job has a priority but no maximum priority is configured
     * @throws ClassNotSubclassException if the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $prioritiesDisabled = null === $this->maxPriority;
        if ($prioritiesDisabled && null !== $job->getPriority()) {
            throw new PriorityException('This queue does not support priorities');
        }

        if ($job instanceof RetryableJob) {
            return;
        }

        throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
    }
304
305 16
    /**
     * Rejects getJob() argument combinations this manager cannot honour:
     * filtering by worker or method name, or asking for non-prioritized
     * retrieval while a maximum priority is configured.
     *
     * @param mixed $workerName
     * @param mixed $methodName
     * @param bool  $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $hasFilter = null !== $workerName || null !== $methodName;
        $wantsUnprioritized = null !== $this->maxPriority && true !== $prioritize;

        if ($hasFilter || $wantsUnprioritized) {
            throw new UnsupportedException('Unsupported');
        }
    }
311
312 2
    /**
     * Removes a job from whichever queue holds it (the priority queue is
     * tried first, then the "when" queue). Only when the id was found in one
     * of them are the stored payload and the CRC list entry removed as well.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $id = $job->getId();
        $priorityKey = $this->getPriorityQueueCacheKey();
        $whenKey = $this->getWhenQueueCacheKey();

        // Short-circuit: the "when" queue is only touched if the priority queue did not hold the id.
        $removedFromQueue = $this->redis->zRem($priorityKey, $id) || $this->redis->zRem($whenKey, $id);
        if (!$removedFromQueue) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($id)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $id);
    }
330
331
    /**
     * Pops the next runnable job and removes its stored payload and CRC list
     * entry.
     *
     * With a maximum priority configured, due jobs are first moved from the
     * "when" queue into the priority queue and the job is popped from there;
     * otherwise it is popped straight from the "when" queue using the current
     * microtime as the maximum score.
     *
     * @param string|null $workerName must be null (filtering is unsupported)
     * @param string|null $methodName must be null (filtering is unsupported)
     * @param bool        $prioritize must be true when a maximum priority is configured
     * @param mixed       $runId      not used by this implementation
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null when no job is available or the popped job's
     *                                         payload is no longer stored
     *
     * @throws UnsupportedException for unsupported argument combinations
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        if (null !== $this->maxPriority) {
            // First migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $jobId = $this->redis->zPop($this->getPriorityQueueCacheKey());
        } else {
            $jobId = $this->redis->zPopByMaxScore($this->getWhenQueueCacheKey(), Util::getMicrotimeDecimal());
        }

        if (!$jobId) {
            return null;
        }

        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
        if (!$jobMessage) {
            // Payloads written by insertJob() with setEx can expire while their id
            // is still queued; there is nothing to hydrate a job from, so treat the
            // id like an empty queue (same guard as transferQueues()).
            return null;
        }

        $job = new \Dtc\QueueBundle\Redis\Job();
        $job->fromMessage($jobMessage);

        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $job->getId());
        $this->redis->del([$this->getJobCacheKey($job->getId())]);

        return $job;
    }
362
363
    /**
     * Returns the current time in microseconds since the Unix epoch,
     * truncated to an integer.
     *
     * @return int
     */
    protected function getCurTime()
    {
        $seconds = microtime(true);

        return (int) ($seconds * 1000000);
    }
369
370 4 View Code Duplication
    /**
     * Puts a job back into its initial state for another attempt: status is
     * reset to new, message and start time are cleared, the retry counter is
     * incremented, the update time is refreshed, and the job is saved again.
     *
     * @param RetryableJob $job must be a \Dtc\QueueBundle\Redis\Job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException if $job is not a Redis job
     */
    public function resetJob(RetryableJob $job)
    {
        $isRedisJob = $job instanceof \Dtc\QueueBundle\Redis\Job;
        if (!$isRedisJob) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);

        $retriesSoFar = $job->getRetries();
        $job->setRetries($retriesSoFar + 1);

        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        // The result of saveJob() is not inspected here.
        $this->saveJob($job);

        return true;
    }
384
385 6
    /**
     * Intentionally a no-op: this manager does nothing when asked to save
     * job history.
     *
     * @param RetryableJob $job
     * @param mixed        $retry
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
    }
388
}
389