Completed
Pull Request — master (#30)
by Matthew
19:54
created

JobManager::prioritySave()   B

Complexity

Conditions 5
Paths 7

Size

Total Lines 25
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 25
rs 8.439
c 0
b 0
f 0
cc 5
eloc 13
nc 7
nop 1
1
<?php
2
3
namespace Dtc\QueueBundle\Redis;
4
5
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
6
use Dtc\QueueBundle\Exception\PriorityException;
7
use Dtc\QueueBundle\Exception\UnsupportedException;
8
use Dtc\QueueBundle\Manager\JobTimingManager;
9
use Dtc\QueueBundle\Manager\PriorityJobManager;
10
use Dtc\QueueBundle\Manager\RunManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Util\Util;
14
15
/**
16
 * For future implementation.
17
 */
18
class JobManager extends PriorityJobManager
19
{
20
    /** @var RedisInterface */
21
    protected $redis;
22
23
    /** @var string */
24
    protected $cacheKeyPrefix;
25
26
    protected $hostname;
27
    protected $pid;
28
29
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass, $cacheKeyPrefix)
30
    {
31
        $this->cacheKeyPrefix = $cacheKeyPrefix;
32
        $this->hostname = gethostname() ?: '';
33
        $this->pid = getmypid();
34
35
        parent::__construct($runManager, $jobTimingManager, $jobClass);
36
    }
37
38
    public function setRedis(RedisInterface $redis)
39
    {
40
        $this->redis = $redis;
41
    }
42
43
    protected function getJobCacheKey($jobId)
44
    {
45
        return $this->cacheKeyPrefix.'_job_'.$jobId;
46
    }
47
48
    protected function getJobCrcHashKey($jobCrc)
49
    {
50
        return $this->cacheKeyPrefix.'_job_crc_'.$jobCrc;
51
    }
52
53
    protected function getPriorityQueueCacheKey()
54
    {
55
        return $this->cacheKeyPrefix.'_priority';
56
    }
57
58
    protected function getWhenQueueCacheKey()
59
    {
60
        return $this->cacheKeyPrefix.'_when';
61
    }
62
63
    protected function transferQueues()
64
    {
65
        // Drains from WhenAt queue into Prioirty Queue
66
        $whenQueue = $this->getWhenQueueCacheKey();
67
        $priorityQueue = $this->getPriorityQueueCacheKey();
68
        $microtime = Util::getMicrotimeDecimal();
69
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
70
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
71
            if ($jobMessage) {
72
                $job = new \Dtc\QueueBundle\Redis\Job();
73
                $job->fromMessage($jobMessage);
74
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
75
            }
76
        }
77
    }
78
79
    protected function batchSave(\Dtc\QueueBundle\Redis\Job $job)
80
    {
81
        $crcHash = $job->getCrcHash();
82
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
83
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
84
        if (is_array($result)) {
85
            foreach ($result as $jobId) {
86
                $jobCacheKey1 = $this->getJobCacheKey($jobId);
87
                if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) {
88
                    $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1);
89
                    continue;
90
                }
91
92
                /// There is one?
93
                if ($foundJobMessage) {
94
                    $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
95
                    if ($foundJob) {
96
                        return $foundJob;
97
                    }
98
                }
99
            }
100
        }
101
102
        return null;
103
    }
104
105
    protected function batchFoundJob(\Dtc\QueueBundle\Redis\Job $job, $foundJobCacheKey, $foundJobMessage)
106
    {
107
        $when = $job->getWhenUs();
108
        $crcHash = $job->getCrcHash();
109
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
110
111
        $foundJob = new \Dtc\QueueBundle\Redis\Job();
112
        $foundJob->fromMessage($foundJobMessage);
113
        $foundWhen = $foundJob->getWhenUs();
114
115
        // Fix this using bcmath
116
        $curtimeU = Util::getMicrotimeDecimal();
117
118
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) > 1) {
119
            $newFoundWhen = $when;
120
        }
