1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Redis; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Exception\ClassNotSubclassException; |
6
|
|
|
use Dtc\QueueBundle\Exception\PriorityException; |
7
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
8
|
|
|
use Dtc\QueueBundle\Manager\SaveableTrait; |
9
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
10
|
|
|
use Dtc\QueueBundle\Util\Util; |
11
|
|
|
|
12
|
|
|
/**
 * Redis-backed job manager.
 *
 * Each job is stored as a serialized message under its own job cache key.
 * Job ids are indexed in a "when" sorted set (scored by the job's whenUs
 * value) and, when a maximum priority is configured, are moved into a
 * "priority" sorted set (scored by the job's priority) as they are drained
 * from the when queue. A list of job ids is also kept per CRC hash so that
 * batch jobs can be matched against jobs that are already queued.
 */
class JobManager extends BaseJobManager
{
    use StatusTrait;
    use SaveableTrait;

    /**
     * Drains job ids from the when queue into the priority queue.
     *
     * Ids are popped from the when queue with the current microtime as the
     * maximum score; each id whose job message can still be loaded is added
     * to the priority queue scored by the job's priority. Ids whose job
     * message is no longer a string are dropped.
     *
     * There's a bit of danger here if more jobs are being inserted than can
     * be efficiently drained: this loop could then run indefinitely.
     */
    protected function transferQueues()
    {
        // Drains from the when queue into the priority queue
        $whenQueue = $this->getWhenQueueCacheKey();
        $priorityQueue = $this->getPriorityQueueCacheKey();
        $microtime = Util::getMicrotimeDecimal();
        while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) {
            $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
            if (is_string($jobMessage)) {
                $job = new Job();
                $job->fromMessage($jobMessage);
                $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId());
            }
        }
    }

    /**
     * Looks for an already-queued job with the same CRC hash as the given
     * batch job and, if one is found, folds the new job into it.
     *
     * @param Job $job
     *
     * @return Job|null the existing (possibly adjusted) job, or null when no
     *                  usable existing job was found
     */
    protected function batchSave(Job $job)
    {
        $crcHash = $job->getCrcHash();
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);
        $result = $this->redis->lrange($crcCacheKey, 0, 1000);
        if (!is_array($result)) {
            return null;
        }

        foreach ($result as $jobId) {
            $jobCacheKey1 = $this->getJobCacheKey($jobId);
            $foundJobMessage = $this->redis->get($jobCacheKey1);
            if (!$foundJobMessage) {
                // The job record is gone, so drop the stale entry. The CRC list
                // stores job ids (see saveJob() / retrieveJob()), therefore the
                // entry has to be removed by id, not by the job's cache key.
                $this->redis->lRem($crcCacheKey, 1, $jobId);
                continue;
            }

            $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage);
            if ($foundJob) {
                return $foundJob;
            }
        }

        return null;
    }

    /**
     * Compares the new batch job against an existing job and works out
     * whether the existing job's time and/or priority need to be changed.
     *
     * @param Job    $job              the new job being saved
     * @param string $foundJobCacheKey cache key of the existing job
     * @param string $foundJobMessage  serialized message of the existing job
     *
     * @return Job|false the existing job, or false when it could not be used
     */
    protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage)
    {
        $when = $job->getWhenUs();
        $crcHash = $job->getCrcHash();
        $crcCacheKey = $this->getJobCrcHashKey($crcHash);

        $foundJob = new Job();
        $foundJob->fromMessage($foundJobMessage);
        $foundWhen = $foundJob->getWhenUs();

        // Only change the time when the existing job is later than both the
        // current time and the new job's time.
        $curtimeU = Util::getMicrotimeDecimal();
        $newFoundWhen = null;
        if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) > 0) {
            $newFoundWhen = $when;
        }

        // Only change the priority when the new job's priority value is lower
        // than the existing job's.
        $foundPriority = $foundJob->getPriority();
        $newFoundPriority = null;
        if ($foundPriority > $job->getPriority()) {
            $newFoundPriority = $job->getPriority();
        }

        return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority);
    }

    /**
     * Applies the new time and/or priority to the existing job and, if
     * anything changed, re-queues it.
     *
     * @param Job         $foundJob         the existing job
     * @param string      $foundJobCacheKey cache key of the existing job
     * @param string      $crcCacheKey      key of the CRC hash list
     * @param string|null $newFoundWhen     new whenUs value, or null to keep the current one
     * @param int|null    $newFoundPriority new priority, or null to keep the current one
     *
     * @return Job|false the existing job, or false when re-queueing failed
     */
    protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority)
    {
        // Now how do we adjust this job's priority or time?
        $adjust = false;
        if (isset($newFoundWhen)) {
            $foundJob->setWhenUs($newFoundWhen);
            $adjust = true;
        }
        if (isset($newFoundPriority)) {
            $foundJob->setPriority($newFoundPriority);
            $adjust = true;
        }
        if (!$adjust) {
            // Nothing to change: the existing job already covers the new one.
            return $foundJob;
        }

        return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey);
    }

    /**
     * Re-queues an adjusted job, trying the when queue first and then, if a
     * maximum priority is configured, the priority queue.
     *
     * @param bool   $adjust
     * @param Job    $foundJob
     * @param string $foundJobCacheKey
     * @param string $crcCacheKey
     *
     * @return Job|false the re-queued job, or false when it was in neither
     *                   queue or has already expired
     */
    protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey)
    {
        $whenQueue = $this->getWhenQueueCacheKey();
        $result = $this->adjustJob($adjust, $whenQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs());
        if (null !== $result) {
            return $result;
        }
        if (null === $this->maxPriority) {
            // Without a maximum priority the priority queue is not used.
            return false;
        }

        $priorityQueue = $this->getPriorityQueueCacheKey();
        $result = $this->adjustJob($adjust, $priorityQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority());

        return $result ?: false;
    }

    /**
     * Removes the job from the given queue and, if it was present, stores the
     * updated job and adds it back with the new score.
     *
     * @param bool   $adjust
     * @param string $queue            key of the sorted set to adjust
     * @param Job    $foundJob
     * @param string $foundJobCacheKey cache key of the job (not used here; kept so callers stay unchanged)
     * @param string $crcCacheKey      key of the CRC hash list
     * @param mixed  $zScore           score to re-add the job with
     *
     * @return Job|false|null the job on success, false if the job has already
     *                        expired, null if the job was not in this queue
     */
    private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore)
    {
        if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) {
            if (!$this->insertJob($foundJob)) {
                // Job is expired. The CRC list stores job ids (see saveJob() /
                // retrieveJob()), so remove the entry by id, not by cache key.
                $this->redis->lRem($crcCacheKey, 1, $foundJob->getId());

                return false;
            }
            $this->redis->zAdd($queue, $zScore, $foundJob->getId());

            return $foundJob;
        }

        return null;
    }

    /**
     * Validates and saves a job, merging it into an existing job when it is a
     * batch job and a match is already queued.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return Job|null the saved (or matched existing) job, or null if the
     *                  job has already expired
     *
     * @throws \InvalidArgumentException if $job is not a Redis Job
     * @throws ClassNotSubclassException
     * @throws PriorityException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $this->validateSaveable($job);
        $this->setJobId($job);

        // Jobs without an explicit time are scheduled for the current time.
        $whenUs = $job->getWhenUs();
        if (!$whenUs) {
            $whenUs = Util::getMicrotimeDecimal();
            $job->setWhenUs($whenUs);
        }

        if (true === $job->getBatch()) {
            // Is there a CRC hash already for this job?
            if ($oldJob = $this->batchSave($job)) {
                return $oldJob;
            }
        }

        return $this->saveJob($job);
    }

    /**
     * Stores the job and indexes its id in the CRC hash list and when queue.
     *
     * @param Job $job
     *
     * @return Job|null the job, or null if it has already expired
     */
    protected function saveJob(Job $job)
    {
        $whenQueue = $this->getWhenQueueCacheKey();
        $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
        // Save Job
        if (!$this->insertJob($job)) {
            // Job is expired
            return null;
        }
        $jobId = $job->getId();
        $when = $job->getWhenUs();
        // Add Job to CRC list
        $this->redis->lPush($crcCacheKey, [$jobId]);

        $this->redis->zAdd($whenQueue, $when, $jobId);

        return $job;
    }

    /**
     * Writes the job's message under its job cache key, with a time-to-live
     * when the job has an expiry.
     *
     * @param Job $job
     *
     * @return bool false if the job is already expired, true otherwise
     */
    protected function insertJob(Job $job)
    {
        // Save Job
        $jobCacheKey = $this->getJobCacheKey($job->getId());
        if ($expiresAt = $job->getExpiresAt()) {
            // Remaining lifetime in seconds
            $expiresAtTime = $expiresAt->getTimestamp() - time();
            if ($expiresAtTime <= 0) {
                // Job is already expired
                return false;
            }
            $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage());

            return true;
        }
        $this->redis->set($jobCacheKey, $job->toMessage());

        return true;
    }

    /**
     * Returns the priority in DESCENDING order (maxPriority - priority).
     *
     * When maxPriority is null the result is always null. When the priority
     * from the parent is null the result is maxPriority.
     *
     * @param int|null $priority
     *
     * @return int|null
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);
        if (null === $this->maxPriority) {
            return null;
        }
        if (null === $priority) {
            return $this->maxPriority;
        }

        // Redis priority should be in DESC order
        return $this->maxPriority - $priority;
    }

    /**
     * Pops the next job id from the queue and retrieves the job.
     *
     * With a maximum priority configured, the when queue is first drained
     * into the priority queue and the id is popped from the priority queue.
     * Otherwise the id is popped from the when queue with the current
     * microtime as the maximum score.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      not used by this implementation
     *
     * @throws UnsupportedException
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        if (null !== $this->maxPriority) {
            // First migrate any jobs from the when queue to the priority queue
            $this->transferQueues();
            $queue = $this->getPriorityQueueCacheKey();
            $jobId = $this->redis->zPop($queue);
        } else {
            $queue = $this->getWhenQueueCacheKey();
            $microtime = Util::getMicrotimeDecimal();
            $jobId = $this->redis->zPopByMaxScore($queue, $microtime);
        }

        if ($jobId) {
            return $this->retrieveJob($jobId);
        }

        return null;
    }

    /**
     * Loads the job for the given id, then removes its id from the CRC hash
     * list and deletes its stored message.
     *
     * @param mixed $jobId
     *
     * @return Job|null the job, or null when no message is stored for the id
     */
    protected function retrieveJob($jobId)
    {
        $job = null;
        $jobMessage = $this->redis->get($this->getJobCacheKey($jobId));
        if (is_string($jobMessage)) {
            $job = new Job();
            $job->fromMessage($jobMessage);
            $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash());
            $this->redis->lRem($crcCacheKey, 1, $job->getId());
            $this->redis->del([$this->getJobCacheKey($job->getId())]);
        }

        return $job;
    }

    /**
     * Counts when-queue entries scored between 0 and the current microtime,
     * plus every priority-queue entry when a maximum priority is configured.
     *
     * @param string|null $workerName not used by this implementation
     * @param string|null $methodName not used by this implementation
     *
     * @return int
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        $microtime = Util::getMicrotimeDecimal();
        $count = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $microtime);

        if (null !== $this->maxPriority) {
            $count += $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf');
        }

        return $count;
    }

    /**
     * Collects status results from the queue(s) and from the status hash.
     *
     * @return array
     */
    public function getStatus()
    {
        $whenQueueCacheKey = $this->getWhenQueueCacheKey();
        $priorityQueueCacheKey = $this->getPriorityQueueCacheKey();
        $results = [];
        $this->collateStatusResults($results, $whenQueueCacheKey);
        if (null !== $this->maxPriority) {
            $this->collateStatusResults($results, $priorityQueueCacheKey);
        }

        // Scan the status hash 100 entries at a time.
        // NOTE(review): this relies on hScan() updating $cursor by reference - confirm.
        $cacheKey = $this->getStatusCacheKey();
        $cursor = null;
        while ($hResults = $this->redis->hScan($cacheKey, $cursor, '', 100)) {
            $this->extractStatusHashResults($hResults, $results);
            if (0 === $cursor) {
                break;
            }
        }

        return $results;
    }

    /**
     * Increments the status hash counter for the job's
     * "workerName,method,status" combination.
     *
     * @param RetryableJob $job
     * @param mixed        $retry not used by this implementation
     */
    public function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $cacheKey = $this->getStatusCacheKey();
        $hashKey = $job->getWorkerName();
        $hashKey .= ',';
        $hashKey .= $job->getMethod();
        $hashKey .= ',';
        $hashKey .= $job->getStatus();
        $this->redis->hIncrBy($cacheKey, $hashKey, 1);
    }
}
361
|
|
|
|
This check looks for function or method calls that always return null and whose return value is used.
The method
getObject()
can return nothing but null, so it makes no sense to use the return value. The reason is most likely that a function or method is incomplete or has been reduced for debug purposes.