Completed
Pull Request — master (#30)
by Matthew
07:46
created

JobManager::prioritySave()   B

Complexity

Conditions 5
Paths 7

Size

Total Lines 25
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 6.4222

Importance

Changes 0
Metric Value
dl 0
loc 25
ccs 8
cts 13
cp 0.6153
rs 8.439
c 0
b 0
f 0
cc 5
eloc 13
nc 7
nop 1
crap 6.4222
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\Job;
0 ignored issues
show
Bug introduced by
This use statement conflicts with another class in this namespace, Dtc\QueueBundle\Redis\Job.

Let’s assume that you have a directory layout like this:

.
|-- OtherDir
|   |-- Bar.php
|   `-- Foo.php
`-- SomeDir
    `-- Foo.php

and let’s assume the following content of Bar.php:

// Bar.php
namespace OtherDir;

use SomeDir\Foo; // This now conflicts the class OtherDir\Foo

If both files OtherDir/Foo.php and SomeDir/Foo.php are loaded in the same runtime, you will see a PHP error such as the following:

PHP Fatal error:  Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php

However, as OtherDir/Foo.php does not necessarily have to be loaded and the error is only triggered if it is loaded before OtherDir/Bar.php, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias:

// Bar.php
namespace OtherDir;

use SomeDir\Foo as SomeDirFoo; // There is no conflict anymore.
Loading history...
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Util\Util;
15
16
/**
17
 * For future implementation.
18
 */
19
class JobManager extends PriorityJobManager
20
{
21
    /** @var RedisInterface */
22
    protected $redis;
23
24
    /** @var string */
25
    protected $cacheKeyPrefix;
26
27
    protected $hostname;
28
    protected $pid;
29
30 2
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
31
    {
32 2
        $this->cacheKeyPrefix = $cacheKeyPrefix;
33 2
        $this->hostname = gethostname() ?: '';
34 2
        $this->pid = getmypid();
35
36 2
        parent::__construct($runManager, $jobTimingManager, $jobClass);
37 2
    }
38
39
    public function setRedis(RedisInterface $redis)
40
    {
41
        $this->redis = $redis;
42
    }
43
44 16
    protected function getJobCacheKey($jobId)
45
    {
46 16
        return $this->cacheKeyPrefix.'_job_'.$jobId;
47
    }
48
49 16
    protected function getJobCrcHashKey($jobCrc)
50
    {
51 16
        return $this->cacheKeyPrefix.'_job_crc_'.$jobCrc;
52
    }
53
54 12
    protected function getPriorityQueueCacheKey()
55
    {
56 12
        return $this->cacheKeyPrefix.'_priority';
57
    }
58
59 16
    protected function getWhenQueueCacheKey()
60
    {
61 16
        return $this->cacheKeyPrefix.'_when';
62
    }
63
64 12
    protected function transferQueues()
65
    {
66
        // Drains from WhenAt queue into Prioirty Queue
67 12
        $whenQueue = $this->getWhenQueueCacheKey();
68 12
        $priorityQueue = $this->getPriorityQueueCacheKey();
69 12
        $microtime = Util::getMicrotimeDecimal();
70 12
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
71 10
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
72 10
            if ($jobMessage) {
73 10
                $job = new \Dtc\QueueBundle\Redis\Job();
74 10
                $job->fromMessage($jobMessage);
75 10
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
76
            }
77
        }
78 12
    }
79
80
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
81
    {
82
        $crcHash = $job->getCrcHash();
83
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
84
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
85
        if (is_array($result)) {
86
            foreach ($result as $jobId) {
87
                $jobCacheKey1 = $this->getJobCacheKey($jobId);
88
                if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
89
                    $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1);
90
                    continue;
91
                }
92
93
                /// There is one?
94
                if ($foundJobMessage) {
95
                    $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
96
                    if ($foundJob) {
97
                        return $foundJob;
98
                    }
99
                }
100
            }
101
        }
102
103
        return null;
104
    }
105
106
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
107
    {
108
        $when = $job->getWhenUs();
109
        $crcHash = $job->getCrcHash();
110
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
111
112
        $foundJob = new \Dtc\QueueBundle\Redis\Job();
113
        $foundJob->fromMessage($foundJobMessage);
114
        $foundWhen = $foundJob->getWhenUs();
115
116
        // Fix this using bcmath
117
        $curtimeU = Util::getMicrotimeDecimal();
118
119
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) > 1) {
120
            $newFoundWhen = $when;
121
        }
122
        $foundPriority = $foundJob->getPriority();
123
        if ($foundPriority < $job->getPriority()) {
124
            $newFoundPriority = $job->getPriority();
125
        }
126
127
        // Now how do we adjust this job's priority or time?
128
        $adjust = false;
129
        if (isset($newFoundWhen)) {
130
            $foundJob->setWhenUs($newFoundWhen);
131
            $adjust = true;
132
        }
133
        if (isset($newFoundPriority)) {
134
            $foundJob->setPriority($newFoundPriority);
135
            $adjust = true;
136
        }
137
        if (!$adjust) {
138
            return $foundJob;
139
        }
140
141
        $whenQueue = $this->getWhenQueueCacheKey();
142 View Code Duplication
        if ($adjust && $this->redis->zRem($whenQueue, $foundJob->getId()) > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
143
            if (!$this->insertJob($foundJob)) {
144
                // Job is expired
145
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
146
147
                return false;
148
            }
149
            $this->redis->zAdd($whenQueue, $foundJob->getWhenUs(), $foundJob->toMessage());
150
151
            return $foundJob;
152
        }
153
154
        if (null === $this->maxPriority) {
155
            return false;
156
        }
157
158
        $priorityQueue = $this->getPriorityQueueCacheKey();
159 View Code Duplication
        if ($adjust && $this->redis->zRem($priorityQueue, $foundJob->getId()) > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
160
            if (!$this->insertJob($foundJob)) {
161
                // Job is expired
162
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
163
164
                return false;
165
            }
166
            $this->redis->zAdd($priorityQueue, $foundJob->getPriority(), $foundJob->toMessage());
167
168
            return $foundJob;
169
        }
170
171
        return false;
172
    }
173
174
    /**
175
     * @param \Dtc\QueueBundle\Model\Job $job
176
     *
177
     * @return \Dtc\QueueBundle\Model\Job
178
     *
179
     * @throws ClassNotSubclassException
180
     */
181 16
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
182
    {
183 16
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
184
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
185
        }
186
187 16
        $this->validateSaveable($job);
188 16
        $this->setJobId($job);
189
190
        // Add to whenAt or priority queue?  /// optimizaiton...
191 16
        $whenUs = $job->getWhenUs();
192 16
        if (!$whenUs) {
193
            $whenUs = Util::getMicrotimeDecimal();
194
            $job->setWhenUs($whenUs);
195
        }
196
197 16
        if (true === $job->getBatch()) {
198
            // is there a CRC Hash already for this job
199
            if ($oldJob = $this->batchSave($job)) {
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $oldJob is correct as $this->batchSave($job) (which targets Dtc\QueueBundle\Redis\JobManager::batchSave()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
200
                return $oldJob;
201
            }
202
        }
203
204 16
        return $this->saveJob($job);
205
    }
206
207 16
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
208
    {
209 16
        $whenQueue = $this->getWhenQueueCacheKey();
210 16
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
211
        // Save Job
212 16
        if (!$this->insertJob($job)) {
213
            // job is expired
214 2
            return null;
215
        }
216 16
        $jobId = $job->getId();
217 16
        $when = $job->getWhenUs();
218
        // Add Job to CRC list
219 16
        $this->redis->lPush($crcCacheKey, [$jobId]);
220
221 16
        $this->redis->zAdd($whenQueue, $when, $jobId);
222
223 16
        return $job;
224
    }
225
226 16
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
227
    {
228
        // Save Job
229 16
        $jobCacheKey = $this->getJobCacheKey($job->getId());
230 16
        if ($expiresAt = $job->getExpiresAt()) {
231 2
            $expiresAtTime = $expiresAt->getTimestamp() - time();
232 2
            if ($expiresAtTime <= 0) {
233 2
                return false; /// ??? job is already expired
234
            }
235
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());
236
237
            return true;
238
        }
239 16
        $this->redis->set($jobCacheKey, $job->toMessage());
240
241 16
        return true;
242
    }
243
244
    /**
245
     * Attach a unique id to a job since RabbitMQ will not.
246
     *
247
     * @param \Dtc\QueueBundle\Model\Job $job
248
     */
249 16
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
250
    {
251 16 View Code Duplication
        if (!$job->getId()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
252 16
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
253
        }
254 16
    }
255
256
    /**
257
     * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0.
258
     */
259 16
    protected function calculatePriority($priority)
260
    {
261 16
        $priority = parent::calculatePriority($priority);
262 16
        if (null === $priority) {
263 14
            return null === $this->maxPriority ? 0 : $this->maxPriority;
264
        }
265
266 6
        if (null === $this->maxPriority) {
267
            return 0;
268
        }
269
270
        // Redis priority should be in DESC order
271 6
        return $this->maxPriority - $priority;
272
    }
273
274
    /**
275
     * @param \Dtc\QueueBundle\Model\Job $job
276
     *
277
     * @throws PriorityException
278
     * @throws ClassNotSubclassException
279
     */
280 16 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
281
    {
282 16
        if (null !== $job->getPriority() && null === $this->maxPriority) {
283
            throw new PriorityException('This queue does not support priorities');
284
        }
285
286 16
        if (!$job instanceof RetryableJob) {
287
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
288
        }
289 16
    }
290
291 14
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
292
    {
293 14 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
294 4
            throw new UnsupportedException('Unsupported');
295
        }
296 12
    }
297
298 2
    public function deleteJob(Job $job)
299
    {
300 2
        $jobId = $job->getId();
301 2
        $priorityQueue = $this->getPriorityQueueCacheKey();
302 2
        $whenQueue = $this->getWhenQueueCacheKey();
303
304 2
        $deleted = false;
305 2
        if ($this->redis->zRem($priorityQueue, $jobId)) {
306
            $deleted = true;
307 2
        } elseif ($this->redis->zRem($whenQueue, $jobId)) {
308 2
            $deleted = true;
309
        }
310
311 2
        if ($deleted) {
312 2
            $this->redis->del([$this->getJobCacheKey($jobId)]);
313 2
            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
314
        }
315 2
    }
316
317
    /**
318
     * @param string $workerName
319
     */
320 14
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
321
    {
322
        // First thing migrate any jobs from When queue to Prioirty queue
323
324 14
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
325 12
        if (null !== $this->maxPriority) {
326 12
            $this->transferQueues();
327 12
            $queue = $this->getPriorityQueueCacheKey();
328 12
            $jobId = $this->redis->zPop($queue);
329
        } else {
330
            $queue = $this->getWhenQueueCacheKey();
331
            $microtime = Util::getMicrotimeDecimal();
332
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
333
        }
334
335 12
        if ($jobId) {
336 10
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
337 10
            $job = new \Dtc\QueueBundle\Redis\Job();
338 10
            $job->fromMessage($jobMessage);
339 10
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
340 10
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
341 10
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
342
343 10
            return $job;
344
        }
345
346 8
        return null;
347
    }
348
349
    protected function getCurTime()
350
    {
351
        $time = intval(microtime(true) * 1000000);
352
353
        return $time;
354
    }
355
356 4 View Code Duplication
    public function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
357
    {
358 4
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
359
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
360
        }
361 4
        $job->setStatus(BaseJob::STATUS_NEW);
362 4
        $job->setMessage(null);
363 4
        $job->setStartedAt(null);
364 4
        $job->setRetries($job->getRetries() + 1);
365 4
        $job->setUpdatedAt(new \DateTime());
366 4
        $this->saveJob($job);
367
368 4
        return true;
369
    }
370
371 6
    public function retryableSaveHistory(RetryableJob $job, $retry)
372
    {
373 6
    }
374
}
375