Completed
Pull Request — master (#30)
by Matthew
07:05 queued 16s
created

JobManager::batchFoundJob()   C

Complexity

Conditions 7
Paths 32

Size

Total Lines 37
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 37
ccs 0
cts 23
cp 0
rs 6.7272
c 1
b 0
f 0
cc 7
eloc 23
nc 32
nop 3
crap 56
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Util\Util;
14
15
/**
16
 * Redis-backed job manager: job payloads are stored under per-job keys and job ids are queued in sorted sets.
17
 */
18
class JobManager extends PriorityJobManager
19
{
20
    /** @var RedisInterface */
21
    protected $redis;
22
23
    /** @var string */
24
    protected $cacheKeyPrefix;
25
26
    protected $hostname;
27
    protected $pid;
28
29 2
    /**
     * Records the key prefix plus the local hostname and process id (both are
     * later used to build job ids), then delegates to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         forwarded unchanged to the parent constructor
     * @param string           $cacheKeyPrefix   prefix prepended to every Redis key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $detectedHost = gethostname();

        $this->pid = getmypid();
        // gethostname() returns false on failure; fall back to an empty string.
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
37
38
    /**
     * Injects the Redis client used by every queue and job-storage operation.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
42
43 16
    /**
     * Builds the Redis key under which a job's serialized payload is stored.
     *
     * @param string $jobId
     *
     * @return string "<prefix>_job_<jobId>"
     */
    protected function getJobCacheKey($jobId)
    {
        return sprintf('%s_job_%s', $this->cacheKeyPrefix, $jobId);
    }
47
48 16
    /**
     * Builds the Redis key of the list that holds the ids of jobs sharing a CRC hash.
     *
     * @param string $jobCrc
     *
     * @return string "<prefix>_job_crc_<jobCrc>"
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return sprintf('%s_job_crc_%s', $this->cacheKeyPrefix, $jobCrc);
    }
52
53 12
    /**
     * Returns the Redis key of the sorted set scored by job priority.
     *
     * @return string "<prefix>_priority"
     */
    protected function getPriorityQueueCacheKey()
    {
        return sprintf('%s_priority', $this->cacheKeyPrefix);
    }
57
58 16
    /**
     * Returns the Redis key of the sorted set scored by job run time ("when").
     *
     * @return string "<prefix>_when"
     */
    protected function getWhenQueueCacheKey()
    {
        return sprintf('%s_when', $this->cacheKeyPrefix);
    }
62
63 12
    /**
     * Drains the "when" queue into the priority queue.
     *
     * Job ids are popped from the "when" sorted set with zPopByMaxScore(), using
     * the current microtime as the maximum score, until nothing more is returned.
     * Each popped job is re-added to the priority sorted set, scored by its priority.
     * Ids whose payload can no longer be read are dropped.
     */
    protected function transferQueues()
    {
        $source = $this->getWhenQueueCacheKey();
        $target = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while (true) {
            $dueJobId = $this->redis->zPopByMaxScore($source, $now);
            if (!$dueJobId) {
                break;
            }

            $payload = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$payload) {
                // No stored payload for this id, so there is nothing to enqueue.
                continue;
            }

            $job = new \Dtc\QueueBundle\Redis\Job();
            $job->fromMessage($payload);
            $this->redis->zAdd($target, $job->getPriority(), $job->getId());
        }
    }
78
79
    /**
     * Looks for an already-queued job with the same CRC hash as $job and, if one
     * exists, merges $job into it instead of queuing a duplicate.
     *
     * Up to the first 1001 ids of the CRC list are inspected. Ids whose payload
     * can no longer be read are pruned from the list along the way.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job the job being saved in batch mode
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the existing job that absorbed $job, or null if none was found
     */
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $jobIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($jobIds)) {
            return null;
        }

        foreach ($jobIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list stores job ids (see the lPush in saveJob()), so the
                // stale entry must be removed by id, not by the job's cache key.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
104
105
    /**
     * Merges a new batch job into an already-stored job with the same CRC hash.
     *
     * The stored job's run time is pulled forward to the new job's run time when
     * the stored job is still in the future and later than the new job. The stored
     * job's priority is replaced when it compares lower than the new job's.
     * If nothing changes the stored job is returned as-is; otherwise it is
     * re-queued through addFoundJob().
     *
     * @param \Dtc\QueueBundle\Redis\Job $job              the job being saved
     * @param string                     $foundJobCacheKey cache key of the stored job
     * @param string                     $foundJobMessage  serialized payload of the stored job
     *
     * @return \Dtc\QueueBundle\Redis\Job|false the stored job, or false if it could not be re-queued
     */
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $when = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $foundJob = new \Dtc\QueueBundle\Redis\Job();
        $foundJob->fromMessage($foundJobMessage);
        $foundWhen = $foundJob->getWhenUs();

        $curtimeU = Util::getMicrotimeDecimal();

        // bccomp() only ever returns -1, 0 or 1, so "greater than" must be tested
        // with "> 0". The previous "> 1" could never be true, which meant the
        // stored job's run time was never moved earlier.
        $newFoundWhen = null;
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) > 0) {
            $newFoundWhen = $when;
        }

        // NOTE(review): calculatePriority() stores priorities inverted
        // (maxPriority - priority); confirm "<" is the intended direction here.
        $newFoundPriority = null;
        if ($foundJob->getPriority() < $job->getPriority()) {
            $newFoundPriority = $job->getPriority();
        }

        $adjust = false;
        if (null !== $newFoundWhen) {
            $foundJob->setWhenUs($newFoundWhen);
            $adjust = true;
        }
        if (null !== $newFoundPriority) {
            $foundJob->setPriority($newFoundPriority);
            $adjust = true;
        }

        if (!$adjust) {
            return $foundJob;
        }

        return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
