Completed
Pull Request — master (#30)
by Matthew
18:57 queued 16:20
created

JobManager::batchFoundJob()   C

Complexity

Conditions 14
Paths 112

Size

Total Lines 67
Code Lines 39

Duplication

Lines 22
Ratio 32.84 %

Code Coverage

Tests 0
CRAP Score 210

Importance

Changes 0
Metric Value
dl 22
loc 67
ccs 0
cts 39
cp 0
rs 5.5095
c 0
b 0
f 0
cc 14
eloc 39
nc 112
nop 3
crap 210

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\Job;
0 ignored issues
show
Bug introduced by
This use statement conflicts with another class in this namespace, Dtc\QueueBundle\Redis\Job.

Let’s assume that you have a directory layout like this:

.
|-- OtherDir
|   |-- Bar.php
|   `-- Foo.php
`-- SomeDir
    `-- Foo.php

and let’s assume the following content of Bar.php:

// Bar.php
namespace OtherDir;

use SomeDir\Foo; // This now conflicts with the class OtherDir\Foo

If both files OtherDir/Foo.php and SomeDir/Foo.php are loaded in the same runtime, you will see a PHP error such as the following:

PHP Fatal error:  Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php

However, as OtherDir/Foo.php does not necessarily have to be loaded and the error is only triggered if it is loaded before OtherDir/Bar.php, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias:

// Bar.php
namespace OtherDir;

use SomeDir\Foo as SomeDirFoo; // There is no conflict anymore.
Loading history...
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
 * Job manager that keeps jobs and their scheduling queues in Redis.
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    /** @var RedisInterface */
22
    protected $redis;
23
24
    /** @var string */
25
    protected $cacheKeyPrefix;
26
27
    protected $hostname;
28
    protected $pid;
29
30 1
    /**
     * Records the cache key prefix plus this process's hostname and pid, then
     * hands the remaining arguments to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param string           $jobClass
     * @param string           $cacheKeyPrefix   prefix prepended to every Redis key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        // gethostname() may return false; fall back to an empty string in that case.
        $detectedHost = gethostname();
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
38
39
    /**
     * Injects the Redis client used for all queue and job storage calls.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
43
44 8
    /**
     * Builds the Redis key under which a job's serialized message is stored.
     *
     * @param string $jobId
     *
     * @return string
     */
    protected function getJobCacheKey($jobId)
    {
        return sprintf('%s_job_%s', $this->cacheKeyPrefix, $jobId);
    }
48
49 8
    /**
     * Builds the Redis key of the list that groups job ids by CRC hash.
     *
     * @param string $jobCrc
     *
     * @return string
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return sprintf('%s_job_crc_%s', $this->cacheKeyPrefix, $jobCrc);
    }
53
54 6
    /**
     * Builds the Redis key of the sorted set scored by job priority.
     *
     * @return string
     */
    protected function getPriorityQueueCacheKey()
    {
        $suffix = '_priority';

        return $this->cacheKeyPrefix.$suffix;
    }
58
59 8
    /**
     * Builds the Redis key of the sorted set scored by a job's scheduled time.
     *
     * @return string
     */
    protected function getWhenQueueCacheKey()
    {
        $suffix = '_when';

        return $this->cacheKeyPrefix.$suffix;
    }
63
64 6
    /**
     * Drains every job that is due from the "when" queue into the priority queue.
     *
     * Ids are popped while their score is at most the current microtime. An id
     * whose stored message can no longer be read is simply dropped.
     */
    protected function transferQueues()
    {
        $dueQueueKey = $this->getWhenQueueCacheKey();
        $readyQueueKey = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while ($dueJobId = $this->redis->zPopByMaxScore($dueQueueKey, $now)) {
            $message = $this->redis->get($this->getJobCacheKey($dueJobId));
            if (!$message) {
                // Nothing stored for this id any more; move on to the next due id.
                continue;
            }

            $dueJob = new \Dtc\QueueBundle\Redis\Job();
            $dueJob->fromMessage($message);
            $this->redis->zAdd($readyQueueKey, $dueJob->getPriority(), $dueJob->getId());
        }
    }
79
80
    /**
     * Looks for an already-queued job with the same CRC hash that can absorb $job.
     *
     * Up to the first 1001 ids of the CRC list are inspected. Ids whose job
     * record can no longer be read are removed from the list along the way.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job the job being saved in batch mode
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the queued job that absorbed $job, or null when none did
     */
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $jobIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($jobIds)) {
            return null;
        }

        foreach ($jobIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list holds job ids (see saveJob), so the stale entry
                // has to be removed by id, not by its cache key.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
105
106
    /**
     * Merges a new batch job into an already-queued job that has the same CRC hash.
     *
     * The queued job is rebuilt from $foundJobMessage. When the queued job is
     * still scheduled in the future and the new job is due earlier, or when the
     * new job carries a larger priority value, the queued job is updated and
     * re-inserted into whichever sorted set currently holds its id.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job              the job being saved
     * @param string                     $foundJobCacheKey cache key of the queued job; not read here, kept so existing callers keep working
     * @param string                     $foundJobMessage  serialized form of the queued job
     *
     * @return \Dtc\QueueBundle\Redis\Job|false the queued job when it absorbs $job, false otherwise
     */
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $when = $job->getWhenUs();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $foundJob = new \Dtc\QueueBundle\Redis\Job();
        $foundJob->fromMessage($foundJobMessage);
        $foundWhen = $foundJob->getWhenUs();

        $adjust = false;

        // bccomp() only ever returns -1, 0 or 1, so "is later than" must be
        // tested with > 0; the previous "> 1" could never be true.
        $curtimeU = Util::getMicrotimeDecimal();
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) > 0) {
            $foundJob->setWhenUs($when);
            $adjust = true;
        }

        if ($foundJob->getPriority() < $job->getPriority()) {
            $foundJob->setPriority($job->getPriority());
            $adjust = true;
        }

        if (!$adjust) {
            // The queued job already covers the new one as it is.
            return $foundJob;
        }

        // The job id lives in exactly one of the two sorted sets; zRem tells us which.
        $whenQueue = $this->getWhenQueueCacheKey();
        if ($this->redis->zRem($whenQueue, $foundJob->getId()) > 0) {
            return $this->requeueFoundJob($foundJob, $whenQueue, $foundJob->getWhenUs(), $crcCacheKey);
        }

        if (null === $this->maxPriority) {
            return false;
        }

        $priorityQueue = $this->getPriorityQueueCacheKey();
        if ($this->redis->zRem($priorityQueue, $foundJob->getId()) > 0) {
            return $this->requeueFoundJob($foundJob, $priorityQueue, $foundJob->getPriority(), $crcCacheKey);
        }

        return false;
    }

    /**
     * Re-stores an adjusted job and puts its id back into the given sorted set.
     *
     * @param \Dtc\QueueBundle\Redis\Job $foundJob    the adjusted job, already removed from $queueKey
     * @param string                     $queueKey    sorted set the id is added back to
     * @param mixed                      $score       score for the sorted-set entry
     * @param string                     $crcCacheKey CRC list key, cleaned up when the job turns out to be expired
     *
     * @return \Dtc\QueueBundle\Redis\Job|false false when insertJob() reports the job as expired
     */
    private function requeueFoundJob(\Dtc\QueueBundle\Redis\Job $foundJob, $queueKey, $score, $crcCacheKey)
    {
        if (!$this->insertJob($foundJob)) {
            // Job is expired: drop its id from the CRC list (the list stores ids, not cache keys).
            $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

            return false;
        }

        // Queue members are job ids everywhere else in this class, so add the id
        // back rather than the serialized message.
        $this->redis->zAdd($queueKey, $score, $foundJob->getId());

        return $foundJob;
    }
173
174
    /**
     * Saves a job, defaulting its scheduled time to "now" and, for batch jobs,
     * first trying to fold it into an already-queued job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the stored (or absorbing) job; null when saveJob() reports the job as expired
     *
     * @throws \InvalidArgumentException when $job is not a Redis job
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without a scheduled time is scheduled for right now.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true !== $job->getBatch()) {
            return $this->saveJob($job);
        }

        // Batch job: reuse a queued job with the same CRC hash when one accepts it.
        $existingJob = $this->batchSave($job);
        if ($existingJob) {
            return $existingJob;
        }

        return $this->saveJob($job);
    }
206
207 8
    /**
     * Stores the job record, registers its id in the CRC list and schedules it
     * in the "when" queue.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return \Dtc\QueueBundle\Redis\Job|null null when insertJob() reports the job as expired
     */
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $scheduleKey = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        if (!$this->insertJob($job)) {
            // Expired jobs are not queued at all.
            return null;
        }

        $id = $job->getId();
        $scheduledAt = $job->getWhenUs();

        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($scheduleKey, $scheduledAt, $id);

        return $job;
    }
225
226 8
    /**
     * Writes the serialized job under its cache key.
     *
     * A job with an expiry is stored with a time-to-live equal to the seconds
     * remaining until that expiry; a job without one is stored without a TTL.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return bool false when the job's expiry is already in the past (nothing is written), true otherwise
     */
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $key = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            $this->redis->set($key, $job->toMessage());

            return true;
        }

        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            // Already expired.
            return false;
        }

        $this->redis->setEx($key, $secondsLeft, $job->toMessage());

        return true;
    }
243
244
    /**
     * Gives the job a unique id when it does not have one yet.
     *
     * The id is produced by uniqid() with "<hostname>-<pid>" as prefix and
     * extra entropy enabled.
     *
     * NOTE(review): static analysis flagged this logic as duplicated elsewhere
     * in the project; consider sharing a single implementation.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $prefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($prefix, true));
    }
255
256
    /**
     * Returns the priority in DESCENDING order, except if maxPriority is null, then the priority is 0.
     *
     * The parent's result is subtracted from maxPriority; a null parent result
     * maps to maxPriority itself.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $basePriority = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        // Redis priority should be in DESC order
        return null === $basePriority ? $this->maxPriority : $this->maxPriority - $basePriority;
    }
273
274
    /**
     * Rejects jobs this manager cannot store.
     *
     * NOTE(review): static analysis flagged this method as duplicated elsewhere
     * in the project.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but maxPriority is null
     * @throws ClassNotSubclassException when the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $hasPriority = null !== $job->getPriority();
        if ($hasPriority && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if ($job instanceof RetryableJob) {
            return;
        }

        throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
    }
290
291 7
    /**
     * Rejects getJob() argument combinations this manager does not support:
     * any worker or method name, and a non-true $prioritize while maxPriority
     * is set.
     *
     * NOTE(review): static analysis flagged this check as duplicated elsewhere
     * in the project.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $hasFilter = null !== $workerName || null !== $methodName;
        $skipsPriority = null !== $this->maxPriority && true !== $prioritize;

        if ($hasFilter || $skipsPriority) {
            throw new UnsupportedException('Unsupported');
        }
    }
297
298 1
    /**
     * Removes a job from whichever queue holds it, then deletes its stored
     * record and its entry in the CRC list.
     *
     * Nothing else is touched when the id is found in neither queue.
     *
     * @param Job $job
     */
    public function deleteJob(Job $job)
    {
        $jobId = $job->getId();

        // Try the priority queue first; only fall back to the "when" queue if that removed nothing.
        $removed = $this->redis->zRem($this->getPriorityQueueCacheKey(), $jobId)
            || $this->redis->zRem($this->getWhenQueueCacheKey(), $jobId);

        if (!$removed) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
316
317
    /**
     * Pops the next runnable job and removes its stored record.
     *
     * With priorities enabled, due jobs are first moved from the "when" queue
     * to the priority queue and the next id is popped from there; otherwise the
     * next due id is popped straight from the "when" queue. Ids whose job
     * record can no longer be read are skipped.
     *
     * @param string|null $workerName must be null (see verifyGetJobArgs)
     * @param string|null $methodName must be null (see verifyGetJobArgs)
     * @param bool        $prioritize
     * @param mixed       $runId      not used by this implementation
     *
     * @return \Dtc\QueueBundle\Redis\Job|null null when no job is available
     *
     * @throws UnsupportedException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        $usePriorityQueue = null !== $this->maxPriority;
        $microtime = null;
        if ($usePriorityQueue) {
            // First thing migrate any jobs from When queue to Priority queue
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
        }

        while (true) {
            $jobId = $usePriorityQueue
                ? $this->redis->zPop($queue)
                : $this->redis->zPopByMaxScore($queue, $microtime);
            if (!$jobId) {
                return null;
            }

            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$jobMessage) {
                // The record is gone (insertJob stores expiring jobs with a TTL),
                // so there is nothing to rebuild; try the next queued id instead
                // of handing an empty message to fromMessage().
                continue;
            }

            $job = new \Dtc\QueueBundle\Redis\Job();
            $job->fromMessage($jobMessage);
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);

            return $job;
        }
    }
348
349
    /**
     * Returns the current Unix time in microseconds as an integer.
     *
     * @return int
     */
    protected function getCurTime()
    {
        return (int) (microtime(true) * 1000000);
    }
355
356 2 View Code Duplication
    /**
     * Puts a job back into the queue as new, with its retry counter bumped.
     *
     * NOTE(review): the result of saveJob() is ignored, so true is returned even
     * when the job was not stored because it had expired — confirm that callers
     * expect this. Static analysis also flagged this method as duplicated
     * elsewhere in the project.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a Redis job
     */
    public function resetJob(RetryableJob $job)
    {
        $isRedisJob = $job instanceof \Dtc\QueueBundle\Redis\Job;
        if (!$isRedisJob) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        // Clear the state left over from the previous run.
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);

        $nextRetryCount = $job->getRetries() + 1;
        $job->setRetries($nextRetryCount);
        $job->setUpdatedAt(new \DateTime());

        $this->saveJob($job);

        return true;
    }
370
371 3
    /**
     * Intentionally does nothing: this manager records no job history.
     *
     * @param RetryableJob $job
     * @param bool         $retry
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        // No-op.
    }
374
}
375