Completed
Pull Request — master (#30)
by Matthew
13:27 queued 10:46
created

JobManager::batchFoundJob()   C

Complexity

Conditions 14
Paths 112

Size

Total Lines 67
Code Lines 39

Duplication

Lines 22
Ratio 32.84 %

Code Coverage

Tests 0
CRAP Score 210

Importance

Changes 0
Metric Value
dl 22
loc 67
ccs 0
cts 51
cp 0
rs 5.5095
c 0
b 0
f 0
cc 14
eloc 39
nc 112
nop 3
crap 210

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\Job;
0 ignored issues
show
Bug introduced by
This use statement conflicts with another class in this namespace, Dtc\QueueBundle\Redis\Job.

Let’s assume that you have a directory layout like this:

.
|-- OtherDir
|   |-- Bar.php
|   `-- Foo.php
`-- SomeDir
    `-- Foo.php

and let’s assume the following content of Bar.php:

// Bar.php
namespace OtherDir;

use SomeDir\Foo; // This now conflicts with the class OtherDir\Foo

If both files OtherDir/Foo.php and SomeDir/Foo.php are loaded in the same runtime, you will see a PHP error such as the following:

PHP Fatal error:  Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php

However, as OtherDir/Foo.php does not necessarily have to be loaded and the error is only triggered if it is loaded before OtherDir/Bar.php, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias:

// Bar.php
namespace OtherDir;

use SomeDir\Foo as SomeDirFoo; // There is no conflict anymore.
Loading history...
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
 * Job manager that keeps jobs in Redis, using a "when" sorted set and a priority sorted set.
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    /** @var RedisInterface */
22
    protected $redis;
23
24
    /** @var string */
25
    protected $cacheKeyPrefix;
26
27
    protected $hostname;
28
    protected $pid;
29
30
    /**
     * Records the key prefix and the identity of the current process, then
     * delegates to the parent constructor.
     *
     * @param RunManager       $runManager       passed through to the parent constructor
     * @param JobTimingManager $jobTimingManager passed through to the parent constructor
     * @param mixed            $jobClass         passed through to the parent constructor
     * @param string           $cacheKeyPrefix   prefix used for every Redis key built by this manager
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
    {
        // Host name and pid are later combined into generated job ids.
        $this->pid = getmypid();
        $this->hostname = gethostname() ?: '';
        $this->cacheKeyPrefix = $cacheKeyPrefix;

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
38
39
    /**
     * Injects the Redis client used by every queue operation of this manager.
     *
     * @param RedisInterface $redis
     */
    public function setRedis(RedisInterface $redis)
    {
        $this->redis = $redis;
    }
43
44
    /**
     * Builds the Redis key under which the serialized job with the given id is stored.
     *
     * @param mixed $jobId
     *
     * @return string
     */
    protected function getJobCacheKey($jobId)
    {
        return "{$this->cacheKeyPrefix}_job_{$jobId}";
    }
48
49
    /**
     * Builds the Redis key of the list that holds the ids of jobs sharing a CRC hash.
     *
     * @param mixed $jobCrc
     *
     * @return string
     */
    protected function getJobCrcHashKey($jobCrc)
    {
        return "{$this->cacheKeyPrefix}_job_crc_{$jobCrc}";
    }
53
54
    /**
     * Builds the Redis key of the priority queue.
     *
     * @return string
     */
    protected function getPriorityQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_priority";
    }
58
59
    /**
     * Builds the Redis key of the "when" queue.
     *
     * @return string
     */
    protected function getWhenQueueCacheKey()
    {
        return "{$this->cacheKeyPrefix}_when";
    }
63
64
    /**
     * Drains the "when" queue into the priority queue.
     *
     * Every job id popped from the "when" queue with a score up to the current
     * microtime is added to the priority queue, scored by the stored job's priority.
     * Ids whose job payload can no longer be read are dropped.
     */
    protected function transferQueues()
    {
        $sourceKey = $this->getWhenQueueCacheKey();
        $targetKey = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while ($jobId = $this->redis->zPopByMaxScore($sourceKey, $now)) {
            $payload = $this->redis->get($this->getJobCacheKey($jobId));
            if (!$payload) {
                // Nothing stored for this id any more; skip it.
                continue;
            }

            $queuedJob = new \Dtc\QueueBundle\Redis\Job();
            $queuedJob->fromMessage($payload);
            $this->redis->zAdd($targetKey, $queuedJob->getPriority(), $queuedJob->getId());
        }
    }
79
80
    /**
     * Tries to fold a batch job into an already stored job with the same CRC hash.
     *
     * Reads the ids at positions 0-1000 of the CRC list for the job's hash and
     * hands each stored job to batchFoundJob(). Ids whose payload is gone are
     * removed from the CRC list.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the stored job that absorbed $job, or null if there is none
     */
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $jobIds = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($jobIds)) {
            return null;
        }

        foreach ($jobIds as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list stores job ids (see saveJob()), so the stale entry
                // has to be removed by id, not by cache key.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
105
106
    /**
     * Merges a new batch job into a previously stored job with the same CRC hash.
     *
     * The stored job's "when" is pulled earlier if it is still in the future and
     * later than the new job's "when"; its priority is raised to the new job's
     * priority if that is greater. If anything changed, the stored job is taken
     * out of the queue it sits in, re-saved and re-queued.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job              the job being saved
     * @param string                     $foundJobCacheKey cache key of the stored job (kept for signature compatibility)
     * @param string                     $foundJobMessage  serialized stored job
     *
     * @return \Dtc\QueueBundle\Redis\Job|false the stored job if it was left unchanged or re-queued;
     *                                          false if it had expired or was found in neither queue
     */
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $foundJob = new \Dtc\QueueBundle\Redis\Job();
        $foundJob->fromMessage($foundJobMessage);

        $adjusted = false;

        $when = $job->getWhenUs();
        $foundWhen = $foundJob->getWhenUs();
        // bccomp() only ever returns -1, 0 or 1, so "is later than" must be tested with "> 0".
        if (bccomp($foundWhen, Util::getMicrotimeDecimal()) > 0 && bccomp($foundWhen, $when) > 0) {
            $foundJob->setWhenUs($when);
            $adjusted = true;
        }

        $priority = $job->getPriority();
        if ($foundJob->getPriority() < $priority) {
            $foundJob->setPriority($priority);
            $adjusted = true;
        }

        if (!$adjusted) {
            return $foundJob;
        }

        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());

        $requeued = $this->requeueFoundJob($foundJob, $this->getWhenQueueCacheKey(), $foundJob->getWhenUs(), $crcCacheKey);
        if (null !== $requeued) {
            return $requeued;
        }

        if (null === $this->maxPriority) {
            // Without priorities there is no second queue to look in.
            return false;
        }

        $requeued = $this->requeueFoundJob($foundJob, $this->getPriorityQueueCacheKey(), $foundJob->getPriority(), $crcCacheKey);

        return null === $requeued ? false : $requeued;
    }

    /**
     * Moves an adjusted job back into the queue it was taken from.
     *
     * @param \Dtc\QueueBundle\Redis\Job $foundJob
     * @param string                     $queueKey    key of the queue to look in
     * @param mixed                      $score       score to re-add the job with
     * @param string                     $crcCacheKey key of the CRC list the job id is registered in
     *
     * @return \Dtc\QueueBundle\Redis\Job|false|null the job when re-queued, false when it had expired,
     *                                               null when it was not in the given queue
     */
    private function requeueFoundJob(\Dtc\QueueBundle\Redis\Job $foundJob, $queueKey, $score, $crcCacheKey)
    {
        $jobId = $foundJob->getId();
        if (!($this->redis->zRem($queueKey, $jobId) > 0)) {
            return null;
        }

        if (!$this->insertJob($foundJob)) {
            // Job is expired: drop its id from the CRC list (the list holds ids, not cache keys).
            $this->redis->lRem($crcCacheKey, 1, $jobId);

            return false;
        }

        // Queue members are job ids everywhere else (saveJob(), transferQueues(), zRem above).
        $this->redis->zAdd($queueKey, $score, $jobId);

        return $foundJob;
    }
173
174
    /**
     * Saves a job, folding it into an existing job when it is a batch job.
     *
     * Assigns an id if the job has none and defaults its "when" to the current
     * microtime. For batch jobs an already stored job with the same CRC hash is
     * returned instead of saving a new one, if batchSave() finds one.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job|null the saved (or pre-existing) job; null if saveJob() reports it expired
     *
     * @throws \InvalidArgumentException if $job is not a Redis job
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // A job without a schedule time is due immediately.
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            // Is there already a job registered under this CRC hash?
            $existingJob = $this->batchSave($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        return $this->saveJob($job);
    }
206
207
    /**
     * Stores the job payload and registers its id in the CRC list and the "when" queue.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null if it had already expired
     */
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        if (!$this->insertJob($job)) {
            // Job is expired; nothing was stored.
            return null;
        }

        $id = $job->getId();

        // Register the id under the job's CRC hash.
        $this->redis->lPush($this->getJobCrcHashKey($job->getCrcHash()), [$id]);

        // Queue the id, scored by its scheduled time.
        $this->redis->zAdd($this->getWhenQueueCacheKey(), $job->getWhenUs(), $id);

        return $job;
    }
225
226
    /**
     * Writes the serialized job under its cache key.
     *
     * Jobs with an expiry are stored with a time-to-live equal to the seconds
     * remaining until that expiry; jobs without one are stored without a TTL.
     *
     * @param \Dtc\QueueBundle\Redis\Job $job
     *
     * @return bool false if the job's expiry is not in the future (nothing is written), true otherwise
     */
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
    {
        $key = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            $this->redis->set($key, $job->toMessage());

            return true;
        }

        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            // Job is already expired.
            return false;
        }

        $this->redis->setEx($key, $secondsLeft, $job->toMessage());

        return true;
    }
243
244
    /**
     * Gives the job a unique id if it does not have one yet.
     *
     * The id is produced by uniqid() with more entropy, prefixed with this
     * process's host name and pid.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $prefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($prefix, true));
    }
255
256
    /**
     * Returns the priority in DESCENDING order, except if maxPriority is null, then priority is 0.
     *
     * @param mixed $priority priority as given by the caller; first normalized by the parent implementation
     *
     * @return int|mixed 0 without a maxPriority; maxPriority if the normalized priority is null;
     *                   otherwise maxPriority minus the normalized priority
     */
    protected function calculatePriority($priority)
    {
        $normalized = parent::calculatePriority($priority);

        if (null === $this->maxPriority) {
            return 0;
        }

        if (null === $normalized) {
            return $this->maxPriority;
        }

        // Redis priority should be in DESC order
        return $this->maxPriority - $normalized;
    }
273
274
    /**
     * Checks that the job can be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         if the job has a priority but no maxPriority is configured
     * @throws ClassNotSubclassException if the job is not a RetryableJob
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobHasPriority = null !== $job->getPriority();
        if ($jobHasPriority && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if ($job instanceof RetryableJob) {
            return;
        }

        throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
    }
290
291
    /**
     * Rejects getJob() arguments this manager cannot honour.
     *
     * Filtering by worker or method name is not supported, and neither is
     * turning prioritization off while a maxPriority is configured.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $filtersRequested = null !== $workerName || null !== $methodName;
        $prioritizationDisabled = null !== $this->maxPriority && true !== $prioritize;

        if ($filtersRequested || $prioritizationDisabled) {
            throw new UnsupportedException('Unsupported');
        }
    }
297
298
    /**
     * Removes a queued job.
     *
     * The job id is removed from the priority queue or, failing that, from the
     * "when" queue. Only if it was found in one of them are the stored payload
     * and the CRC list entry removed as well.
     *
     * The parameter type is written fully qualified, like the other methods of
     * this class, so it cannot be confused with Dtc\QueueBundle\Redis\Job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();

        $removedFromQueue = $this->redis->zRem($this->getPriorityQueueCacheKey(), $jobId)
            || $this->redis->zRem($this->getWhenQueueCacheKey(), $jobId);

        if (!$removedFromQueue) {
            return;
        }

        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }
316
317
    /**
     * Pops the next runnable job.
     *
     * With a maxPriority configured, due jobs are first moved from the "when"
     * queue to the priority queue and the next id is popped from the priority
     * queue; otherwise the next due id is popped from the "when" queue. The
     * popped job's payload and CRC list entry are removed before it is returned.
     *
     * @param string|null $workerName must be null (filtering is unsupported)
     * @param string|null $methodName must be null (filtering is unsupported)
     * @param bool        $prioritize must be true when a maxPriority is configured
     * @param mixed       $runId      not used by this implementation
     *
     * @return \Dtc\QueueBundle\Redis\Job|null the job, or null if nothing is due or the popped job's payload is gone
     *
     * @throws UnsupportedException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        if (null !== $this->maxPriority) {
            // First migrate any due jobs from the "when" queue to the priority queue.
            $this->transferQueues();
            $jobId = $this->redis->zPop($this->getPriorityQueueCacheKey());
        } else {
            $jobId = $this->redis->zPopByMaxScore($this->getWhenQueueCacheKey(), Util::getMicrotimeDecimal());
        }

        if (!$jobId) {
            return null;
        }

        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
        if (!$jobMessage) {
            // insertJob() stores expiring jobs with a TTL, so the payload may be gone
            // while the id is still queued; there is nothing to rebuild a job from.
            return null;
        }

        $job = new \Dtc\QueueBundle\Redis\Job();
        $job->fromMessage($jobMessage);

        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $job->getId());
        $this->redis->del([$this->getJobCacheKey($job->getId())]);

        return $job;
    }
348
349
    /**
     * Returns the current time as an integer number of microseconds.
     *
     * @return int
     */
    protected function getCurTime()
    {
        return intval(microtime(true) * 1000000);
    }
355
356 View Code Duplication
    /**
     * Puts a job back into the queue for another attempt.
     *
     * The job's status is set back to new, its message and start time are
     * cleared, its retry count is incremented and it is saved again.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException if $job is not a Redis job
     */
    public function resetJob(RetryableJob $job)
    {
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
        }

        $attempts = $job->getRetries() + 1;

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($attempts);
        $job->setUpdatedAt(new \DateTime());

        $this->saveJob($job);

        return true;
    }
370
371
    /**
     * Intentionally does nothing in this manager.
     *
     * @param RetryableJob $job
     * @param mixed        $retry
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        // No-op.
    }
374
}
375