142
143
    /**
     * Re-queues an adjusted job in whichever sorted set currently holds it.
     *
     * The "when" queue is tried first (scored by run time). If the job was not
     * there and priorities are enabled, the priority queue is tried next (scored
     * by priority).
     *
     * @param bool   $adjust           whether the job was changed and needs re-queuing
     * @param Job    $foundJob         the adjusted job
     * @param string $foundJobCacheKey cache key of the job's payload
     * @param string $crcCacheKey      key of the CRC list the job belongs to
     *
     * @return Job|false the re-queued job, or false if it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob(
            $adjust,
            $this->getWhenQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getWhenUs()
        );
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        if (null !== $this->maxPriority) {
            $fromPriorityQueue = $this->adjustJob(
                $adjust,
                $this->getPriorityQueueCacheKey(),
                $foundJob,
                $foundJobCacheKey,
                $crcCacheKey,
                $foundJob->getPriority()
            );
            if ($fromPriorityQueue) {
                return $fromPriorityQueue;
            }
        }

        return false;
    }
159
160
    /**
     * Re-inserts an adjusted job into one sorted set, if the job is in that set.
     *
     * @param bool   $adjust           whether the job was changed and needs re-queuing
     * @param string $queue            key of the sorted set to look in
     * @param Job    $foundJob         the adjusted job
     * @param string $foundJobCacheKey cache key of the job's payload (kept for signature compatibility)
     * @param string $crcCacheKey      key of the CRC list the job belongs to
     * @param mixed  $zScore           score to re-add the job with
     *
     * @return Job|false|null the job when re-queued, false when it has expired,
     *                        null when nothing was done (no adjustment or job not in $queue)
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
            if (!$this->insertJob($foundJob)) {
                // Job is expired. The CRC list stores job ids (see the lPush in
                // saveJob()), so the entry must be removed by id. Removing by the
                // cache key never matched and left the stale id behind.
                $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

                return false;
            }
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());

            return $foundJob;
        }

        return null;
    }
176
177
    /**
     * Saves a job, defaulting its run time to now and, for batch jobs, merging it
     * into an equivalent job that is already queued.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be a \Dtc\QueueBundle\Redis\Job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the saved (or pre-existing batch) job, null if the job is already expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis job
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof \Dtc\QueueBundle\Redis\Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // Jobs without an explicit run time are due immediately.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        // For batch jobs, reuse an already-queued job with the same CRC hash if there is one.
        $existing = true === $job->getBatch() ? $this->batchSave($job) : null;

        return $existing ? $existing : $this->saveJob($job);
    }
209
210 16
    /**
     * Stores the job payload, registers its id in the CRC list and queues it in
     * the "when" sorted set, scored by its run time.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null if it is already expired
     */
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        $stored = $this->insertJob($job);
        if (!$stored) {
            // insertJob() refuses jobs whose expiry has already passed.
            return null;
        }

        $id = $job->getId();
        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($this->getWhenQueueCacheKey(), $job->getWhenUs(), $id);

        return $job;
    }
