Completed
Push — master ( b0a5e6...70c7f4 )
by Matthew
07:14
created

JobManager   C

Complexity

Total Complexity 70

Size/Duplication

Total Lines 425
Duplicated Lines 5.65 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 88.13%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 70
lcom 1
cbo 8
dl 24
loc 425
ccs 193
cts 219
cp 0.8813
rs 5.6163
c 1
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A transferQueues() 0 15 3
B batchSave() 0 25 6
B batchFoundJob() 0 24 4
A finishBatchFoundJob() 0 18 4
A addFoundJob() 0 16 4
A adjustJob() 0 16 4
B prioritySave() 0 25 5
A saveJob() 0 18 2
A insertJob() 0 17 3
A calculatePriority() 0 14 4
A validateSaveable() 10 10 4
A deleteJob() 0 11 1
A getJob() 0 21 3
A retrieveJob() 0 14 2
A getWaitingJobCount() 0 11 2
A resetJob() 14 14 2
A collateStatusResults() 0 15 3
B extractStatusResults() 0 17 5
A extractStatusHashResults() 0 14 4
A getStatus() 0 21 4
A retryableSaveHistory() 0 10 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
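As a rough illustration of that rule, the `instanceof Job` guard that appears in both prioritySave() and resetJob() in the listing below could live in one place. The sketch below is only one possible shape: the trait name `AssertsJobType`, the helper `assertRedisJob()`, and the `ExampleManager` class are hypothetical and not part of the bundle, and the empty `Job` class is a stand-in so the snippet runs on its own.

```php
<?php

// Hypothetical stand-in for the Redis Job class; only here so the sketch is self-contained.
class Job
{
}

// Hypothetical trait (not part of the bundle): a single home for the duplicated guard.
trait AssertsJobType
{
    private function assertRedisJob(object $job): Job
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        return $job;
    }
}

// Hypothetical manager showing two methods sharing the guard instead of copying it.
class ExampleManager
{
    use AssertsJobType;

    public function prioritySave(object $job): Job
    {
        return $this->assertRedisJob($job);
    }

    public function resetJob(object $job): bool
    {
        $this->assertRedisJob($job);

        return true;
    }
}

$manager = new ExampleManager();
$saved = $manager->prioritySave(new Job());

try {
    $manager->resetJob(new \stdClass());
    $rejected = false;
} catch (\InvalidArgumentException $e) {
    $rejected = true;
}
```

A trait is used here because the guard would have to be shared by classes that already extend a base class; a protected method on a common parent would work just as well if one exists.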

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefix or suffix. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.
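In this class, the methods collateStatusResults(), extractStatusResults(), extractStatusHashResults() and getStatus() share the "Status" name, which makes them a plausible candidate for extraction. The following is a minimal sketch of Extract Class combined with Extract Interface, not the bundle's actual design: `StatusProviderInterface` and `StatusCollector` are hypothetical names, the counters are passed in as a plain array instead of being read from Redis, and the status names `new` and `success` are assumed for illustration.

```php
<?php

// Hypothetical interface, extracted from how callers would use the status API.
interface StatusProviderInterface
{
    public function getStatus(): array;
}

// Hypothetical extracted class: owns only the status aggregation logic.
final class StatusCollector implements StatusProviderInterface
{
    /** @var array counters keyed as "worker,method,status" */
    private $counters;

    /** @var string[] every status name that should appear in the output */
    private $allStatuses;

    public function __construct(array $counters, array $allStatuses)
    {
        $this->counters = $counters;
        $this->allStatuses = $allStatuses;
    }

    public function getStatus(): array
    {
        $results = [];
        foreach ($this->counters as $key => $value) {
            list($workerName, $method, $status) = explode(',', $key);
            $resultHashKey = $workerName.'->'.$method.'()';
            if (!isset($results[$resultHashKey])) {
                // Start each worker/method pair with a zero count per known status.
                $results[$resultHashKey] = array_fill_keys($this->allStatuses, 0);
            }
            if (!isset($results[$resultHashKey][$status])) {
                $results[$resultHashKey][$status] = 0;
            }
            $results[$resultHashKey][$status] += $value;
        }

        return $results;
    }
}

// Assumed sample data; the keys mirror the "worker,method,status" format used by retryableSaveHistory().
$collector = new StatusCollector(
    ['fibonacci,fibonacci,success' => 3, 'fibonacci,fibonacci,new' => 1],
    ['new', 'success']
);
$status = $collector->getStatus();
```

With the aggregation behind an interface, JobManager could delegate getStatus() to the collector, and callers that only need status counts would depend on the narrower interface rather than on the whole manager.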

<?php

namespace Dtc\QueueBundle\Redis;

use Dtc\QueueBundle\Exception\ClassNotSubclassException;
use Dtc\QueueBundle\Exception\PriorityException;
use Dtc\QueueBundle\Exception\UnsupportedException;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Model\RetryableJob;
use Dtc\QueueBundle\Util\Util;

/**
 * For future implementation.
 */
class JobManager extends BaseJobManager
{
    protected function transferQueues()
    {
        // Drain due jobs from the WhenAt queue into the Priority queue
        $whenQueue = $this->getWhenQueueCacheKey();
        $priorityQueue = $this->getPriorityQueueCacheKey();
        $microtime = Util::getMicrotimeDecimal();
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (is_string($jobMessage)) {
                $job = new Job();
                $job->fromMessage($jobMessage);
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
            }
        }
    }

    /**
     * @param Job $job
     *
     * @return Job|null
     */
    protected function batchSave(Job $job)
    {
        $crcHash = $job->getCrcHash();
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (is_array($result)) {
            foreach ($result as $jobId) {
                $jobCacheKey1 = $this->getJobCacheKey($jobId);
                if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
                    // The job body is gone; the CRC list stores job IDs, so remove the ID itself
                    $this->redis->lRem($crcCacheKey, 1, $jobId);
                    continue;
                }

                // A stored job with the same CRC hash exists
                $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
                if ($foundJob) {
                    return $foundJob;
                }
            }
        }

        return null;
    }

    /**
     * @param string $foundJobCacheKey
     * @param bool   $foundJobMessage
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $when = $job->getWhenUs();
        $crcHash = $job->getCrcHash();
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);

        $foundJob = new Job();
        $foundJob->fromMessage($foundJobMessage);
        // Issue (Documentation): $foundJobMessage is of type boolean, but the function expects
        // a string. The type of the argument does not seem to be accepted by the method being
        // called. If PHP's automatic type juggling kicks in this might be fine; in other cases
        // it might be a bug. Here the value comes from redis->get() in batchSave(), so the
        // @param annotation above is the likely culprit. Where a cast really is intended,
        // an explicit one is suggested, as in the following example:
        //
        //     function acceptsInteger($int) { }
        //
        //     $x = '123'; // string "123"
        //
        //     // Instead of
        //     acceptsInteger($x);
        //
        //     // we recommend to use
        //     acceptsInteger((int) $x);
        $foundWhen = $foundJob->getWhenUs();

        // Compare the microsecond timestamps with bcmath: pull the found job earlier only if
        // it is still in the future and scheduled later than the new job
        $curtimeU = Util::getMicrotimeDecimal();
        $newFoundWhen = null;
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) {
            $newFoundWhen = $when;
        }
        $foundPriority = $foundJob->getPriority();
        $newFoundPriority = null;
        if ($foundPriority > $job->getPriority()) {
            $newFoundPriority = $job->getPriority();
        }

        return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
    }

    /**
     * @param string   $crcCacheKey
     * @param int|null $newFoundPriority
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        // Now how do we adjust this job's priority or time?
        $adjust = false;
        if (isset($newFoundWhen)) {
            $foundJob->setWhenUs($newFoundWhen);
            $adjust = true;
        }
        if (isset($newFoundPriority)) {
            $foundJob->setPriority($newFoundPriority);
            $adjust = true;
        }
        if (!$adjust) {
            return $foundJob;
        }

        return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }

    /**
     * @param bool $adjust
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $whenQueue = $this->getWhenQueueCacheKey();
        $result = $this->adjustJob($adjust, $whenQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());
        if (null !== $result) {
            return $result;
        }
        if (null === $this->maxPriority) {
            return false;
        }

        $priorityQueue = $this->getPriorityQueueCacheKey();
        $result = $this->adjustJob($adjust, $priorityQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());

        return $result ?: false;
    }

    /**
     * @param string $queue
     * @param bool   $adjust
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
            if (!$this->insertJob($foundJob)) {
                // Job is expired; the CRC list stores job IDs, so remove the ID rather than the cache key
                $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

                return false;
            }
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());

            return $foundJob;
        }

        return null;
    }

    /**
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return Job|null
     *
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // Add to whenAt or priority queue? (possible optimization)
        $whenUs = $job->getWhenUs();
        if (!$whenUs) {
            $whenUs = Util::getMicrotimeDecimal();
            $job->setWhenUs($whenUs);
        }

        if (true === $job->getBatch()) {
            // Is there already a job with this CRC hash?
            if ($oldJob = $this->batchSave($job)) {
                return $oldJob;
            }
        }

        return $this->saveJob($job);
    }

    protected function saveJob(Job $job)
    {
        $whenQueue = $this->getWhenQueueCacheKey();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        // Save Job
        if (!$this->insertJob($job)) {
            // job is expired
            return null;
        }
        $jobId = $job->getId();
        $when = $job->getWhenUs();
        // Add Job to CRC list
        $this->redis->lPush($crcCacheKey, [$jobId]);

        $this->redis->zAdd($whenQueue, $when, $jobId);

        return $job;
    }

    /**
     * @param Job $job
     *
     * @return bool false if the job is already expired, true otherwise
     */
    protected function insertJob(Job $job)
    {
        // Save Job
        $jobCacheKey = $this->getJobCacheKey($job->getId());
        if ($expiresAt = $job->getExpiresAt()) {
            $expiresAtTime = $expiresAt->getTimestamp() - time();
            if ($expiresAtTime <= 0) {
                return false; // job is already expired
            }
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());

            return true;
        }
        $this->redis->set($jobCacheKey, $job->toMessage());

        return true;
    }

    /**
     * Returns the priority in DESCENDING order, except if maxPriority is null, then priority is 0.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);
        if (null === $priority) {
            return null === $this->maxPriority ? 0 : $this->maxPriority;
        }

        if (null === $this->maxPriority) {
            return 0;
        }

        // Redis priority should be in DESC order
        return $this->maxPriority - $priority;
    }

    /**
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        // Issue (Duplication): This method seems to be duplicated in your project.
        // Duplicated code is one of the most pungent code smells. If you need to duplicate the
        // same code in three or more different places, we strongly encourage you to look into
        // extracting the code into a single class or operation. You can also find more detailed
        // suggestions in the "Code" section of your repository.
        if (null !== $job->getPriority() && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!$job instanceof RetryableJob) {
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
        }
    }

    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobId = $job->getId();
        $priorityQueue = $this->getPriorityQueueCacheKey();
        $whenQueue = $this->getWhenQueueCacheKey();

        $this->redis->zRem($priorityQueue, $jobId);
        $this->redis->zRem($whenQueue, $jobId);
        $this->redis->del([$this->getJobCacheKey($jobId)]);
        $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
    }

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId
     *
     * @throws UnsupportedException
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        if (null !== $this->maxPriority) {
            // First, migrate any due jobs from the When queue to the Priority queue
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
            $jobId = $this->redis->zPop($queue);
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
        }

        if ($jobId) {
            return $this->retrieveJob($jobId);
        }

        return null;
    }

    protected function retrieveJob($jobId)
    {
        $job = null;
        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
        if (is_string($jobMessage)) {
            $job = new Job();
            $job->fromMessage($jobMessage);
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
        }

        return $job;
    }

    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $microtime = Util::getMicrotimeDecimal();
        $count = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $microtime);

        if (null !== $this->maxPriority) {
            $count += $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
        }

        return $count;
    }

    public function resetJob(RetryableJob $job)
    {
        // Issue (Duplication): This method seems to be duplicated in your project.
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setRetries($job->getRetries() + 1);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
        $this->saveJob($job);

        return true;
    }

    private function collateStatusResults(array &$results, $cacheKey)
    {
        $cursor = null;
        while ($jobs = $this->redis->zScan($cacheKey, $cursor, '', 100)) {
            $jobs = $this->redis->mget(array_map(function ($item) {
                return $this->getJobCacheKey($item);
            }, array_keys($jobs)));
            $this->extractStatusResults($jobs, $results);
            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }

    private function extractStatusResults(array $jobs, array &$results)
    {
        foreach ($jobs as $jobMessage) {
            if (is_string($jobMessage)) {
                $job = new Job();
                $job->fromMessage($jobMessage);
                $resultHashKey = $job->getWorkerName().'->'.$job->getMethod().'()';
                if (!isset($results[$resultHashKey][BaseJob::STATUS_NEW])) {
                    $results[$resultHashKey] = static::getAllStatuses();
                }
                if (!isset($results[$resultHashKey][BaseJob::STATUS_NEW])) {
                    $results[$resultHashKey][BaseJob::STATUS_NEW] = 0;
                }
                ++$results[$resultHashKey][BaseJob::STATUS_NEW];
            }
        }
    }

    private function extractStatusHashResults(array $hResults, array &$results)
    {
        foreach ($hResults as $key => $value) {
            list($workerName, $method, $status) = explode(',', $key);
            $resultHashKey = $workerName.'->'.$method.'()';
            if (!isset($results[$resultHashKey])) {
                $results[$resultHashKey] = static::getAllStatuses();
            }
            if (!isset($results[$resultHashKey][$status])) {
                $results[$resultHashKey][$status] = 0;
            }
            $results[$resultHashKey][$status] += $value;
        }
    }

    public function getStatus()
    {
        $whenQueueCacheKey = $this->getWhenQueueCacheKey();
        $priorityQueueCacheKey = $this->getPriorityQueueCacheKey();
        $results = [];
        $this->collateStatusResults($results, $whenQueueCacheKey);
        if (null !== $this->maxPriority) {
            $this->collateStatusResults($results, $priorityQueueCacheKey);
        }

        $cacheKey = $this->getStatusCacheKey();
        $cursor = null;
        while ($hResults = $this->redis->hScan($cacheKey, $cursor, '', 100)) {
            $this->extractStatusHashResults($hResults, $results);
            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }

    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $cacheKey = $this->getStatusCacheKey();
        $hashKey = $job->getWorkerName();
        $hashKey .= ',';
        $hashKey .= $job->getMethod();
        $hashKey .= ',';
        $hashKey .= $job->getStatus();
        $this->redis->hIncrBy($cacheKey, $hashKey, 1);
    }
}