1 | <?php |
||
2 | |||
3 | namespace Dtc\QueueBundle\Redis; |
||
4 | |||
5 | use Dtc\QueueBundle\Exception\ClassNotSubclassException; |
||
6 | use Dtc\QueueBundle\Exception\PriorityException; |
||
7 | use Dtc\QueueBundle\Exception\UnsupportedException; |
||
8 | use Dtc\QueueBundle\Manager\SaveableTrait; |
||
9 | use Dtc\QueueBundle\Model\RetryableJob; |
||
10 | use Dtc\QueueBundle\Util\Util; |
||
11 | |||
12 | /** |
||
13 | * For future implementation. |
||
14 | */ |
||
15 | class JobManager extends BaseJobManager |
||
16 | { |
||
17 | use SaveableTrait; |
||
18 | use StatusTrait; |
||
19 | |||
20 | /** |
||
21 | * There's a bit of danger here if there are more jobs being inserted than can be efficiently drained |
||
22 | * What could happen is that this infinitely loops... |
||
23 | */ |
||
24 | 9 | protected function transferQueues() |
|
25 | { |
||
26 | // Drains from WhenAt queue into Prioirty Queue |
||
27 | 9 | $whenQueue = $this->getWhenQueueCacheKey(); |
|
28 | 9 | $priorityQueue = $this->getPriorityQueueCacheKey(); |
|
29 | 9 | $microtime = Util::getMicrotimeInteger(); |
|
30 | 9 | while ($jobId = $this->redis->zPopByMaxScore($whenQueue, $microtime)) { |
|
31 | 8 | $jobMessage = $this->redis->get($this->getJobCacheKey($jobId)); |
|
32 | 8 | if (is_string($jobMessage)) { |
|
33 | 8 | $job = new Job(); |
|
34 | 8 | $job->fromMessage($jobMessage); |
|
35 | 8 | $this->redis->zAdd($priorityQueue, $job->getPriority(), $job->getId()); |
|
36 | } |
||
37 | } |
||
38 | 9 | } |
|
39 | |||
40 | /** |
||
41 | * @return Job|null |
||
42 | */ |
||
43 | 3 | protected function batchSave(Job $job) |
|
44 | { |
||
45 | 3 | $crcHash = $job->getCrcHash(); |
|
46 | 3 | $crcCacheKey = $this->getJobCrcHashKey($crcHash); |
|
47 | 3 | $result = $this->redis->lrange($crcCacheKey, 0, 1000); |
|
48 | 3 | if (!is_array($result)) { |
|
49 | return null; |
||
50 | } |
||
51 | |||
52 | 3 | foreach ($result as $jobId) { |
|
53 | 3 | $jobCacheKey1 = $this->getJobCacheKey($jobId); |
|
54 | 3 | if (!($foundJobMessage = $this->redis->get($jobCacheKey1))) { |
|
55 | $this->redis->lRem($crcCacheKey, 1, $jobCacheKey1); |
||
56 | continue; |
||
57 | } |
||
58 | |||
59 | /// There is one? |
||
60 | 3 | if ($foundJobMessage) { |
|
61 | 3 | $foundJob = $this->batchFoundJob($job, $jobCacheKey1, $foundJobMessage); |
|
62 | 3 | if ($foundJob) { |
|
63 | 3 | return $foundJob; |
|
64 | } |
||
65 | } |
||
66 | } |
||
67 | |||
68 | return null; |
||
69 | } |
||
70 | |||
71 | /** |
||
72 | * @param string $foundJobCacheKey |
||
73 | * @param string $foundJobMessage |
||
74 | */ |
||
75 | 3 | protected function batchFoundJob(Job $job, $foundJobCacheKey, $foundJobMessage) |
|
76 | { |
||
77 | 3 | $when = $job->getWhenUs(); |
|
78 | 3 | $crcHash = $job->getCrcHash(); |
|
79 | 3 | $crcCacheKey = $this->getJobCrcHashKey($crcHash); |
|
80 | |||
81 | 3 | $foundJob = new Job(); |
|
82 | 3 | $foundJob->fromMessage($foundJobMessage); |
|
83 | 3 | $foundWhen = $foundJob->getWhenUs(); |
|
84 | |||
85 | // Fix this using bcmath |
||
86 | 3 | $curtimeU = Util::getMicrotimeInteger(); |
|
87 | 3 | $newFoundWhen = null; |
|
88 | 3 | if (bccomp($foundWhen, $curtimeU) > 0 && bccomp($foundWhen, $when) >= 1) { |
|
89 | 3 | $newFoundWhen = $when; |
|
90 | } |
||
91 | 3 | $foundPriority = $foundJob->getPriority(); |
|
92 | 3 | $newFoundPriority = null; |
|
93 | 3 | if ($foundPriority > $job->getPriority()) { |
|
94 | 1 | $newFoundPriority = $job->getPriority(); |
|
95 | } |
||
96 | |||
97 | 3 | return $this->finishBatchFoundJob($foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority); |
|
98 | } |
||
99 | |||
100 | /** |
||
101 | * @param string $crcCacheKey |
||
102 | * @param int|null $newFoundPriority |
||
103 | */ |
||
104 | 3 | protected function finishBatchFoundJob(Job $foundJob, $foundJobCacheKey, $crcCacheKey, $newFoundWhen, $newFoundPriority) |
|
105 | { |
||
106 | // Now how do we adjust this job's priority or time? |
||
107 | 3 | $adjust = false; |
|
108 | 3 | if (isset($newFoundWhen)) { |
|
109 | 3 | $foundJob->setWhenUs($newFoundWhen); |
|
110 | 3 | $adjust = true; |
|
111 | } |
||
112 | 3 | if (isset($newFoundPriority)) { |
|
113 | 1 | $foundJob->setPriority($newFoundPriority); |
|
114 | 1 | $adjust = true; |
|
115 | } |
||
116 | 3 | if (!$adjust) { |
|
117 | 3 | return $foundJob; |
|
118 | } |
||
119 | |||
120 | 3 | return $this->addFoundJob($adjust, $foundJob, $foundJobCacheKey, $crcCacheKey); |
|
121 | } |
||
122 | |||
123 | /** |
||
124 | * @param bool $adjust |
||
125 | */ |
||
126 | 3 | protected function addFoundJob($adjust, Job $foundJob, $foundJobCacheKey, $crcCacheKey) |
|
127 | { |
||
128 | 3 | $whenQueue = $this->getWhenQueueCacheKey(); |
|
129 | 3 | $result = $this->adjustJob($adjust, $whenQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getWhenUs()); |
|
130 | 3 | if (null !== $result) { |
|
131 | 3 | return $result; |
|
132 | } |
||
133 | if (null === $this->maxPriority) { |
||
134 | return false; |
||
135 | } |
||
136 | |||
137 | $priorityQueue = $this->getPriorityQueueCacheKey(); |
||
138 | $result = $this->adjustJob($adjust, $priorityQueue, $foundJob, $foundJobCacheKey, $crcCacheKey, $foundJob->getPriority()); |
||
139 | |||
140 | return $result ?: false; |
||
141 | } |
||
142 | |||
143 | /** |
||
144 | * @param string $queue |
||
145 | * @param bool $adjust |
||
146 | */ |
||
147 | 3 | private function adjustJob($adjust, $queue, Job $foundJob, $foundJobCacheKey, $crcCacheKey, $zScore) |
|
148 | { |
||
149 | 3 | if ($adjust && $this->redis->zRem($queue, $foundJob->getId()) > 0) { |
|
150 | 3 | if (!$this->insertJob($foundJob)) { |
|
151 | // Job is expired |
||
152 | $this->redis->lRem($crcCacheKey, 1, $foundJobCacheKey); |
||
153 | |||
154 | return false; |
||
155 | } |
||
156 | 3 | $this->redis->zAdd($queue, $zScore, $foundJob->getId()); |
|
157 | |||
158 | 3 | return $foundJob; |
|
159 | } |
||
160 | |||
161 | return null; |
||
162 | } |
||
163 | |||
164 | /** |
||
165 | * @return Job|null |
||
166 | * |
||
167 | * @throws ClassNotSubclassException |
||
168 | * @throws PriorityException |
||
169 | */ |
||
170 | 28 | public function prioritySave(\Dtc\QueueBundle\Model\Job $job) |
|
171 | { |
||
172 | 28 | if (!$job instanceof Job) { |
|
173 | throw new \InvalidArgumentException('$job must be instance of '.Job::class); |
||
174 | } |
||
175 | |||
176 | 28 | $this->validateSaveable($job); |
|
177 | 28 | $this->setJobId($job); |
|
178 | |||
179 | // Add to whenAt or priority queue? /// optimizaiton... |
||
180 | 28 | $whenUs = $job->getWhenUs(); |
|
181 | 28 | if (!$whenUs) { |
|
182 | $whenUs = Util::getMicrotimeInteger(); |
||
183 | $job->setWhenUs($whenUs); |
||
184 | } |
||
185 | |||
186 | 28 | if (true === $job->getBatch()) { |
|
187 | // is there a CRC Hash already for this job |
||
188 | 3 | if ($oldJob = $this->batchSave($job)) { |
|
189 | 3 | return $oldJob; |
|
190 | } |
||
191 | } |
||
192 | |||
193 | 28 | return $this->saveJob($job); |
|
194 | } |
||
195 | |||
196 | /** |
||
197 | * @return Job|null |
||
198 | */ |
||
199 | 28 | protected function saveJob(Job $job) |
|
200 | { |
||
201 | 28 | $whenQueue = $this->getWhenQueueCacheKey(); |
|
202 | 28 | $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash()); |
|
203 | // Save Job |
||
204 | 28 | if (!$this->insertJob($job)) { |
|
205 | // job is expired |
||
206 | 3 | return null; |
|
207 | } |
||
208 | 28 | $jobId = $job->getId(); |
|
209 | 28 | $when = $job->getWhenUs(); |
|
210 | // Add Job to CRC list |
||
211 | 28 | $this->redis->lPush($crcCacheKey, [$jobId]); |
|
212 | |||
213 | 28 | $this->redis->zAdd($whenQueue, $when, $jobId); |
|
214 | |||
215 | 28 | return $job; |
|
216 | } |
||
217 | |||
218 | /** |
||
219 | * @return bool false if the job is already expired, true otherwise |
||
220 | */ |
||
221 | 28 | protected function insertJob(Job $job) |
|
222 | { |
||
223 | // Save Job |
||
224 | 28 | $jobCacheKey = $this->getJobCacheKey($job->getId()); |
|
225 | 28 | if ($expiresAt = $job->getExpiresAt()) { |
|
226 | 3 | $expiresAtTime = $expiresAt->getTimestamp() - time(); |
|
227 | 3 | if ($expiresAtTime <= 0) { |
|
228 | 3 | return false; /// ??? job is already expired |
|
229 | } |
||
230 | $this->redis->setEx($jobCacheKey, $expiresAtTime, $job->toMessage()); |
||
231 | |||
232 | return true; |
||
233 | } |
||
234 | 28 | $this->redis->set($jobCacheKey, $job->toMessage()); |
|
235 | |||
236 | 28 | return true; |
|
237 | } |
||
238 | |||
239 | /** |
||
240 | * Returns the prioirty in DESCENDING order, except if maxPrioirty is null, then prioirty is 0. |
||
241 | * |
||
242 | * @param int|null $priority |
||
243 | * |
||
244 | * @return int |
||
245 | */ |
||
246 | 28 | protected function calculatePriority($priority) |
|
247 | { |
||
248 | 28 | $priority = parent::calculatePriority($priority); |
|
249 | 28 | if (null === $priority) { |
|
250 | 27 | return null === $this->maxPriority ? null : $this->maxPriority; |
|
251 | } |
||
252 | |||
253 | 4 | if (null === $this->maxPriority) { |
|
254 | return null; |
||
255 | } |
||
256 | |||
257 | // Redis priority should be in DESC order |
||
258 | 4 | return $this->maxPriority - $priority; |
|
259 | } |
||
260 | |||
261 | /** |
||
262 | * @param string|null $workerName |
||
263 | * @param string|null $methodName |
||
264 | * @param bool $prioritize |
||
265 | * @param mixed $runId |
||
266 | * |
||
267 | * @throws UnsupportedException |
||
268 | * |
||
269 | * @return Job|null |
||
270 | */ |
||
271 | 26 | public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
|
272 | { |
||
273 | // First thing migrate any jobs from When queue to Prioirty queue |
||
274 | |||
275 | 26 | $this->verifyGetJobArgs($workerName, $methodName, $prioritize); |
|
276 | 23 | if (null !== $this->maxPriority) { |
|
277 | 9 | $this->transferQueues(); |
|
278 | 9 | $queue = $this->getPriorityQueueCacheKey(); |
|
279 | 9 | $jobId = $this->redis->zPop($queue); |
|
280 | } else { |
||
281 | 14 | $queue = $this->getWhenQueueCacheKey(); |
|
282 | 14 | $microtime = Util::getMicrotimeInteger(); |
|
283 | 14 | $jobId = $this->redis->zPopByMaxScore($queue, $microtime); |
|
284 | } |
||
285 | |||
286 | 23 | if ($jobId) { |
|
287 | 20 | return $this->retrieveJob($jobId); |
|
288 | } |
||
289 | |||
290 | 22 | return null; |
|
291 | } |
||
292 | |||
293 | 20 | protected function retrieveJob($jobId) |
|
294 | { |
||
295 | 20 | $job = null; |
|
296 | 20 | $jobMessage = $this->redis->get($this->getJobCacheKey($jobId)); |
|
297 | 20 | if (is_string($jobMessage)) { |
|
0 ignored issues
–
show
introduced
by
![]() |
|||
298 | 20 | $job = new Job(); |
|
299 | 20 | $job->fromMessage($jobMessage); |
|
300 | 20 | $crcCacheKey = $this->getJobCrcHashKey($job->getCrcHash()); |
|
301 | 20 | $this->redis->lRem($crcCacheKey, 1, $job->getId()); |
|
302 | 20 | $this->redis->del([$this->getJobCacheKey($job->getId())]); |
|
303 | } |
||
304 | |||
305 | 20 | return $job; |
|
306 | } |
||
307 | |||
308 | 3 | public function getWaitingJobCount($workerName = null, $methodName = null) |
|
309 | { |
||
310 | 3 | $microtime = Util::getMicrotimeInteger(); |
|
311 | 3 | $count = $this->redis->zCount($this->getWhenQueueCacheKey(), 0, $microtime); |
|
312 | |||
313 | 3 | if (null !== $this->maxPriority) { |
|
314 | 1 | $count += $this->redis->zCount($this->getPriorityQueueCacheKey(), '-inf', '+inf'); |
|
315 | } |
||
316 | |||
317 | 3 | return $count; |
|
318 | } |
||
319 | |||
320 | 3 | public function getStatus(): array |
|
321 | { |
||
322 | 3 | $whenQueueCacheKey = $this->getWhenQueueCacheKey(); |
|
323 | 3 | $priorityQueueCacheKey = $this->getPriorityQueueCacheKey(); |
|
324 | 3 | $results = []; |
|
325 | 3 | $this->collateStatusResults($results, $whenQueueCacheKey); |
|
326 | 3 | if (null !== $this->maxPriority) { |
|
327 | 1 | $this->collateStatusResults($results, $priorityQueueCacheKey); |
|
328 | } |
||
329 | |||
330 | 3 | $cacheKey = $this->getStatusCacheKey(); |
|
331 | 3 | $cursor = null; |
|
332 | 3 | while ($hResults = $this->redis->hScan($cacheKey, $cursor, '', 100)) { |
|
333 | 2 | $this->extractStatusHashResults($hResults, $results); |
|
334 | 2 | if (0 === $cursor) { |
|
335 | 2 | break; |
|
336 | } |
||
337 | } |
||
338 | |||
339 | 3 | return $results; |
|
340 | } |
||
341 | |||
342 | 12 | public function retryableSaveHistory(RetryableJob $job, $retry) |
|
343 | { |
||
344 | 12 | $cacheKey = $this->getStatusCacheKey(); |
|
345 | 12 | $hashKey = $job->getWorkerName(); |
|
346 | 12 | $hashKey .= ','; |
|
347 | 12 | $hashKey .= $job->getMethod(); |
|
348 | 12 | $hashKey .= ','; |
|
349 | 12 | $hashKey .= $job->getStatus(); |
|
350 | 12 | $this->redis->hIncrBy($cacheKey, $hashKey, 1); |
|
351 | 12 | } |
|
352 | } |
||
353 |