228
229
    /**
     * Writes the job's serialized payload to Redis, with a TTL when the job has
     * an expiry time.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return bool false if the job is already expired (nothing is written), true otherwise
     */
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $payloadKey = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            $this->redis->set($payloadKey, $job->toMessage());

            return true;
        }

        // Seconds left until expiry; a non-positive value means it has already passed.
        $ttl = $expiresAt->getTimestamp() - time();
        if ($ttl <= 0) {
            return false;
        }

        $this->redis->setEx($payloadKey, $ttl, $job->toMessage());

        return true;
    }
251
252
    /**
     * Assigns a unique id to the job if it does not have one yet.
     *
     * The id is generated with uniqid(), prefixed by "<hostname>-<pid>".
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $idPrefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($idPrefix, true));
    }
263
264
    /**
     * Returns the priority in DESCENDING order (maxPriority minus the parent's
     * value), except when maxPriority is null, in which case the priority is 0.
     * A null result from the parent maps to maxPriority.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $base = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        // Redis priority should be in DESC order
        return null === $base ? $this->maxPriority : $this->maxPriority - $base;
    }
281
282
    /**
     * Checks that the job can be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but this queue has no maxPriority
     * @throws ClassNotSubclassException when the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $prioritiesEnabled = null !== $this->maxPriority;
        if (!$prioritiesEnabled && null !== $job->getPriority()) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!($job instanceof RetryableJob)) {
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
        }
    }
298
299 14
    /**
     * Rejects getJob() arguments this manager cannot honour: filtering by worker
     * or method name, and non-prioritized fetching when priorities are enabled.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $filterRequested = null !== $workerName || null !== $methodName;
        $unprioritizedRequested = null !== $this->maxPriority && true !== $prioritize;

        if ($filterRequested || $unprioritizedRequested) {
            throw new UnsupportedException('Unsupported');
        }
    }
305
306 2
    /**
     * Removes a job from whichever queue holds it and, if it was queued, deletes
     * its payload and its entry in the CRC list.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $id = $job->getId();

        // Try the priority queue first; only fall back to the "when" queue if the id was not there.
        $wasQueued = $this->redis->zRem($this->getPriorityQueueCacheKey(), $id)
            || $this->redis->zRem($this->getWhenQueueCacheKey(), $id);

        if (!$wasQueued) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($id)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $id);
    }
324
325
    /**
     * Pops the next runnable job and removes its payload and CRC-list entry.
     *
     * With priorities enabled, due jobs are first migrated from the "when" queue
     * to the priority queue and the next id is popped from the priority queue.
     * Otherwise the next due id is popped straight from the "when" queue.
     *
     * @param string|null $workerName must be null (filtering is unsupported)
     * @param string|null $methodName must be null (filtering is unsupported)
     * @param bool        $prioritize must be true when priorities are enabled
     * @param mixed       $runId      unused by this implementation
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null when no job is
     *                                         available or the popped job's payload no longer exists
     *
     * @throws UnsupportedException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        if (null !== $this->maxPriority) {
            // First migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $jobId = $this->redis->zPop($this->getPriorityQueueCacheKey());
        } else {
            $jobId = $this->redis->zPopByMaxScore($this->getWhenQueueCacheKey(), Util::getMicrotimeDecimal());
        }

        if (!$jobId) {
            return null;
        }

        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
        if (!is_string($jobMessage)) {
            // The payload is gone (insertJob() stores expiring jobs with setEx),
            // so there is nothing to deserialize. Previously the non-string
            // result was passed straight to fromMessage().
            return null;
        }

        $job = new \Dtc\QueueBundle\Redis\Job();
        $job->fromMessage($jobMessage);

        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $job->getId());
        $this->redis->del([$this->getJobCacheKey($job->getId())]);

        return $job;
    }
356
357
    /**
     * Returns the current Unix time in microseconds.
     *
     * @return int
     */
    protected function getCurTime()
    {
        return (int) (microtime(true) * 1000000);
    }
363
364 4 View Code Duplication
    /**
     * Resets a job for another attempt: status back to new, message and start
     * time cleared, retry count incremented, then saved again.
     *
     * NOTE(review): true is returned even when saveJob() reports the job as
     * expired (null); confirm callers do not need that distinction.
     *
     * @param RetryableJob $job must be a \Dtc\QueueBundle\Redis\Job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a Redis job
     */
    public function resetJob(RetryableJob $job)
    {
        if (!($job instanceof \Dtc\QueueBundle\Redis\Job)) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $nextRetryCount = $job->getRetries() + 1;

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($nextRetryCount);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());

        $this->saveJob($job);

        return true;
    }
378
379 6
    /**
     * Intentionally a no-op: this manager does not record any job history.
     *
     * @param RetryableJob $job
     * @param mixed        $retry
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        // Nothing to do.
    }
382
}
383