Completed
Pull Request — master (#30)
by Matthew
07:24
created

JobManager::getJobCacheKey()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Util\Util;
14
15
/**
16
 * For future implementation.
17
 */
18
class JobManager extends PriorityJobManager
19
{
20
    /** @var RedisInterface */
21
    protected $redis;
22
23
    /** @var string */
24
    protected $cacheKeyPrefix;
25
26
    protected $hostname;
27
    protected $pid;
28
29 2
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
30
    {
31 2
        $this->cacheKeyPrefix = $cacheKeyPrefix;
32 2
        $this->hostname = gethostname() ?: '';
33 2
        $this->pid = getmypid();
34
35 2
        parent::__construct($runManager, $jobTimingManager, $jobClass);
36 2
    }
37
38
    public function setRedis(RedisInterface $redis)
39
    {
40
        $this->redis = $redis;
41
    }
42
43 16
    protected function getJobCacheKey($jobId)
44
    {
45 16
        return $this->cacheKeyPrefix.'_job_'.$jobId;
46
    }
47
48 16
    protected function getJobCrcHashKey($jobCrc)
49
    {
50 16
        return $this->cacheKeyPrefix.'_job_crc_'.$jobCrc;
51
    }
52
53 12
    protected function getPriorityQueueCacheKey()
54
    {
55 12
        return $this->cacheKeyPrefix.'_priority';
56
    }
57
58 16
    protected function getWhenQueueCacheKey()
59
    {
60 16
        return $this->cacheKeyPrefix.'_when';
61
    }
62
63 12
    protected function transferQueues()
64
    {
65
        // Drains from WhenAt queue into Prioirty Queue
66 12
        $whenQueue = $this->getWhenQueueCacheKey();
67 12
        $priorityQueue = $this->getPriorityQueueCacheKey();
68 12
        $microtime = Util::getMicrotimeDecimal();
69 12
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
70 10
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
71 10
            if ($jobMessage) {
72 10
                $job = new \Dtc\QueueBundle\Redis\Job();
73 10
                $job->fromMessage($jobMessage);
74 10
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
75
            }
76
        }
77 12
    }
78
79
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
80
    {
81
        $crcHash = $job->getCrcHash();
82
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
83
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
84
        if (is_array($result)) {
85
            foreach ($result as $jobId) {
86
                $jobCacheKey1 = $this->getJobCacheKey($jobId);
87
                if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
88
                    $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1);
89
                    continue;
90
                }
91
92
                /// There is one?
93
                if ($foundJobMessage) {
94
                    $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
95
                    if ($foundJob) {
96
                        return $foundJob;
97
                    }
98
                }
99
            }
100
        }
101
102
        return null;
103
    }
104
105
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
106
    {
107
        $when = $job->getWhenUs();
108
        $crcHash = $job->getCrcHash();
109
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
110
111
        $foundJob = new \Dtc\QueueBundle\Redis\Job();
112
        $foundJob->fromMessage($foundJobMessage);
113
        $foundWhen = $foundJob->getWhenUs();
114
115
        // Fix this using bcmath
116
        $curtimeU = Util::getMicrotimeDecimal();
117
118
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) > 1) {
119
            $newFoundWhen = $when;
120
        }
121
        $foundPriority = $foundJob->getPriority();
122
        if ($foundPriority < $job->getPriority()) {
123
            $newFoundPriority = $job->getPriority();
124
        }
125
126
        // Now how do we adjust this job's priority or time?
127
        $adjust = false;
128
        if (isset($newFoundWhen)) {
129
            $foundJob->setWhenUs($newFoundWhen);
130
            $adjust = true;
131
        }
132
        if (isset($newFoundPriority)) {
133
            $foundJob->setPriority($newFoundPriority);
134
            $adjust = true;
135
        }
136
        if (!$adjust) {
137
            return $foundJob;
138
        }
139
140
        $whenQueue = $this->getWhenQueueCacheKey();
141 View Code Duplication
        if ($adjust && $this->redis->zRem($whenQueue, $foundJob->getId()) > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
142
            if (!$this->insertJob($foundJob)) {
143
                // Job is expired
144
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
145
146
                return false;
147
            }
148
            $this->redis->zAdd($whenQueue, $foundJob->getWhenUs(), $foundJob->toMessage());
149
150
            return $foundJob;
151
        }
152
153
        if (null === $this->maxPriority) {
154
            return false;
155
        }
156
157
        $priorityQueue = $this->getPriorityQueueCacheKey();
158 View Code Duplication
        if ($adjust && $this->redis->zRem($priorityQueue, $foundJob->getId()) > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
159
            if (!$this->insertJob($foundJob)) {
160
                // Job is expired
161
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
162
163
                return false;
164
            }
165
            $this->redis->zAdd($priorityQueue, $foundJob->getPriority(), $foundJob->toMessage());
166
167
            return $foundJob;
168
        }
169
170
        return false;
171
    }
172
173
    /**
174
     * @param \Dtc\QueueBundle\Model\Job $job
175
     *
176
     * @return \Dtc\QueueBundle\Model\Job
177
     *
178
     * @throws ClassNotSubclassException
179
     */
180 16
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
181
    {
182 16
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
183
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
184
        }
185
186 16
        $this->validateSaveable($job);
187 16
        $this->setJobId($job);
188
189
        // Add to whenAt or priority queue?  /// optimizaiton...
