JobManager::addFoundJob()   A
last analyzed

Complexity

Conditions 4
Paths 3

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 6

Importance

Changes 0
Metric Value
cc 4
eloc 9
nc 3
nop 4
dl 0
loc 15
ccs 5
cts 10
cp 0.5
crap 6
rs 9.9666
c 0
b 0
f 0
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\SaveableTrait;
9
use Dtc\QueueBundle\Model\RetryableJob;
10
use Dtc\QueueBundle\Util\Util;
11
12
/**
13
 * For future implementation.
14
 */
15
class JobManager extends BaseJobManager
16
{
17
    use SaveableTrait;
18
    use StatusTrait;
19
20
    /**
21
     * There's a bit of danger here if there are more jobs being inserted than can be efficiently drained
22
     *   What could happen is that this infinitely loops...
23
     */
24 9
    protected function transferQueues()
25
    {
26
        // Drains from WhenAt queue into Prioirty Queue
27 9
        $whenQueue = $this->getWhenQueueCacheKey();
28 9
        $priorityQueue = $this->getPriorityQueueCacheKey();
29 9
        $microtime = Util::getMicrotimeInteger();
30 9
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
31 8
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
32 8
            if (is_string($jobMessage)) {
33 8
                $job = new Job();
34 8
                $job->fromMessage($jobMessage);
35 8
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
36
            }
37
        }
38 9
    }
39
40
    /**
41
     * @return Job|null
42
     */
43 3
    protected function batchSave(Job $job)
44
    {
45 3
        $crcHash = $job->getCrcHash();
46 3
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
47 3
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
48 3
        if (!is_array($result)) {
49
            return null;
50
        }
51
52 3
        foreach ($result as $jobId) {
53 3
            $jobCacheKey1 = $this->getJobCacheKey($jobId);
54 3
            if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
55
                $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1);
56
                continue;
57
            }
58
59
            /// There is one?
60 3
            if ($foundJobMessage) {
61 3
                $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
62 3
                if ($foundJob) {
63 3
                    return $foundJob;
64
                }
65
            }
66
        }
67
68
        return null;
69
    }
70
71
    /**
72
     * @param string $foundJobCacheKey
73
     * @param string $foundJobMessage
74
     */
75 3
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
76
    {
77 3
        $when = $job->getWhenUs();
78 3
        $crcHash = $job->getCrcHash();
79 3
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
80
81 3
        $foundJob = new Job();
82 3
        $foundJob->fromMessage($foundJobMessage);
83 3
        $foundWhen = $foundJob->getWhenUs();
84
85
        // Fix this using bcmath
86 3
        $curtimeU = Util::getMicrotimeInteger();
87 3
        $newFoundWhen = null;
88 3
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) {
89 3
            $newFoundWhen = $when;
90
        }
91 3
        $foundPriority = $foundJob->getPriority();
92 3
        $newFoundPriority = null;
93 3
        if ($foundPriority > $job->getPriority()) {
94 1
            $newFoundPriority = $job->getPriority();
95
        }
96
97 3
        return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
98
    }
99
100
    /**
101
     * @param string   $crcCacheKey
102
     * @param int|null $newFoundPriority
103
     */
104 3
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
105
    {
106
        // Now how do we adjust this job's priority or time?
107 3
        $adjust = false;
108 3
        if (isset($newFoundWhen)) {
109 3
            $foundJob->setWhenUs($newFoundWhen);
110 3
            $adjust = true;
111
        }
112 3
        if (isset($newFoundPriority)) {
113 1
            $foundJob->setPriority($newFoundPriority);
114 1
            $adjust = true;
115
        }
116 3
        if (!$adjust) {
117 3
            return $foundJob;
118
        }
119
120 3
        return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey);
121
    }
122
123
    /**
124
     * @param bool $adjust
125
     */
126 3
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
127
    {
128 3
        $whenQueue = $this->getWhenQueueCacheKey();
129 3
        $result = $this->adjustJob($adjust, $whenQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());
130 3
        if (null !== $result) {
131 3
            return $result;
132
        }
133
        if (null === $this->maxPriority) {
134
            return false;
135
        }
136
137
        $priorityQueue = $this->getPriorityQueueCacheKey();
138
        $result = $this->adjustJob($adjust, $priorityQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());
139
140
        return $result ?: false;
141
    }
142
143
    /**
144
     * @param string $queue
145
     * @param bool   $adjust
146
     */
147 3
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
148
    {
149 3
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
150 3
            if (!$this->insertJob($foundJob)) {
151
                // Job is expired
152
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
153
154
                return false;
155
            }
156 3
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());
157
158 3
            return $foundJob;
159
        }
160
161
        return null;
162
    }
163
164
    /**
165
     * @return Job|null
166
     *
167
     * @throws ClassNotSubclassException
168
     * @throws PriorityException
169
     */
170 28
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
171
    {
172 28
        if (!$job instanceof Job) {
173
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
174
        }
175
176 28
        $this->validateSaveable($job);
177 28
        $this->setJobId($job);
178
179
        // Add to whenAt or priority queue?  /// optimizaiton...
180 28
        $whenUs = $job->getWhenUs();
181 28
        if (!$whenUs) {
182
            $whenUs = Util::getMicrotimeInteger();
183
            $job->setWhenUs($whenUs);
184
        }
185
186 28
        if (true === $job->getBatch()) {
187
            // is there a CRC Hash already for this job
188 3
            if ($oldJob = $this->batchSave($job)) {
189 3
                return $oldJob;
190
            }
191
        }
192
193 28
        return $this->saveJob($job);
194
    }
