Completed
Push — master ( 7fae80...8ffdf8 )
by Matthew
07:25
created

JobManager::deleteJob()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 9
cts 9
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 1
crap 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A JobManager::validateSaveable() 10 10 4
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Util\Util;
10
11
/**
12
 * Job manager that stores jobs in Redis and schedules them through the "when" and priority sorted sets.
13
 */
14
class JobManager extends BaseJobManager
15
{
16
    use StatusTrait;
17
18
    /**
     * Drains every job that is currently due from the "when" queue into the priority queue.
     *
     * Caution: if jobs are being inserted faster than they can be drained here,
     * this loop may never terminate.
     */
    protected function transferQueues()
    {
        $dueQueueKey = $this->getWhenQueueCacheKey();
        $priorityQueueKey = $this->getPriorityQueueCacheKey();
        $now = Util::getMicrotimeDecimal();

        while (true) {
            $poppedId = $this->redis->zPopByMaxScore($dueQueueKey, $now);
            if (!$poppedId) {
                break;
            }

            $message = $this->redis->get($this->getJobCacheKey($poppedId));
            if (!is_string($message)) {
                // No stored message for this id, so there is nothing to move over
                continue;
            }

            $job = new Job();
            $job->fromMessage($message);
            $this->redis->zAdd($priorityQueueKey, $job->getPriority(), $job->getId());
        }
    }
37
38
    /**
     * Looks for an already-queued job sharing the CRC hash of $job and, when one is
     * found, hands both to batchFoundJob() so the queued job can absorb the new one.
     *
     * Entries of the CRC list (range 0-1000 is inspected) whose job message can no
     * longer be read are removed from the list along the way.
     *
     * @param Job $job
     *
     * @return Job|null the existing job returned by batchFoundJob(), or null when no usable one exists
     */
    protected function batchSave(Job $job)
    {
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($result)) {
            return null;
        }

        foreach ($result as $jobId) {
            $jobCacheKey = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey);
            if (!$foundJobMessage) {
                // The CRC list holds job ids (saveJob() pushes the id), so the stale entry
                // must be removed by id; removing by cache key never matches anything.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }
70
71
    /**
     * Compares the job being saved with an already-stored job and decides whether the
     * stored job needs a new "when" time and/or a new priority, then delegates to
     * finishBatchFoundJob().
     *
     * @param Job    $job              the job being saved
     * @param string $foundJobCacheKey cache key of the stored job
     * @param string $foundJobMessage  message of the stored job, as read from redis
     *
     * @return mixed whatever finishBatchFoundJob() returns
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $incomingWhen = $job->getWhenUs();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        $storedJob = new Job();
        $storedJob->fromMessage($foundJobMessage);
        $storedWhen = $storedJob->getWhenUs();

        // Only reschedule when the stored job is still in the future AND later than the incoming one
        $now = Util::getMicrotimeDecimal();
        $rescheduleTo = (bccomp($storedWhen, $now) > 0 && bccomp($storedWhen, $incomingWhen) > 0)
            ? $incomingWhen
            : null;

        // Take over the incoming priority value when it is the smaller of the two
        $storedPriority = $storedJob->getPriority();
        $incomingPriority = $job->getPriority();
        $reprioritizeTo = $storedPriority > $incomingPriority ? $incomingPriority : null;

        return $this->finishBatchFoundJob($storedJob, $foundJobCacheKey, $crcListKey, $rescheduleTo, $reprioritizeTo);
    }
99
100
    /**
     * Applies the new "when" time and/or priority to the stored job and, if either
     * changed, re-queues it through addFoundJob().
     *
     * @param Job      $foundJob
     * @param string   $foundJobCacheKey
     * @param string   $crcCacheKey
     * @param mixed    $newFoundWhen     new whenUs value, or null to leave it untouched
     * @param int|null $newFoundPriority new priority, or null to leave it untouched
     *
     * @return Job|false the untouched job when nothing changed, otherwise the result of addFoundJob()
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        $whenChanged = isset($newFoundWhen);
        if ($whenChanged) {
            $foundJob->setWhenUs($newFoundWhen);
        }

        $priorityChanged = isset($newFoundPriority);
        if ($priorityChanged) {
            $foundJob->setPriority($newFoundPriority);
        }

        if (!$whenChanged && !$priorityChanged) {
            return $foundJob;
        }

        // Something changed, so the job has to be adjusted in its queue
        return $this->addFoundJob(true, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }
122
123
    /**
     * Re-queues an adjusted job: tries the "when" queue first and, if the job was not
     * there and priorities are enabled, the priority queue.
     *
     * @param bool   $adjust
     * @param Job    $foundJob
     * @param string $foundJobCacheKey
     * @param string $crcCacheKey
     *
     * @return Job|false the re-queued job, or false when it could not be re-queued
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $fromWhenQueue = $this->adjustJob(
            $adjust,
            $this->getWhenQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getWhenUs()
        );
        if (null !== $fromWhenQueue) {
            return $fromWhenQueue;
        }

        // Without a max priority there is no priority queue to look in
        if (null === $this->maxPriority) {
            return false;
        }

        $fromPriorityQueue = $this->adjustJob(
            $adjust,
            $this->getPriorityQueueCacheKey(),
            $foundJob,
            $foundJobCacheKey,
            $crcCacheKey,
            $foundJob->getPriority()
        );
        if ($fromPriorityQueue) {
            return $fromPriorityQueue;
        }

        return false;
    }
142
143
    /**
     * Removes the job from the given sorted set, re-saves its message and adds it back
     * with the given score.
     *
     * @param bool   $adjust           when false nothing is done and null is returned
     * @param string $queue            cache key of the sorted set to adjust
     * @param Job    $foundJob
     * @param string $foundJobCacheKey kept for signature compatibility; not needed to prune the CRC list
     * @param string $crcCacheKey      cache key of the CRC list the job id is stored in
     * @param mixed  $zScore           score to re-add the job with
     *
     * @return Job|false|null the job on success, false when it is already expired,
     *                        null when it was not a member of $queue (or $adjust is false)
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
            if (!$this->insertJob($foundJob)) {
                // Job is expired. The CRC list stores job ids (saveJob() pushes the id),
                // so it has to be removed by id; removing by cache key never matches.
                $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

                return false;
            }
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());

            return $foundJob;
        }

        return null;
    }
163
164
    /**
     * Validates the job, assigns it an id and a "when" time if missing, and saves it.
     * Batch jobs are first matched against an existing job with the same CRC hash.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return Job|null the saved job, or the existing job that was used instead (batch mode); see saveJob() for null
     *
     * @throws \InvalidArgumentException when $job is not a Redis Job
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // Jobs without an explicit time are scheduled for "now"
        if (!$job->getWhenUs()) {
            $job->setWhenUs(Util::getMicrotimeDecimal());
        }

        if (true === $job->getBatch()) {
            // Is there already a job with the same CRC hash?
            $existingJob = $this->batchSave($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        return $this->saveJob($job);
    }
197
198
    /**
     * Stores the job message, registers the job id in its CRC list and schedules the
     * id in the "when" queue.
     *
     * @param Job $job
     *
     * @return Job|null the job, or null when it is already expired
     */
    protected function saveJob(Job $job)
    {
        $whenQueueKey = $this->getWhenQueueCacheKey();
        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());

        if (!$this->insertJob($job)) {
            // Already expired: nothing gets queued
            return null;
        }

        $id = $job->getId();
        $scheduledFor = $job->getWhenUs();

        // The CRC list is keyed by hash and holds job ids
        $this->redis->lPush($crcListKey, [$id]);
        $this->redis->zAdd($whenQueueKey, $scheduledFor, $id);

        return $job;
    }
221
222
    /**
     * Writes the job message to redis, with a TTL when the job has an expiry time.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired, true otherwise
     */
    protected function insertJob(Job $job)
    {
        $cacheKey = $this->getJobCacheKey($job->getId());
        $expiresAt = $job->getExpiresAt();

        if (!$expiresAt) {
            // No expiry: store without a TTL
            $this->redis->set($cacheKey, $job->toMessage());

            return true;
        }

        // Remaining lifetime in seconds
        $secondsLeft = $expiresAt->getTimestamp() - time();
        if ($secondsLeft <= 0) {
            return false;
        }

        $this->redis->setEx($cacheKey, $secondsLeft, $job->toMessage());

        return true;
    }
244
245
    /**
     * Returns the priority in DESCENDING order (maxPriority - priority).
     *
     * When maxPriority is null the result is always null; when the priority computed
     * by the parent is null the result is maxPriority.
     *
     * @param int|null $priority
     *
     * @return int|null
     */
    protected function calculatePriority($priority)
    {
        $basePriority = parent::calculatePriority($priority);
        $ceiling = $this->maxPriority;

        if (null === $ceiling) {
            return null;
        }

        if (null === $basePriority) {
            return $ceiling;
        }

        // Redis priority should be in DESC order
        return $ceiling - $basePriority;
    }
266
267
    /**
     * Ensures the job can be saved by this manager: a priority is only allowed when
     * maxPriority is set, and the job must be a RetryableJob.
     *
     * NOTE(review): static analysis reports this method as duplicated elsewhere in the
     * project; consider extracting the shared check.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException
     * @throws ClassNotSubclassException
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $requestedPriority = $job->getPriority();
        if (null === $this->maxPriority && null !== $requestedPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        $isRetryable = $job instanceof RetryableJob;
        if (!$isRetryable) {
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
        }
    }
283
284
    /**
     * Pops the next job id and loads the job for it.
     *
     * With priorities enabled, due jobs are first moved from the "when" queue to the
     * priority queue and the id is popped from there; otherwise the id is popped
     * straight from the "when" queue, limited to the current time.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId
     *
     * @throws UnsupportedException
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);

        if (null === $this->maxPriority) {
            $whenQueueKey = $this->getWhenQueueCacheKey();
            $now = Util::getMicrotimeDecimal();
            $nextId = $this->redis->zPopByMaxScore($whenQueueKey, $now);
        } else {
            // First migrate any due jobs from the When queue to the Priority queue
            $this->transferQueues();
            $nextId = $this->redis->zPop($this->getPriorityQueueCacheKey());
        }

        return $nextId ? $this->retrieveJob($nextId) : null;
    }
315
316 23
    /**
     * Loads the job stored for the given id and removes it from redis (both its CRC
     * list entry and its message).
     *
     * @param mixed $jobId
     *
     * @return Job|null null when no string message is stored for the id
     */
    protected function retrieveJob($jobId)
    {
        $message = $this->redis->get($this->getJobCacheKey($jobId));
        if (!is_string($message)) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message);

        $crcListKey = $this->getJobCrcHashKey($job->getCrcHash());
        $this->redis->lRem($crcListKey, 1, $job->getId());
        $this->redis->del([$this->getJobCacheKey($job->getId())]);

        return $job;
    }
