1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace SfCod\QueueBundle\Queue; |
4
|
|
|
|
5
|
|
|
use DateInterval; |
6
|
|
|
use DateTime; |
7
|
|
|
use Predis\Client; |
8
|
|
|
use Predis\Collection\Iterator\HashKey; |
9
|
|
|
use SfCod\QueueBundle\Base\JobResolverInterface; |
10
|
|
|
use SfCod\QueueBundle\Base\RandomizeTrait; |
11
|
|
|
use SfCod\QueueBundle\Entity\Job; |
12
|
|
|
use SfCod\QueueBundle\Job\JobContract; |
13
|
|
|
use SfCod\QueueBundle\Job\JobContractInterface; |
14
|
|
|
use SfCod\QueueBundle\Service\RedisDriver; |
15
|
|
|
|
16
|
|
|
/**
 * Class RedisQueue
 *
 * Queue implementation backed by Redis (through a Predis client). For every
 * queue the following keys are used, all built by buildKey():
 *  - "<collection>:<queue>"           sorted set: job id => "available at" timestamp
 *  - "<collection>:<queue>:payload"   hash:       job id => raw payload
 *  - "<collection>:<queue>:reserved"  sorted set: job id => reservation timestamp
 *  - "<collection>:<queue>:attempted" sorted set: job id => number of attempts
 *
 * @author Virchenko Maksim <[email protected]>
 *
 * @package SfCod\QueueBundle\Queue
 */
class RedisQueue extends Queue
{
    use RandomizeTrait;

    /**
     * Job resolver
     *
     * @var JobResolverInterface
     */
    private $resolver;

    /**
     * Driver that provides the Predis client.
     *
     * @var RedisDriver
     */
    private $redis;

    /**
     * The collection that holds the jobs (used as the Redis key prefix).
     *
     * @var string
     */
    private $collection;

    /**
     * The name of the default queue.
     *
     * @var string
     */
    private $queue = 'default';

    /**
     * The expiration time of a job reservation, compared against timestamps from currentTime().
     *
     * @var int|null
     */
    private $expire = 60;

    /**
     * Maximum number of simultaneously reserved jobs per queue (see canRunJob()).
     *
     * @var int
     */
    private $limit = 15;

    /**
     * Create a new redis queue instance.
     *
     * @param JobResolverInterface $resolver
     * @param RedisDriver $redis
     * @param string $collection
     * @param string $queue
     * @param int $expire
     * @param int $limit
     */
    public function __construct(
        JobResolverInterface $resolver,
        RedisDriver $redis,
        string $collection = 'queue_jobs',
        string $queue = 'default',
        int $expire = 60,
        int $limit = 15
    ) {
        $this->resolver = $resolver;
        $this->redis = $redis;
        $this->collection = $collection;
        $this->expire = $expire;
        $this->queue = $queue;
        $this->limit = $limit;
    }

    /**
     * Get the size of the queue.
     *
     * Counts every member of the queue's sorted set, regardless of its score.
     *
     * @param string|null $queue
     *
     * @return int
     */
    public function size(?string $queue = null): int
    {
        return (int)$this->getClient()->zcount($this->buildKey($queue), '-inf', '+inf');
    }

