1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Simplario\Quedis; |
4
|
|
|
|
5
|
|
|
use Simplario\Quedis\Exceptions\FlowException; |
6
|
|
|
use Simplario\Quedis\Exceptions\QueueException; |
7
|
|
|
use Simplario\Quedis\Interfaces\MessageInterface; |
8
|
|
|
use Simplario\Quedis\Interfaces\QueueInterface; |
9
|
|
|
|
10
|
|
|
/** |
11
|
|
|
* Class Queue |
12
|
|
|
* |
13
|
|
|
* @package Simplario\Quedis |
14
|
|
|
* |
15
|
|
|
* |
16
|
|
|
* |
17
|
|
|
* Message flows (like in the Beanstalk: http://beanstalkc.readthedocs.io/en/latest/tutorial.html ) |
18
|
|
|
* ------------------------------------------------------------------------------------------------ |
19
|
|
|
* |
20
|
|
|
* 1) put pop |
21
|
|
|
* -----> [READY] --------> *poof* |
22
|
|
|
* |
23
|
|
|
* |
24
|
|
|
* 2) put reserve delete |
25
|
|
|
* -----> [READY] ---------> [RESERVED] --------> *poof* |
26
|
|
|
* |
27
|
|
|
* |
28
|
|
|
* 3) put with delay release with delay |
29
|
|
|
* ----------------> [DELAYED] <------------. |
30
|
|
|
* | | |
31
|
|
|
* | (time passes) | |
32
|
|
|
* | | |
33
|
|
|
* put v reserve | delete |
34
|
|
|
* -----------------> [READY] ---------> [RESERVED] --------> *poof* |
35
|
|
|
* ^ ^ | | |
36
|
|
|
* | \ release | | |
37
|
|
|
* | ``-------------' | |
38
|
|
|
* | | |
39
|
|
|
* | kick | |
40
|
|
|
* | | |
41
|
|
|
* | bury | |
42
|
|
|
* [BURIED] <---------------' |
43
|
|
|
* | |
44
|
|
|
* | delete |
45
|
|
|
* ``--------> *poof* |
46
|
|
|
* |
47
|
|
|
*/ |
48
|
|
|
class Queue implements QueueInterface |
49
|
|
|
{ |
50
|
|
|
|
51
|
|
|
const PRIORITY_HIGH = 'high'; |
52
|
|
|
const PRIORITY_LOW = 'low'; |
53
|
|
|
|
54
|
|
|
const NS_QUEUE = 'queue'; |
55
|
|
|
const NS_QUEUE_STOP = 'queue2stop'; |
56
|
|
|
const NS_MESSAGE = 'message'; |
57
|
|
|
const NS_MESSAGE_TO_QUEUE = 'message2queue'; |
58
|
|
|
const NS_MESSAGE_TO_STATE = 'message2state'; |
59
|
|
|
|
60
|
|
|
// State |
61
|
|
|
const STATE_READY = 'ready'; |
62
|
|
|
const STATE_DELAYED = 'delayed'; |
63
|
|
|
const STATE_RESERVED = 'reserved'; |
64
|
|
|
const STATE_BURIED = 'buried'; |
65
|
|
|
|
66
|
|
|
// Stats |
67
|
|
|
const STATS_QUEUES_LIST = 'queues'; |
68
|
|
|
const STATS_QUEUES = 'queues'; |
69
|
|
|
const STATS_MESSAGE_TOTAL = 'total'; |
70
|
|
|
const STATS_MESSAGE_READY = 'ready'; |
71
|
|
|
const STATS_MESSAGE_RESERVED = 'reserved'; |
72
|
|
|
const STATS_MESSAGE_DELAYED = 'delayed'; |
73
|
|
|
const STATS_MESSAGE_BURIED = 'buried'; |
74
|
|
|
const STATS_QUEUE_STOP = 'stop'; |
75
|
|
|
|
76
|
|
|
/** |
77
|
|
|
* @var \Predis\Client |
78
|
|
|
*/ |
79
|
|
|
protected $redis; |
80
|
|
|
|
81
|
|
|
/** |
82
|
|
|
* @var string |
83
|
|
|
*/ |
84
|
|
|
protected $namespace; |
85
|
|
|
|
86
|
|
|
/** |
87
|
|
|
* Queue constructor. |
88
|
|
|
* |
89
|
|
|
* @param mixed $redis |
90
|
|
|
* @param string $namespace |
91
|
|
|
*/ |
92
|
33 |
|
public function __construct($redis, $namespace = 'Quedis') |
93
|
|
|
{ |
94
|
33 |
|
$this->redis = $redis; |
95
|
33 |
|
$this->namespace = $namespace; |
96
|
33 |
|
} |
97
|
|
|
|
98
|
|
|
/** |
99
|
|
|
* @return \Predis\Client |
100
|
|
|
*/ |
101
|
33 |
|
public function getRedis() |
102
|
|
|
{ |
103
|
33 |
|
return $this->redis; |
104
|
|
|
} |
105
|
|
|
|
106
|
|
|
/** |
107
|
|
|
* @return string |
108
|
|
|
*/ |
109
|
1 |
|
public function getNamespace() |
110
|
|
|
{ |
111
|
1 |
|
return $this->namespace; |
112
|
|
|
} |
113
|
|
|
|
114
|
|
|
/** |
115
|
|
|
* @param string $queue |
116
|
|
|
* @param mixed $data |
117
|
|
|
* @param int $delay |
118
|
|
|
* @param string $priority |
119
|
|
|
* |
120
|
|
|
* @return MessageInterface |
121
|
|
|
* @throws \Exception |
122
|
|
|
*/ |
123
|
27 |
|
public function put($queue, $data, $delay = 0, $priority = self::PRIORITY_LOW) |
124
|
|
|
{ |
125
|
27 |
|
$message = $this->createMessage($data); |
126
|
27 |
|
$delay = $this->parseDelay($delay); |
127
|
|
|
|
128
|
27 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($queue, $message, $delay, $priority) { |
129
|
|
|
/** @var $tx \Predis\Client */ |
130
|
|
|
|
131
|
27 |
|
$state = $delay == 0 ? self::STATE_READY : self::STATE_DELAYED; |
132
|
|
|
|
133
|
27 |
|
$tx->hset($this->ns(self::NS_MESSAGE), $message->getToken(), $message->encode()); |
134
|
27 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_QUEUE), $message->getToken(), $queue); |
135
|
27 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $message->getToken(), $state); |
136
|
|
|
|
137
|
27 |
|
if ($state === self::STATE_READY) { |
138
|
23 |
|
if (self::PRIORITY_HIGH === $priority) { |
139
|
1 |
|
$tx->lpush($this->getKey($queue, self::STATE_READY), $message->getToken()); |
140
|
|
|
} else { |
141
|
23 |
|
$tx->rpush($this->getKey($queue, self::STATE_READY), $message->getToken()); |
142
|
|
|
} |
143
|
|
|
} else { |
144
|
6 |
|
$tx->zadd($this->getKey($queue, self::STATE_DELAYED), time() + $delay, $message->getToken()); |
145
|
|
|
} |
146
|
27 |
|
}); |
147
|
|
|
|
148
|
27 |
|
if (in_array(false, $result, true)) { |
149
|
|
|
throw new QueueException('Can not put message to queue'); |
150
|
|
|
} |
151
|
|
|
|
152
|
27 |
|
return $message; |
153
|
|
|
} |
154
|
|
|
|
155
|
|
|
/** |
156
|
|
|
* @param string $queue |
157
|
|
|
* @param int $timeout |
158
|
|
|
* |
159
|
|
|
* @return mixed|null|Queue |
160
|
|
|
*/ |
161
|
11 |
|
public function pop($queue, $timeout = 0) |
162
|
|
|
{ |
163
|
11 |
|
$message = $this->reserve($queue, $timeout); |
164
|
|
|
|
165
|
11 |
|
if ($message !== null) { |
166
|
11 |
|
$this->delete($message); |
167
|
|
|
} |
168
|
|
|
|
169
|
11 |
|
return $message; |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
/** |
173
|
|
|
* @param string $queue |
174
|
|
|
* @param int $timeout |
175
|
|
|
* |
176
|
|
|
* @return Message|null |
177
|
|
|
*/ |
178
|
21 |
|
public function reserve($queue, $timeout = 0) |
179
|
|
|
{ |
180
|
21 |
|
if ($this->isStop($queue)) { |
181
|
1 |
|
return null; |
182
|
|
|
} |
183
|
|
|
|
184
|
20 |
|
$this->migrate($queue); |
185
|
|
|
|
186
|
20 |
|
$token = $this->reserveToken($queue, $timeout); |
187
|
20 |
|
$message = $this->restoreMessage($token); |
188
|
|
|
|
189
|
20 |
|
if($message === null){ |
190
|
7 |
|
return null; |
191
|
|
|
} |
192
|
|
|
|
193
|
20 |
|
$this->getRedis()->transaction(function ($tx) use ($queue, $token) { |
194
|
|
|
/** @var $tx \Predis\Client */ |
195
|
20 |
|
$tx->zadd($this->getKey($queue, self::STATE_RESERVED), time(), $token); |
196
|
20 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_RESERVED); |
197
|
20 |
|
}); |
198
|
|
|
|
199
|
20 |
|
return $message; |
200
|
|
|
} |
201
|
|
|
|
202
|
|
|
/** |
203
|
|
|
* @param string $queue |
204
|
|
|
* @param int $timeout |
205
|
|
|
* |
206
|
|
|
* @return null|string |
207
|
|
|
*/ |
208
|
20 |
|
protected function reserveToken($queue, $timeout = 0) |
209
|
|
|
{ |
210
|
|
|
|
211
|
20 |
|
$redis = $this->getRedis(); |
212
|
|
|
|
213
|
20 |
|
if ($timeout > 0) { |
214
|
|
|
// example return [ '0' => queue name , '1' => jobId ] |
215
|
3 |
|
$token = $redis->blpop([$this->getKey($queue, self::STATE_READY)], $timeout); |
216
|
3 |
|
$token = (is_array($token)) ? $token[1] : null; |
217
|
|
|
} else { |
218
|
17 |
|
$token = $redis->lpop($this->getKey($queue, self::STATE_READY)); |
219
|
|
|
} |
220
|
|
|
|
221
|
20 |
|
return $token; |
222
|
|
|
} |
223
|
|
|
|
224
|
|
|
/** |
225
|
|
|
* @param string|null $token |
226
|
|
|
* |
227
|
|
|
* @return null|MessageInterface |
228
|
|
|
*/ |
229
|
20 |
|
protected function restoreMessage($token) |
230
|
|
|
{ |
231
|
20 |
|
if (is_null($token)) { |
232
|
7 |
|
return null; |
233
|
|
|
} |
234
|
|
|
|
235
|
20 |
|
$encoded = $this->getRedis()->hget($this->ns(self::NS_MESSAGE), $token); |
236
|
20 |
|
$message = Message::decode($encoded); |
237
|
|
|
|
238
|
20 |
|
return $message; |
239
|
|
|
} |
240
|
|
|
|
241
|
|
|
/** |
242
|
|
|
* @param string|MessageInterface $mixed |
243
|
|
|
* |
244
|
|
|
* @return $this |
245
|
|
|
* @throws QueueException |
246
|
|
|
*/ |
247
|
3 |
View Code Duplication |
public function bury($mixed) |
|
|
|
|
248
|
|
|
{ |
249
|
3 |
|
$payload = $this->payload($mixed); |
250
|
3 |
|
$this->checkMessageFlow($payload->getState(), 'bury'); |
251
|
|
|
|
252
|
2 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
253
|
|
|
/** @var $tx \Predis\Client */ |
254
|
2 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken()); |
255
|
2 |
|
$tx->zadd($this->getKey($payload->getQueue(), self::STATE_BURIED), time(), $payload->getToken()); |
256
|
2 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_BURIED); |
257
|
2 |
|
}); |
258
|
|
|
|
259
|
2 |
|
if (in_array(false, $result, true)) { |
260
|
|
|
throw new QueueException("The message bury error."); |
261
|
|
|
} |
262
|
|
|
|
263
|
2 |
|
return $this; |
264
|
|
|
} |
265
|
|
|
|
266
|
|
|
/** |
267
|
|
|
* @param string|MessageInterface $mixed |
268
|
|
|
* |
269
|
|
|
* @return $this |
270
|
|
|
* @throws QueueException |
271
|
|
|
*/ |
272
|
20 |
|
public function delete($mixed) |
273
|
|
|
{ |
274
|
20 |
|
$payload = $this->payload($mixed); |
275
|
20 |
|
$this->checkMessageFlow($payload->getState(), 'delete'); |
276
|
|
|
|
277
|
19 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
278
|
|
|
/** @var $tx \Predis\Client */ |
279
|
19 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken()); |
280
|
19 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken()); |
281
|
19 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_DELAYED), $payload->getToken()); // ? |
282
|
19 |
|
$tx->hdel($this->ns(self::NS_MESSAGE), $payload->getToken()); |
283
|
19 |
|
$tx->hdel($this->ns(self::NS_MESSAGE_TO_QUEUE), $payload->getToken()); |
284
|
19 |
|
$tx->hdel($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken()); |
285
|
19 |
|
}); |
286
|
|
|
|
287
|
19 |
|
if (array_count_values($result) < 1) { |
288
|
|
|
throw new QueueException("The message deleting error"); |
289
|
|
|
} |
290
|
|
|
|
291
|
19 |
|
return $this; |
292
|
|
|
} |
293
|
|
|
|
294
|
|
|
/** |
295
|
|
|
* @param MessageInterface|string $mixed |
296
|
|
|
* |
297
|
|
|
* @return $this |
298
|
|
|
* @throws QueueException |
299
|
|
|
*/ |
300
|
3 |
View Code Duplication |
public function kick($mixed) |
|
|
|
|
301
|
|
|
{ |
302
|
3 |
|
$payload = $this->payload($mixed); |
303
|
3 |
|
$this->checkMessageFlow($payload->getState(), 'kick'); |
304
|
|
|
|
305
|
1 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($payload) { |
306
|
|
|
/** @var $tx \Predis\Client */ |
307
|
1 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_BURIED), $payload->getToken()); |
308
|
1 |
|
$tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken()); |
309
|
1 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY); |
310
|
1 |
|
}); |
311
|
|
|
|
312
|
1 |
|
if (in_array(false, $result, true)) { |
313
|
|
|
throw new QueueException("The message kicked error"); |
314
|
|
|
} |
315
|
|
|
|
316
|
1 |
|
return $this; |
317
|
|
|
} |
318
|
|
|
|
319
|
|
|
/** |
320
|
|
|
* @param MessageInterface|string $mixed |
321
|
|
|
* @param int $delay |
322
|
|
|
* |
323
|
|
|
* @return $this |
324
|
|
|
* @throws FlowException |
325
|
|
|
* @throws QueueException |
326
|
|
|
*/ |
327
|
2 |
|
public function release($mixed, $delay = 0) |
328
|
|
|
{ |
329
|
2 |
|
$payload = $this->payload($mixed); |
330
|
2 |
|
$this->checkMessageFlow($payload->getState(), 'release'); |
331
|
1 |
|
$delay = $this->parseDelay($delay); |
332
|
|
|
|
333
|
1 |
|
$result = $this->getRedis()->transaction(function ($tx) use ($payload, $delay) { |
334
|
|
|
/** @var $tx \Predis\Client */ |
335
|
1 |
|
$tx->zrem($this->getKey($payload->getQueue(), self::STATE_RESERVED), $payload->getToken()); |
336
|
1 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $payload->getToken(), self::STATE_READY); |
337
|
|
|
|
338
|
1 |
|
if ($delay == 0) { |
339
|
1 |
|
$tx->rpush($this->getKey($payload->getQueue(), self::STATE_READY), $payload->getToken()); |
340
|
|
|
} else { |
341
|
|
|
$tx->zadd($this->getKey($payload->getQueue(), self::STATE_DELAYED), time() + $delay, |
342
|
|
|
$payload->getToken()); |
343
|
|
|
} |
344
|
1 |
|
}); |
345
|
|
|
|
346
|
1 |
|
if (in_array(false, $result, true)) { |
347
|
|
|
throw new QueueException("The message release error."); |
348
|
|
|
} |
349
|
|
|
|
350
|
1 |
|
return $this; |
351
|
|
|
} |
352
|
|
|
|
353
|
|
|
/** |
354
|
|
|
* @return array |
355
|
|
|
*/ |
356
|
4 |
|
public function getQueueList() |
357
|
|
|
{ |
358
|
4 |
|
$redis = $this->getRedis(); |
359
|
4 |
|
$queueList = $redis->hgetall($this->ns(self::NS_MESSAGE_TO_QUEUE)); |
360
|
4 |
|
$queueList = array_values($queueList); |
361
|
4 |
|
$queueList = array_unique($queueList); |
362
|
|
|
|
363
|
4 |
|
return $queueList; |
364
|
|
|
} |
365
|
|
|
|
366
|
|
|
/** |
367
|
|
|
* @param null|string $queue |
368
|
|
|
* |
369
|
|
|
* @return $this |
370
|
|
|
*/ |
371
|
21 |
|
public function migrate($queue = null) |
372
|
|
|
{ |
373
|
21 |
|
if (null === $queue) { |
374
|
1 |
|
$queueList = $this->getQueueList(); |
375
|
1 |
|
foreach ($queueList as $queue) { |
376
|
1 |
|
$this->migrate($queue); |
377
|
|
|
} |
378
|
|
|
|
379
|
1 |
|
return $this; |
380
|
|
|
} |
381
|
|
|
|
382
|
21 |
|
$keyReady = $this->getKey($queue, self::STATE_READY); |
383
|
21 |
|
$keyDelayed = $this->getKey($queue, self::STATE_DELAYED); |
384
|
|
|
|
385
|
21 |
|
$this->getRedis()->transaction(['cas' => true, 'watch' => [$keyReady, $keyDelayed], 'retry' => 10], |
386
|
21 |
|
function ($tx) use ($queue, $keyReady, $keyDelayed) { |
387
|
|
|
|
388
|
|
|
/** @var $tx \Predis\Client */ |
389
|
21 |
|
$time = time(); |
390
|
|
|
|
391
|
|
|
// get expired jobs from "delayed queue" |
392
|
21 |
|
$messageTokenSet = $tx->zrangebyscore($keyDelayed, '-inf', $time); |
393
|
|
|
|
394
|
21 |
|
if (count($messageTokenSet) > 0) { |
395
|
|
|
// remove jobs from "delayed queue" |
396
|
3 |
|
$tx->multi(); |
397
|
3 |
|
$tx->zremrangebyscore($keyDelayed, '-inf', $time); |
398
|
3 |
|
foreach ($messageTokenSet as $token) { |
399
|
3 |
|
$tx->rpush($keyReady, $token); |
400
|
3 |
|
$tx->hset($this->ns(self::NS_MESSAGE_TO_STATE), $token, self::STATE_READY); |
401
|
|
|
} |
402
|
|
|
} |
403
|
21 |
|
}); |
404
|
|
|
|
405
|
21 |
|
return $this; |
406
|
|
|
} |
407
|
|
|
|
408
|
|
|
|
409
|
|
|
/** |
410
|
|
|
* @param string $queue |
411
|
|
|
* @param array $options |
412
|
|
|
* |
413
|
|
|
* @return Iterator |
414
|
|
|
*/ |
415
|
1 |
|
public function iterator($queue, array $options = []) |
416
|
|
|
{ |
417
|
1 |
|
$options['queue'] = $queue; |
418
|
|
|
|
419
|
1 |
|
return new Iterator($this, $options); |
420
|
|
|
} |
421
|
|
|
|
422
|
|
|
/** |
423
|
|
|
* @param string $queue |
424
|
|
|
* |
425
|
|
|
* @return $this |
426
|
|
|
*/ |
427
|
1 |
|
public function start($queue) |
428
|
|
|
{ |
429
|
1 |
|
$this->getRedis()->hdel($this->ns(self::NS_QUEUE_STOP), [$queue]); |
430
|
|
|
|
431
|
1 |
|
return $this; |
432
|
|
|
} |
433
|
|
|
|
434
|
|
|
/** |
435
|
|
|
* @param string $queue |
436
|
|
|
* |
437
|
|
|
* @return $this |
438
|
|
|
*/ |
439
|
2 |
|
public function stop($queue) |
440
|
|
|
{ |
441
|
2 |
|
$this->getRedis()->hset($this->ns(self::NS_QUEUE_STOP), $queue, true); |
442
|
|
|
|
443
|
2 |
|
return $this; |
444
|
|
|
} |
445
|
|
|
|
446
|
|
|
/** |
447
|
|
|
* @param string $queue |
448
|
|
|
* |
449
|
|
|
* @return bool |
450
|
|
|
*/ |
451
|
24 |
|
public function isStop($queue) |
452
|
|
|
{ |
453
|
24 |
|
return (bool)$this->getRedis()->hexists($this->ns(self::NS_QUEUE_STOP), $queue); |
454
|
|
|
} |
455
|
|
|
|
456
|
|
|
/** |
457
|
|
|
* @param string $queue |
458
|
|
|
* |
459
|
|
|
* @return array |
460
|
|
|
*/ |
461
|
3 |
|
protected function queueStats($queue) |
462
|
|
|
{ |
463
|
|
|
$result = [ |
464
|
3 |
|
self::STATS_MESSAGE_READY => $this->size($queue, self::STATE_READY), |
465
|
3 |
|
self::STATS_MESSAGE_RESERVED => $this->size($queue, self::STATE_RESERVED), |
466
|
3 |
|
self::STATS_MESSAGE_DELAYED => $this->size($queue, self::STATE_DELAYED), |
467
|
3 |
|
self::STATS_MESSAGE_BURIED => $this->size($queue, self::STATE_BURIED), |
468
|
|
|
]; |
469
|
|
|
|
470
|
3 |
|
$result[self::STATS_MESSAGE_TOTAL] = array_sum($result); |
471
|
3 |
|
$result[self::STATS_QUEUE_STOP] = $this->isStop($queue); |
472
|
|
|
|
473
|
3 |
|
return $result; |
474
|
|
|
} |
475
|
|
|
|
476
|
|
|
/** |
477
|
|
|
* @param null|string $queue |
478
|
|
|
* |
479
|
|
|
* @return array |
480
|
|
|
*/ |
481
|
5 |
|
public function stats($queue = null) |
482
|
|
|
{ |
483
|
5 |
|
if ($queue !== null) { |
484
|
2 |
|
return $this->queueStats($queue); |
485
|
|
|
} |
486
|
|
|
|
487
|
|
|
$result = [ |
488
|
4 |
|
self::STATS_QUEUES_LIST => [], |
489
|
4 |
|
self::STATS_MESSAGE_TOTAL => 0, |
490
|
4 |
|
self::STATS_MESSAGE_READY => 0, |
491
|
4 |
|
self::STATS_MESSAGE_RESERVED => 0, |
492
|
4 |
|
self::STATS_MESSAGE_DELAYED => 0, |
493
|
4 |
|
self::STATS_MESSAGE_BURIED => 0, |
494
|
|
|
]; |
495
|
|
|
|
496
|
4 |
|
$queueList = $this->getQueueList(); |
497
|
|
|
|
498
|
4 |
|
if (count($queueList) == 0) { |
499
|
4 |
|
return $result; |
500
|
|
|
} |
501
|
|
|
|
502
|
1 |
|
$result[self::STATS_QUEUES_LIST] = $queueList; |
503
|
|
|
|
504
|
1 |
|
foreach ($queueList as $queue) { |
505
|
1 |
|
$itemStats = $this->queueStats($queue); |
506
|
1 |
|
$result[self::STATS_MESSAGE_READY] += $itemStats[self::STATS_MESSAGE_READY]; |
507
|
1 |
|
$result[self::STATS_MESSAGE_RESERVED] += $itemStats[self::STATS_MESSAGE_RESERVED]; |
508
|
1 |
|
$result[self::STATS_MESSAGE_DELAYED] += $itemStats[self::STATS_MESSAGE_DELAYED]; |
509
|
1 |
|
$result[self::STATS_MESSAGE_BURIED] += $itemStats[self::STATS_MESSAGE_BURIED]; |
510
|
1 |
|
$result[self::STATS_MESSAGE_TOTAL] += $itemStats[self::STATS_MESSAGE_TOTAL]; |
511
|
1 |
|
$result[self::STATS_QUEUES][$queue] = $itemStats; |
512
|
|
|
} |
513
|
|
|
|
514
|
1 |
|
return $result; |
515
|
|
|
} |
516
|
|
|
|
517
|
|
|
/** |
518
|
|
|
* @param string $queue |
519
|
|
|
* @param string $state |
520
|
|
|
* |
521
|
|
|
* @return int|string |
522
|
|
|
* @throws QueueException |
523
|
|
|
*/ |
524
|
4 |
|
public function size($queue, $state = self::STATE_READY) |
525
|
|
|
{ |
526
|
4 |
|
if (!in_array($state, [self::STATE_READY, self::STATE_DELAYED, self::STATE_BURIED, self::STATE_RESERVED])) { |
527
|
1 |
|
throw new QueueException('Unsupported state for size calculation.'); |
528
|
|
|
} |
529
|
|
|
|
530
|
3 |
|
$redis = $this->getRedis(); |
531
|
|
|
|
532
|
3 |
|
return ($state === self::STATE_READY) |
533
|
3 |
|
? $redis->llen($this->getKey($queue, self::STATE_READY)) |
534
|
3 |
|
: $redis->zcount($this->getKey($queue, $state), '-inf', '+inf'); |
535
|
|
|
} |
536
|
|
|
|
537
|
|
|
|
538
|
|
|
/** |
539
|
|
|
* @param null|string $queue |
540
|
|
|
* |
541
|
|
|
* @return $this |
542
|
|
|
*/ |
543
|
33 |
|
public function clean($queue = null) |
544
|
|
|
{ |
545
|
33 |
|
$redis = $this->getRedis(); |
546
|
|
|
|
547
|
33 |
|
$keys = $redis->keys($this->namespace . '*'); |
548
|
|
|
|
549
|
33 |
|
if (count($keys) == 0) { |
550
|
24 |
|
return $this; |
551
|
|
|
} |
552
|
|
|
|
553
|
9 |
|
foreach ($keys as $index => $name) { |
554
|
9 |
|
$redis->del($name); |
555
|
|
|
} |
556
|
|
|
|
557
|
9 |
|
return $this; |
558
|
|
|
} |
559
|
|
|
|
560
|
|
|
/** |
561
|
|
|
* @param mixed $mixed |
562
|
|
|
* |
563
|
|
|
* @return MessageInterface |
564
|
|
|
*/ |
565
|
27 |
|
protected function createMessage($mixed) |
566
|
|
|
{ |
567
|
27 |
|
if ($mixed instanceof MessageInterface) { |
568
|
1 |
|
return $mixed; |
569
|
|
|
} |
570
|
|
|
|
571
|
27 |
|
return new Message($mixed); |
572
|
|
|
} |
573
|
|
|
|
574
|
|
|
/** |
575
|
|
|
* @param string $type |
576
|
|
|
* |
577
|
|
|
* @return string |
578
|
|
|
*/ |
579
|
28 |
|
protected function ns($type) |
580
|
|
|
{ |
581
|
28 |
|
return implode(':', [$this->namespace, $type]); |
582
|
|
|
} |
583
|
|
|
|
584
|
|
|
|
585
|
|
|
/** |
586
|
|
|
* @param string $queue |
587
|
|
|
* @param string $state |
588
|
|
|
* |
589
|
|
|
* @return string |
590
|
|
|
*/ |
591
|
27 |
|
protected function getKey($queue, $state) |
592
|
|
|
{ |
593
|
27 |
|
return implode(':', [$this->ns(self::NS_QUEUE), $queue, $state]); |
594
|
|
|
} |
595
|
|
|
|
596
|
|
|
/** |
597
|
|
|
* @param int|\DateTime $mixed |
598
|
|
|
* |
599
|
|
|
* @return int |
600
|
|
|
*/ |
601
|
27 |
|
protected function parseDelay($mixed) |
602
|
|
|
{ |
603
|
27 |
|
if ($mixed instanceof \DateTime) { |
604
|
1 |
|
$delay = $mixed->getTimestamp() - time(); |
605
|
|
|
} else { |
606
|
27 |
|
$delay = (int)$mixed; |
607
|
|
|
} |
608
|
|
|
|
609
|
27 |
|
return $delay < 0 ? 0 : $delay; |
610
|
|
|
} |
611
|
|
|
|
612
|
|
|
/** |
613
|
|
|
* @param string $currentState |
614
|
|
|
* @param string $action |
615
|
|
|
* |
616
|
|
|
* @return $this |
617
|
|
|
* @throws FlowException |
618
|
|
|
*/ |
619
|
24 |
|
protected function checkMessageFlow($currentState, $action) |
620
|
|
|
{ |
621
|
|
|
$mapping = [ |
622
|
24 |
|
'bury' => [self::STATE_RESERVED], |
623
|
24 |
|
'delete' => [self::STATE_RESERVED, self::STATE_BURIED, self::STATE_DELAYED], |
624
|
24 |
|
'kick' => [self::STATE_BURIED], |
625
|
24 |
|
'release' => [self::STATE_RESERVED], |
626
|
|
|
]; |
627
|
|
|
|
628
|
24 |
|
if (!in_array($currentState, $mapping[$action], true)) { |
629
|
5 |
|
throw new FlowException("Flow error, the message state '{$currentState}' cannot be '{$action}'."); |
630
|
|
|
} |
631
|
|
|
|
632
|
19 |
|
return $this; |
633
|
|
|
} |
634
|
|
|
|
635
|
|
|
/** |
636
|
|
|
* @param string|MessageInterface $mixed |
637
|
|
|
* |
638
|
|
|
* @return Payload |
639
|
|
|
* @throws QueueException |
640
|
|
|
*/ |
641
|
24 |
|
protected function payload($mixed) |
642
|
|
|
{ |
643
|
24 |
|
$redis = $this->getRedis(); |
644
|
24 |
|
$token = $mixed instanceof MessageInterface ? $mixed->getToken() : $mixed; |
645
|
24 |
|
$queue = $redis->hget($this->ns(self::NS_MESSAGE_TO_QUEUE), $token); |
646
|
24 |
|
$state = $redis->hget($this->ns(self::NS_MESSAGE_TO_STATE), $token); |
647
|
|
|
|
648
|
24 |
|
return new Payload($token, $queue, $state); |
649
|
|
|
} |
650
|
|
|
|
651
|
|
|
} |
652
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.