Passed
Push — master ( 4d8734...991786 )
by Matthew
09:35 queued 06:44
created

JobManager   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 344
Duplicated Lines 0 %

Test Coverage

Coverage 88.89%

Importance

Changes 0
Metric Value
eloc 150
dl 0
loc 344
ccs 144
cts 162
cp 0.8889
rs 7.92
c 0
b 0
f 0
wmc 51

15 Methods

Rating   Name   Duplication   Size   Complexity  
A addFoundJob() 0 15 4
A transferQueues() 0 12 3
A finishBatchFoundJob() 0 17 4
A getWaitingJobCount() 0 10 2
A calculatePriority() 0 13 4
A batchSave() 0 26 6
A getJob() 0 20 3
A saveJob() 0 17 2
A adjustJob() 0 15 4
A getStatus() 0 20 4
A retrieveJob() 0 13 2
A retryableSaveHistory() 0 9 1
A prioritySave() 0 24 5
A batchFoundJob() 0 23 4
A insertJob() 0 16 3

How to fix   Complexity   

Complex Class

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\SaveableTrait;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
use Dtc\QueueBundle\Util\Util;
11
12
/**
13
 * For future implementation.
14
 */
15
class JobManager extends BaseJobManager
16
{
17
    use StatusTrait;
18
    use SaveableTrait;
19
20
    /**
21
     * There's a bit of danger here if there are more jobs being inserted than can be efficiently drained
22
     *   What could happen is that this infinitely loops...
23
     */
24 9
    protected function transferQueues()
25
    {
26
        // Drains from WhenAt queue into Prioirty Queue
27 9
        $whenQueue = $this->getWhenQueueCacheKey();
28 9
        $priorityQueue = $this->getPriorityQueueCacheKey();
29 9
        $microtime = Util::getMicrotimeDecimal();
30 9
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
31 8
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
32 8
            if (is_string($jobMessage)) {
33 8
                $job = new Job();
34 8
                $job->fromMessage($jobMessage);
35 8
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
36
            }
37
        }
38 9
    }
39
40
    /**
41
     * @param Job $job
42
     *
43
     * @return Job|null
44
     */
45 3
    protected function batchSave(Job $job)
46
    {
47 3
        $crcHash = $job->getCrcHash();
48 3
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
49 3
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
50 3
        if (!is_array($result)) {
51
            return null;
52
        }
53
54 3
        foreach ($result as $jobId) {
55 3
            $jobCacheKey1 = $this->getJobCacheKey($jobId);
56 3
            if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
57
                $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1);
58
                continue;
59
            }
60
61
            /// There is one?
62 3
            if ($foundJobMessage) {
63 3
                $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
64 3
                if ($foundJob) {
65 3
                    return $foundJob;
66
                }
67
            }
68
        }
69
70
        return null;
71
    }
72
73
    /**
74
     * @param string $foundJobCacheKey
75
     * @param string $foundJobMessage
76
     */
77 3
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
78
    {
79 3
        $when = $job->getWhenUs();
80 3
        $crcHash = $job->getCrcHash();
81 3
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
82
83 3
        $foundJob = new Job();
84 3
        $foundJob->fromMessage($foundJobMessage);
85 3
        $foundWhen = $foundJob->getWhenUs();
86
87
        // Fix this using bcmath
88 3
        $curtimeU = Util::getMicrotimeDecimal();
89 3
        $newFoundWhen = null;
90 3
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) {
91 3
            $newFoundWhen = $when;
92
        }
93 3
        $foundPriority = $foundJob->getPriority();
94 3
        $newFoundPriority = null;
95 3
        if ($foundPriority > $job->getPriority()) {
96 1
            $newFoundPriority = $job->getPriority();
97
        }
98
99 3
        return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
100
    }
101
102
    /**
103
     * @param string   $crcCacheKey
104
     * @param int|null $newFoundPriority
105
     */