190 16
        $whenUs = $job->getWhenUs();
191 16
        if (!$whenUs) {
192
            $whenUs = Util::getMicrotimeDecimal();
193
            $job->setWhenUs($whenUs);
194
        }
195
196 16
        if (true === $job->getBatch()) {
197
            // is there a CRC Hash already for this job
198
            if ($oldJob = $this->batchSave($job)) {
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $oldJob is correct as $this->batchSave($job) (which targets Dtc\QueueBundle\Redis\JobManager::batchSave()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
199
                return $oldJob;
200
            }
201
        }
202
203 16
        return $this->saveJob($job);
204
    }
205
206 16
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
207
    {
208 16
        $whenQueue = $this->getWhenQueueCacheKey();
209 16
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
210
        // Save Job
211 16
        if (!$this->insertJob($job)) {
212
            // job is expired
213 2
            return null;
214
        }
215 16
        $jobId = $job->getId();
216 16
        $when = $job->getWhenUs();
217
        // Add Job to CRC list
218 16
        $this->redis->lPush($crcCacheKey, [$jobId]);
219
220 16
        $this->redis->zAdd($whenQueue, $when, $jobId);
221
222 16
        return $job;
223
    }
224
225 16
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
226
    {
227
        // Save Job
228 16
        $jobCacheKey = $this->getJobCacheKey($job->getId());
229 16
        if ($expiresAt = $job->getExpiresAt()) {
230 2
            $expiresAtTime = $expiresAt->getTimestamp() - time();
231 2
            if ($expiresAtTime <= 0) {
232 2
                return false; /// ??? job is already expired
233
            }
234
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());
235
236
            return true;
237
        }
238 16
        $this->redis->set($jobCacheKey, $job->toMessage());
239
240 16
        return true;
241
    }
242
243
    /**
244
     * Attach a unique id to a job since RabbitMQ will not.
245
     *
246
     * @param \Dtc\QueueBundle\Model\Job $job
247
     */
248 16
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
249
    {
250 16 View Code Duplication
        if (!$job->getId()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
251 16
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
252
        }
253 16
    }
254
255
    /**
256
     * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0.
257
     */
258 16
    protected function calculatePriority($priority)
259
    {
260 16
        $priority = parent::calculatePriority($priority);
261 16
        if (null === $priority) {
262 14
            return null === $this->maxPriority ? 0 : $this->maxPriority;
263
        }
264
265 6
        if (null === $this->maxPriority) {
266
            return 0;
267
        }
268
269
        // Redis priority should be in DESC order
270 6
        return $this->maxPriority - $priority;
271
    }
272
273
    /**
274
     * @param \Dtc\QueueBundle\Model\Job $job
275
     *
276
     * @throws PriorityException
277
     * @throws ClassNotSubclassException
278
     */
279 16 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
280
    {
281 16
        if (null !== $job->getPriority() && null === $this->maxPriority) {
282
            throw new PriorityException('This queue does not support priorities');
283
        }
284
285 16
        if (!$job instanceof RetryableJob) {
286
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
287
        }
288 16
    }
289
290 14
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
291
    {
292 14 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
293 4
            throw new UnsupportedException('Unsupported');
294
        }
295 12
    }
296
297 2
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
298
    {
299 2
        $jobId = $job->getId();
300 2
        $priorityQueue = $this->getPriorityQueueCacheKey();
301 2
        $whenQueue = $this->getWhenQueueCacheKey();
302
303 2
        $deleted = false;
304 2
        if ($this->redis->zRem($priorityQueue, $jobId)) {
305
            $deleted = true;
306 2
        } elseif ($this->redis->zRem($whenQueue, $jobId)) {
307 2
            $deleted = true;
308
        }
309
310 2
        if ($deleted) {
311 2
            $this->redis->del([$this->getJobCacheKey($jobId)]);
312 2
            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
313
        }
314 2
    }
315
316
    /**
317
     * @param string $workerName
318
     */
319 14
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
320
    {
321
        // First thing migrate any jobs from When queue to Prioirty queue
322
323 14
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
324 12
        if (null !== $this->maxPriority) {
325 12
            $this->transferQueues();
326 12
            $queue = $this->getPriorityQueueCacheKey();
327 12
            $jobId = $this->redis->zPop($queue);
328
        } else {
329
            $queue = $this->getWhenQueueCacheKey();
330
            $microtime = Util::getMicrotimeDecimal();
331
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
332
        }
333
334 12
        if ($jobId) {
335 10
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
336 10
            $job = new \Dtc\QueueBundle\Redis\Job();
337 10
            $job->fromMessage($jobMessage);
338 10
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
339 10
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
340 10
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
341
342 10
            return $job;
343
        }
344
345 8
        return null;
346
    }
347
348
    protected function getCurTime()
349
    {
350
        $time = intval(microtime(true) * 1000000);
351
352
        return $time;
353
    }
354
355 4 View Code Duplication
    public function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
356
    {
357 4
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
358
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
359
        }
360 4
        $job->setStatus(BaseJob::STATUS_NEW);
361 4
        $job->setMessage(null);
362 4
        $job->setStartedAt(null);
363 4
        $job->setRetries($job->getRetries() + 1);
364 4
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
365 4
        $this->saveJob($job);
366
367 4
        return true;
368
    }
369
370 6
    public function retryableSaveHistory(RetryableJob $job, $retry)
371
    {
372 6
    }
373
}
374