    /**
     * Push a new job onto the queue.
     *
     * @param string $job
     * @param array $data
     * @param string|null $queue
     *
     * @return mixed id of the stored job, as returned by pushToDatabase()
     *
     * @throws \Exception
     */
    public function push(string $job, array $data = [], ?string $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * @param string $payload
     * @param string|null $queue
     * @param array $options not used by this implementation
     *
     * @return mixed id of the stored job, as returned by pushToDatabase()
     *
     * @throws \Exception
     */
    public function pushRaw(string $payload, ?string $queue = null, array $options = [])
    {
        return $this->pushToDatabase(0, $queue, $payload);
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param DateInterval|int $delay
     * @param string $job
     * @param array $data
     * @param string|null $queue
     *
     * @return mixed id of the stored job, as returned by pushToDatabase()
     *
     * @throws \Exception
     */
    public function later($delay, string $job, array $data = [], ?string $queue = null)
    {
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
    }

    /**
     * Pop the next job off of the queue.
     *
     * Looks up the first job id whose "available at" score lies between 0 and
     * the current time. Returns null when there is no such job, when its
     * payload has disappeared in the meantime, or when the job is reserved and
     * the reservation has not expired yet.
     *
     * @param string|null $queue
     *
     * @return JobContractInterface|null
     */
    public function pop(?string $queue = null): ?JobContractInterface
    {
        $ids = $this->getClient()->zrangebyscore(
            $this->buildKey($queue),
            0,
            $this->currentTime(),
            ['LIMIT' => [0, 1]]
        );

        if (empty($ids)) {
            return null;
        }

        $id = is_array($ids) ? array_shift($ids) : $ids;

        // getJobById() requires a non-nullable string. An empty string produces exactly
        // the same Redis keys as null does in buildKey(), so the lookup stays consistent
        // with jobs that were pushed with a null queue.
        $job = $this->getJobById((string)$queue, $id);

        if (null === $job) {
            // The id was listed but its payload is gone (getJobById() found no hash entry).
            return null;
        }

        if ($job->reserved() && $job->reservedAt() > ($this->currentTime() - $this->expire)) {
            return null;
        }

        return $job;
    }

    /**
     * Check if job exists in the queue.
     *
     * Scans the payload hash of the queue and compares every stored payload
     * with the payload created for the given job and data.
     *
     * @param string $job
     * @param array $data
     * @param string|null $queue
     *
     * @return bool
     */
    public function exists(string $job, array $data = [], ?string $queue = null): bool
    {
        $cursor = new HashKey($this->getClient(), $this->buildKey($queue, 'payload'));
        $payload = $this->createPayload($job, $data);

        foreach ($cursor as $value) {
            if ($value === $payload) {
                return true;
            }
        }

        return false;
    }

    /**
     * Check if can run process depend on limits
     *
     * A job may run when fewer than $limit jobs are currently reserved in its
     * queue, or when the job itself is already reserved.
     *
     * @param JobContractInterface $job
     *
     * @return bool
     */
    public function canRunJob(JobContractInterface $job): bool
    {
        $reservedCount = $this->getClient()->zcount(
            $this->buildKey($job->getQueue(), 'reserved'),
            '-inf',
            '+inf'
        );

        return $reservedCount < $this->limit || $job->reserved();
    }

    /**
     * Get job by its id
     *
     * @param string $queue
     * @param string $id
     *
     * @return JobContractInterface|null null when no payload is stored for the id
     */
    public function getJobById(string $queue, string $id): ?JobContractInterface
    {
        $client = $this->getClient();
        $payload = $client->hget($this->buildKey($queue, 'payload'), $id);

        if (!$payload) {
            return null;
        }

        $reservedAt = $client->zscore($this->buildKey($queue, 'reserved'), $id);
        $attempts = $client->zscore($this->buildKey($queue, 'attempted'), $id);

        // Scores are cast explicitly instead of relying on implicit coercion
        // into buildJob()'s int / ?int parameters; a missing score stays null / 0.
        return new JobContract(
            $this->resolver,
            $this,
            $this->buildJob(
                $id,
                $queue,
                (int)($attempts ?? 0),
                json_decode($payload, true),
                null === $reservedAt ? null : (int)$reservedAt
            )
        );
    }

    /**
     * Mark the given job ID as reserved.
     *
     * Stores the current time as the reservation score and increments the
     * attempts counter of the job, both in one atomic pipeline.
     *
     * @param JobContractInterface $job
     *
     * @throws \Exception
     */
    public function markJobAsReserved(JobContractInterface $job)
    {
        $this->getClient()->pipeline(['atomic' => true])
            ->zadd($this->buildKey($job->getQueue(), 'reserved'), [
                $job->getJobId() => $this->currentTime(),
            ])
            ->zincrby($this->buildKey($job->getQueue(), 'attempted'), 1, $job->getJobId())
            ->execute();
    }

    /**
     * Delete a reserved job from the queue.
     *
     * Removes the payload, the reservation, the attempts counter and the queue
     * entry of the job in one atomic pipeline.
     *
     * @param string $queue
     * @param string $id
     *
     * @return bool always true
     *
     * @throws \Exception
     */
    public function deleteReserved(string $queue, string $id): bool
    {
        $this->getClient()->pipeline(['atomic' => true])
            ->hdel($this->buildKey($queue, 'payload'), [$id])
            ->zrem($this->buildKey($queue, 'reserved'), $id)
            ->zrem($this->buildKey($queue, 'attempted'), $id)
            ->zrem($this->buildKey($queue), $id)
            ->execute();

        return true;
    }

    /**
     * Release a reserved job back onto the queue.
     *
     * The raw body is stored again under a newly generated id, keeping the
     * number of attempts of the given job. This method does not remove the
     * entries of the original job id.
     *
     * @param JobContractInterface $job
     * @param DateInterval|int $delay
     *
     * @return mixed id of the newly stored job, as returned by pushToDatabase()
     *
     * @throws \Exception
     */
    public function release(JobContractInterface $job, $delay)
    {
        return $this->pushToDatabase($delay, $job->getQueue(), $job->getRawBody(), $job->attempts());
    }

    /**
     * Build collection:queue:postfix key
     *
     * A null queue is interpolated as an empty string, so null and '' address
     * the same keys.
     *
     * @param string|null $queue
     * @param string|null $postfix
     *
     * @return string
     */
    private function buildKey(?string $queue = 'default', ?string $postfix = null)
    {
        return "$this->collection:$queue" . ($postfix ? ":$postfix" : '');
    }

    /**
     * Get the "available at" UNIX timestamp.
     *
     * @param DateInterval|int $delay
     *
     * @return int
     */
    private function getAvailableAt($delay = 0)
    {
        return $delay instanceof DateInterval
            ? (new DateTime())->add($delay)->getTimestamp()
            : $this->currentTime() + $delay;
    }

    /**
     * Push job to database
     *
     * Stores the payload and the "available at" score (and the attempts
     * counter when it is greater than zero) in one atomic pipeline.
     *
     * @param DateInterval|int $delay
     * @param string|null $queue
     * @param string $payload
     * @param int $attempts
     *
     * @return mixed id generated for the stored job by getRandomId()
     *
     * @throws \Exception
     */
    private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0)
    {
        $id = $this->getRandomId();

        $pipeline = $this->getClient()->pipeline(['atomic' => true])
            ->hset(
                $this->buildKey($queue, 'payload'),
                $id,
                $payload
            )
            ->zadd($this->buildKey($queue), [
                $id => $this->getAvailableAt($delay),
            ]);

        if ($attempts > 0) {
            $pipeline->zadd($this->buildKey($queue, 'attempted'), [
                $id => $attempts,
            ]);
        }

        $pipeline->execute();

        // Hand the id back so that push(), pushRaw(), later() and release(),
        // which all return this method's result, no longer always return null.
        return $id;
    }

    /**
     * Build job from database record
     *
     * @param string $id
     * @param string $queue
     * @param int $attempts
     * @param array $payload
     * @param int|null $reservedAt
     *
     * @return Job
     */
    private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job
    {
        $job = new Job();
        $job->setId($id);
        $job->setAttempts($attempts);
        $job->setQueue($queue);
        // Any non-zero reservation timestamp marks the job as reserved.
        $job->setReserved((bool)$reservedAt);
        $job->setReservedAt($reservedAt);
        $job->setPayload($payload);

        return $job;
    }

    /**
     * Get redis client
     *
     * @return Client
     */
    private function getClient(): Client
    {
        return $this->redis->getClient();
    }
}
393
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.