106 3
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
107
    {
108
        // Now how do we adjust this job's priority or time?
109 3
        $adjust = false;
110 3
        if (isset($newFoundWhen)) {
111 3
            $foundJob->setWhenUs($newFoundWhen);
112 3
            $adjust = true;
113
        }
114 3
        if (isset($newFoundPriority)) {
115 1
            $foundJob->setPriority($newFoundPriority);
116 1
            $adjust = true;
117
        }
118 3
        if (!$adjust) {
119 3
            return $foundJob;
120
        }
121
122 3
        return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey);
123
    }
124
125
    /**
126
     * @param bool $adjust
127
     */
128 3
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
129
    {
130 3
        $whenQueue = $this->getWhenQueueCacheKey();
131 3
        $result = $this->adjustJob($adjust, $whenQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());
132 3
        if (null !== $result) {
133 3
            return $result;
134
        }
135
        if (null === $this->maxPriority) {
136
            return false;
137
        }
138
139
        $priorityQueue = $this->getPriorityQueueCacheKey();
140
        $result = $this->adjustJob($adjust, $priorityQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());
141
142
        return $result ?: false;
143
    }
144
145
    /**
146
     * @param string $queue
147
     * @param bool   $adjust
148
     */
149 3
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
150
    {
151 3
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
152 3
            if (!$this->insertJob($foundJob)) {
153
                // Job is expired
154
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
155
156
                return false;
157
            }
158 3
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());
159
160 3
            return $foundJob;
161
        }
162
163
        return null;
164
    }
165
166
    /**
167
     * @param \Dtc\QueueBundle\Model\Job $job
168
     *
169
     * @return Job|null
170
     *
171
     * @throws ClassNotSubclassException
172
     * @throws PriorityException
173
     */
174 28
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
175
    {
176 28
        if (!$job instanceof Job) {
177
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
178
        }
179
180 28
        $this->validateSaveable($job);
181 28
        $this->setJobId($job);
182
183
        // Add to whenAt or priority queue?  /// optimizaiton...
184 28
        $whenUs = $job->getWhenUs();
185 28
        if (!$whenUs) {
186
            $whenUs = Util::getMicrotimeDecimal();
187
            $job->setWhenUs($whenUs);
188
        }
189
190 28
        if (true === $job->getBatch()) {
191
            // is there a CRC Hash already for this job
192 3
            if ($oldJob = $this->batchSave($job)) {
193 3
                return $oldJob;
194
            }
195
        }
196
197 28
        return $this->saveJob($job);
198
    }
199
200
    /**
201
     * @param Job $job
202
     *
203
     * @return Job|null
204
     */
205 28
    protected function saveJob(Job $job)
206
    {
207 28
        $whenQueue = $this->getWhenQueueCacheKey();
208 28
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
209
        // Save Job
210 28
        if (!$this->insertJob($job)) {
211
            // job is expired
212 3
            return null;
213
        }
214 28
        $jobId = $job->getId();
215 28
        $when = $job->getWhenUs();
216
        // Add Job to CRC list
217 28
        $this->redis->lPush($crcCacheKey, [$jobId]);
218
219 28
        $this->redis->zAdd($whenQueue, $when, $jobId);
220
221 28
        return $job;
222
    }
223
224
    /**
225
     * @param Job $job
226
     *
227
     * @return bool false if the job is already expired, true otherwise
228
     */
229 28
    protected function insertJob(Job $job)
230
    {
231
        // Save Job
232 28
        $jobCacheKey = $this->getJobCacheKey($job->getId());
233 28
        if ($expiresAt = $job->getExpiresAt()) {
234 3
            $expiresAtTime = $expiresAt->getTimestamp() - time();
235 3
            if ($expiresAtTime <= 0) {
236 3
                return false; /// ??? job is already expired
237
            }
238
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());
239
240
            return true;
241
        }