121
        $foundPriority = $foundJob->getPriority();
122
        if ($foundPriority < $job->getPriority()) {
123
            $newFoundPriority = $job->getPriority();
124
        }
125
126
        // Now how do we adjust this job's priority or time?
127
        $adjust = false;
128
        if (isset($newFoundWhen)) {
129
            $foundJob->setWhenUs($newFoundWhen);
130
            $adjust = true;
131
        }
132
        if (isset($newFoundPriority)) {
133
            $foundJob->setPriority($newFoundPriority);
134
            $adjust = true;
135
        }
136
        if (!$adjust) {
137
            return $foundJob;
138
        }
139
140
        $whenQueue = $this->getWhenQueueCacheKey();
141 View Code Duplication
        if ($adjust && $this->redis->zRem($whenQueue, $foundJob->getId()) > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
142
            if (!$this->insertJob($foundJob)) {
143
                // Job is expired
144
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
145
146
                return false;
147
            }
148
            $this->redis->zAdd($whenQueue, $foundJob->getWhenUs(), $foundJob->toMessage());
149
150
            return $foundJob;
151
        }
152
153
        if (null === $this->maxPriority) {
154
            return false;
155
        }
156
157
        $priorityQueue = $this->getPriorityQueueCacheKey();
158 View Code Duplication
        if ($adjust && $this->redis->zRem($priorityQueue, $foundJob->getId()) > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
159
            if (!$this->insertJob($foundJob)) {
160
                // Job is expired
161
                $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey);
162
163
                return false;
164
            }
165
            $this->redis->zAdd($priorityQueue, $foundJob->getPriority(), $foundJob->toMessage());
166
167
            return $foundJob;
168
        }
169
170
        return false;
171
    }
172
173
    /**
174
     * @param \Dtc\QueueBundle\Model\Job $job
175
     *
176
     * @return \Dtc\QueueBundle\Model\Job
177
     *
178
     * @throws ClassNotSubclassException
179
     */
180
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
181
    {
182
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
183
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
184
        }
185
186
        $this->validateSaveable($job);
187
        $this->setJobId($job);
188
189
        // Add to whenAt or priority queue?  /// optimizaiton...
190
        $whenUs = $job->getWhenUs();
191
        if (!$whenUs) {
192
            $whenUs = Util::getMicrotimeDecimal();
193
            $job->setWhenUs($whenUs);
194
        }
195
196
        if (true === $job->getBatch()) {
197
            // is there a CRC Hash already for this job
198
            if ($oldJob = $this->batchSave($job)) {
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $oldJob is correct as $this->batchSave($job) (which targets Dtc\QueueBundle\Redis\JobManager::batchSave()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
199
                return $oldJob;
200
            }
201
        }
202
203
        return $this->saveJob($job);
204
    }
205
206
    protected function saveJob(\Dtc\QueueBundle\Redis\Job $job)
207
    {
208
        $whenQueue = $this->getWhenQueueCacheKey();
209
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
210
        // Save Job
211
        if (!$this->insertJob($job)) {
212
            // job is expired
213
            return null;
214
        }
215
        $jobId = $job->getId();
216
        $when = $job->getWhenUs();
217
        // Add Job to CRC list
218
        $this->redis->lPush($crcCacheKey, [$jobId]);
219
220
        $this->redis->zAdd($whenQueue, $when, $jobId);
221
222
        return $job;
223
    }
224
225
    protected function insertJob(\Dtc\QueueBundle\Redis\Job $job)
226
    {
227
        // Save Job
228
        $jobCacheKey = $this->getJobCacheKey($job->getId());
229
        if ($expiresAt = $job->getExpiresAt()) {
230
            $expiresAtTime = $expiresAt->getTimestamp() - time();
231
            if ($expiresAtTime <= 0) {
232
                return false; /// ??? job is already expired
233
            }
234
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());
235
236
            return true;
237
        }