330
331 3
    /**
     * Counts the jobs in the "when" queue scored between 0 and the current time, plus,
     * when priorities are enabled, every job in the priority queue.
     *
     * @param string|null $workerName not used by this implementation
     * @param string|null $methodName not used by this implementation
     *
     * @return int
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $now = Util::getMicrotimeDecimal();
        $waiting = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $now);

        if (null === $this->maxPriority) {
            return $waiting;
        }

        return $waiting + $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
    }
342
343 3
    /**
     * Collects status results from the "when" queue, from the priority queue (when
     * priorities are enabled) and from the status hash.
     *
     * @return array
     */
    public function getStatus()
    {
        $whenQueueKey = $this->getWhenQueueCacheKey();
        $priorityQueueKey = $this->getPriorityQueueCacheKey();

        $results = [];
        $this->collateStatusResults($results, $whenQueueKey);
        if (null !== $this->maxPriority) {
            $this->collateStatusResults($results, $priorityQueueKey);
        }

        $statusKey = $this->getStatusCacheKey();
        $cursor = null;
        while (true) {
            // NOTE(review): scanning stops on the first empty batch even if the cursor is
            // not back at 0 - confirm that matches the hScan wrapper's contract.
            $batch = $this->redis->hScan($statusKey, $cursor, '', 100);
            if (!$batch) {
                break;
            }

            $this->extractStatusHashResults($batch, $results);
            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }
364
365 15
    /**
     * Increments the status-hash counter for the job's "worker,method,status" field by one.
     *
     * @param RetryableJob $job
     * @param mixed        $retry not used by this implementation
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $statusKey = $this->getStatusCacheKey();
        $field = implode(',', [
            $job->getWorkerName(),
            $job->getMethod(),
            $job->getStatus(),
        ]);

        $this->redis->hIncrBy($statusKey, $field, 1);
    }
375
}
376