242 28
        $this->redis->set($jobCacheKey, $job->toMessage());
243
244 28
        return true;
245
    }
246
247
    /**
248
     * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0.
249
     *
250
     * @param int|null $priority
251
     *
252
     * @return int
253
     */
254 28
    protected function calculatePriority($priority)
255
    {
256 28
        $priority = parent::calculatePriority($priority);
257 28
        if (null === $priority) {
258 27
            return null === $this->maxPriority ? null : $this->maxPriority;
259
        }
260
261 4
        if (null === $this->maxPriority) {
262
            return null;
263
        }
264
265
        // Redis priority should be in DESC order
266 4
        return $this->maxPriority - $priority;
267
    }
268
269
    /**
270
     * @param string|null $workerName
271
     * @param string|null $methodName
272
     * @param bool        $prioritize
273
     * @param mixed       $runId
274
     *
275
     * @throws UnsupportedException
276
     *
277
     * @return Job|null
278
     */
279 26
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
280
    {
281
        // First thing migrate any jobs from When queue to Prioirty queue
282
283 26
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
284 23
        if (null !== $this->maxPriority) {
285 9
            $this->transferQueues();
286 9
            $queue = $this->getPriorityQueueCacheKey();
287 9
            $jobId = $this->redis->zPop($queue);
288
        } else {
289 14
            $queue = $this->getWhenQueueCacheKey();
290 14
            $microtime = Util::getMicrotimeDecimal();
291 14
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
292
        }
293
294 23
        if ($jobId) {
295 20
            return $this->retrieveJob($jobId);
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->retrieveJob($jobId) targeting Dtc\QueueBundle\Redis\JobManager::retrieveJob() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
296
        }
297
298 22
        return null;
299
    }
300
301 20
    protected function retrieveJob($jobId)
302
    {
303 20
        $job = null;
304 20
        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
305 20
        if (is_string($jobMessage)) {
0 ignored issues
show
introduced by
The condition is_string($jobMessage) is always false.
Loading history...
306 20
            $job = new Job();
307 20
            $job->fromMessage($jobMessage);
308 20
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
309 20
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
310 20
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
311
        }
312
313 20
        return $job;
314
    }
315
316 3
    public function getWaitingJobCount($workerName = null, $methodName = null)
317
    {
318 3
        $microtime = Util::getMicrotimeDecimal();
319 3
        $count = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $microtime);
320
321 3
        if (null !== $this->maxPriority) {
322 1
            $count += $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
323
        }
324
325 3
        return $count;
326
    }
327
328 3
    public function getStatus()
329
    {
330 3
        $whenQueueCacheKey = $this->getWhenQueueCacheKey();
331 3
        $priorityQueueCacheKey = $this->getPriorityQueueCacheKey();
332 3
        $results = [];
333 3
        $this->collateStatusResults($results, $whenQueueCacheKey);
334 3
        if (null !== $this->maxPriority) {
335 1
            $this->collateStatusResults($results, $priorityQueueCacheKey);
336
        }
337
338 3
        $cacheKey = $this->getStatusCacheKey();
339 3
        $cursor = null;
340 3
        while ($hResults = $this->redis->hScan($cacheKey, $cursor, '', 100)) {
341 2
            $this->extractStatusHashResults($hResults, $results);
342 2
            if (0 === $cursor) {
343 2
                break;
344
            }
345
        }
346
347 3
        return $results;
348
    }
349
350 12
    public function retryableSaveHistory(RetryableJob $job, $retry)
351
    {
352 12
        $cacheKey = $this->getStatusCacheKey();
353 12
        $hashKey = $job->getWorkerName();
354 12
        $hashKey .= ',';
355 12
        $hashKey .= $job->getMethod();
356 12
        $hashKey .= ',';
357 12
        $hashKey .= $job->getStatus();
358 12
        $this->redis->hIncrBy($cacheKey, $hashKey, 1);
359 12
    }
360
}
361