238
        $this->redis->set($jobCacheKey, $job->toMessage());
239
240
        return true;
241
    }
242
243
    /**
244
     * Attach a unique id to a job since RabbitMQ will not.
245
     *
246
     * @param \Dtc\QueueBundle\Model\Job $job
247
     */
248
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
249
    {
250 View Code Duplication
        if (!$job->getId()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
251
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
252
        }
253
    }
254
255
    /**
256
     * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0.
257
     */
258
    protected function calculatePriority($priority)
259
    {
260
        $priority = parent::calculatePriority($priority);
261
        if (null === $priority) {
262
            return null === $this->maxPriority ? 0 : $this->maxPriority;
263
        }
264
265
        if (null === $this->maxPriority) {
266
            return 0;
267
        }
268
269
        // Redis priority should be in DESC order
270
        return $this->maxPriority - $priority;
271
    }
272
273
    /**
274
     * @param \Dtc\QueueBundle\Model\Job $job
275
     *
276
     * @throws PriorityException
277
     * @throws ClassNotSubclassException
278
     */
279 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
280
    {
281
        if (null !== $job->getPriority() && null === $this->maxPriority) {
282
            throw new PriorityException('This queue does not support priorities');
283
        }
284
285
        if (!$job instanceof RetryableJob) {
286
            throw new ClassNotSubclassException('Job needs to be instance of '.RetryableJob::class);
287
        }
288
    }
289
290
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
291
    {
292 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
293
            throw new UnsupportedException('Unsupported');
294
        }
295
    }
296
297
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
298
    {
299
        $jobId = $job->getId();
300
        $priorityQueue = $this->getPriorityQueueCacheKey();
301
        $whenQueue = $this->getWhenQueueCacheKey();
302
303
        $deleted = false;
304
        if ($this->redis->zRem($priorityQueue, $jobId)) {
305
            $deleted = true;
306
        } elseif ($this->redis->zRem($whenQueue, $jobId)) {
307
            $deleted = true;
308
        }
309
310
        if ($deleted) {
311
            $this->redis->del([$this->getJobCacheKey($jobId)]);
312
            $this->redis->lRem($this->getJobCrcHashKey($job->getCrcHash()), 1, $jobId);
313
        }
314
    }
315
316
    /**
317
     * @param string $workerName
318
     */
319
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
320
    {
321
        // First thing migrate any jobs from When queue to Prioirty queue
322
323
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
324
        if (null !== $this->maxPriority) {
325
            $this->transferQueues();
326
            $queue = $this->getPriorityQueueCacheKey();
327
            $jobId = $this->redis->zPop($queue);
328
        } else {
329
            $queue = $this->getWhenQueueCacheKey();
330
            $microtime = Util::getMicrotimeDecimal();
331
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
332
        }
333
334
        if ($jobId) {
335
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
336
            $job = new \Dtc\QueueBundle\Redis\Job();
337
            $job->fromMessage($jobMessage);
338
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
339
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
340
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
341
342
            return $job;
343
        }
344
345
        return null;
346
    }
347
348
    protected function getCurTime()
349
    {
350
        $time = intval(microtime(true) * 1000000);
351
352
        return $time;
353
    }
354
355 View Code Duplication
    public function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
356
    {
357
        if (!$job instanceof \Dtc\QueueBundle\Redis\Job) {
358
            throw new \InvalidArgumentException('$job must be instance of '.\Dtc\QueueBundle\Redis\Job::class);
359
        }
360
        $job->setStatus(BaseJob::STATUS_NEW);
361
        $job->setMessage(null);
362
        $job->setStartedAt(null);
363
        $job->setRetries($job->getRetries() + 1);
364
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
365
        $this->saveJob($job);
366
367
        return true;
368
    }
369
370
    public function retryableSaveHistory(RetryableJob $job, $retry)
371
    {
372
    }
373
}
374