195
196
    /**
197
     * @return Job|null
198
     */
199 28
    protected function saveJob(Job $job)
200
    {
201 28
        $whenQueue = $this->getWhenQueueCacheKey();
202 28
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
203
        // Save Job
204 28
        if (!$this->insertJob($job)) {
205
            // job is expired
206 3
            return null;
207
        }
208 28
        $jobId = $job->getId();
209 28
        $when = $job->getWhenUs();
210
        // Add Job to CRC list
211 28
        $this->redis->lPush($crcCacheKey, [$jobId]);
212
213 28
        $this->redis->zAdd($whenQueue, $when, $jobId);
214
215 28
        return $job;
216
    }
217
218
    /**
219
     * @return bool false if the job is already expired, true otherwise
220
     */
221 28
    protected function insertJob(Job $job)
222
    {
223
        // Save Job
224 28
        $jobCacheKey = $this->getJobCacheKey($job->getId());
225 28
        if ($expiresAt = $job->getExpiresAt()) {
226 3
            $expiresAtTime = $expiresAt->getTimestamp() - time();
227 3
            if ($expiresAtTime <= 0) {
228 3
                return false; /// ??? job is already expired
229
            }
230
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());
231
232
            return true;
233
        }
234 28
        $this->redis->set($jobCacheKey, $job->toMessage());
235
236 28
        return true;
237
    }
238
239
    /**
240
     * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0.
241
     *
242
     * @param int|null $priority
243
     *
244
     * @return int
245
     */
246 28
    protected function calculatePriority($priority)
247
    {
248 28
        $priority = parent::calculatePriority($priority);
249 28
        if (null === $priority) {
250 27
            return null === $this->maxPriority ? null : $this->maxPriority;
251
        }
252
253 4
        if (null === $this->maxPriority) {
254
            return null;
255
        }
256
257
        // Redis priority should be in DESC order
258 4
        return $this->maxPriority - $priority;
259
    }
260
261
    /**
262
     * @param string|null $workerName
263
     * @param string|null $methodName
264
     * @param bool        $prioritize
265
     * @param mixed       $runId
266
     *
267
     * @throws UnsupportedException
268
     *
269
     * @return Job|null
270
     */
271 26
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
272
    {
273
        // First thing migrate any jobs from When queue to Prioirty queue
274
275 26
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
276 23
        if (null !== $this->maxPriority) {
277 9
            $this->transferQueues();
278 9
            $queue = $this->getPriorityQueueCacheKey();
279 9
            $jobId = $this->redis->zPop($queue);
280
        } else {
281 14
            $queue = $this->getWhenQueueCacheKey();
282 14
            $microtime = Util::getMicrotimeInteger();
283 14
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
284
        }
285
286 23
        if ($jobId) {
287 20
            return $this->retrieveJob($jobId);
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->retrieveJob($jobId) targeting Dtc\QueueBundle\Redis\JobManager::retrieveJob() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
288
        }
289
290 22
        return null;
291
    }
292
293 20
    protected function retrieveJob($jobId)
294
    {
295 20
        $job = null;
296 20
        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
297 20
        if (is_string($jobMessage)) {
0 ignored issues
show
introduced by
The condition is_string($jobMessage) is always false.
Loading history...
298 20
            $job = new Job();
299 20
            $job->fromMessage($jobMessage);
300 20
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
301 20
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
302 20
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
303
        }
304
305 20
        return $job;
306
    }
307
308 3
    public function getWaitingJobCount($workerName = null, $methodName = null)
309
    {
310 3
        $microtime = Util::getMicrotimeInteger();
311 3
        $count = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $microtime);
312
313 3
        if (null !== $this->maxPriority) {
314 1
            $count += $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
315
        }
316
317 3
        return $count;
318
    }
319
320 3
    public function getStatus(): array
321
    {
322 3
        $whenQueueCacheKey = $this->getWhenQueueCacheKey();
323 3
        $priorityQueueCacheKey = $this->getPriorityQueueCacheKey();
324 3
        $results = [];
325 3
        $this->collateStatusResults($results, $whenQueueCacheKey);
326 3
        if (null !== $this->maxPriority) {
327 1
            $this->collateStatusResults($results, $priorityQueueCacheKey);
328
        }
329
330 3
        $cacheKey = $this->getStatusCacheKey();
331 3
        $cursor = null;
332 3
        while ($hResults = $this->redis->hScan($cacheKey, $cursor, '', 100)) {
333 2
            $this->extractStatusHashResults($hResults, $results);
334 2
            if (0 === $cursor) {
335 2
                break;
336
            }
337
        }
338
339 3
        return $results;
340
    }
341
342 12
    public function retryableSaveHistory(RetryableJob $job, $retry)
343
    {
344 12
        $cacheKey = $this->getStatusCacheKey();
345 12
        $hashKey = $job->getWorkerName();
346 12
        $hashKey .= ',';
347 12
        $hashKey .= $job->getMethod();
348 12
        $hashKey .= ',';
349 12
        $hashKey .= $job->getStatus();
350 12
        $this->redis->hIncrBy($cacheKey, $hashKey, 1);
351 12
    }
352
}
353