1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace TreeHouse\WorkerBundle; |
4
|
|
|
|
5
|
|
|
use InvalidArgumentException; |
6
|
|
|
use Pheanstalk\Exception; |
7
|
|
|
use Pheanstalk\Job; |
8
|
|
|
use Pheanstalk\PheanstalkInterface; |
9
|
|
|
use Psr\Log\LoggerInterface; |
10
|
|
|
use Psr\Log\LogLevel; |
11
|
|
|
use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface; |
12
|
|
|
|
13
|
|
|
/**
 * Facade around a Pheanstalk connection in which every "action" maps onto a
 * tube of the same name. Jobs can only be added to, or peeked from, actions
 * for which the executor pool has an executor.
 */
class Queue
{
    /**
     * @var PheanstalkInterface
     */
    private $pheanstalk;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Time-to-run applied to jobs that are added without an explicit TTR.
     *
     * @var int
     */
    private $defaultTtr = PheanstalkInterface::DEFAULT_TTR;

    /**
     * @var ExecutorPool
     */
    private $executorPool;

    /**
     * @param PheanstalkInterface $pheanstalk
     * @param ExecutorPool        $executorPool
     * @param LoggerInterface     $logger
     */
    public function __construct(
        PheanstalkInterface $pheanstalk,
        ExecutorPool $executorPool,
        LoggerInterface $logger
    ) {
        $this->pheanstalk = $pheanstalk;
        $this->logger = $logger;
        $this->executorPool = $executorPool;
    }

    /**
     * @return PheanstalkInterface
     *
     * @deprecated Removed in next major version
     */
    public function getPheanstalk()
    {
        return $this->pheanstalk;
    }

    /**
     * @return LoggerInterface
     *
     * @deprecated Removed in next major version
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * Sets the time-to-run used by add() when no explicit TTR is given.
     *
     * @param int $defaultTtr
     *
     * @return $this
     */
    public function setDefaultTtr($defaultTtr)
    {
        $this->defaultTtr = $defaultTtr;

        return $this;
    }

    /**
     * Returns the statistics of the tube belonging to an action.
     *
     * @param string $action
     *
     * @throws Exception When Pheanstalk fails with anything other than NOT_FOUND
     *
     * @return array|null The tube statistics, or null when Pheanstalk reports NOT_FOUND
     */
    public function getActionStats($action)
    {
        try {
            return $this->pheanstalk->statsTube($action);
        } catch (Exception $exception) {
            if ($this->isNotFound($exception)) {
                return null;
            }

            throw $exception;
        }
    }

    /**
     * Add a job to the queue.
     *
     * @param string     $action   The action
     * @param array      $payload  The job's payload
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws InvalidArgumentException When the action is not defined
     * @throws InvalidArgumentException When `$delay` cannot be parsed, or when
     *                                  `$delay` or `$priority` is negative
     * @throws InvalidArgumentException When the payload cannot be JSON-encoded
     *
     * @return int The job id
     */
    public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
    {
        $this->assertActionDefined($action);

        if (null === $delay) {
            $delay = PheanstalkInterface::DEFAULT_DELAY;
        }

        if (null === $priority) {
            $priority = PheanstalkInterface::DEFAULT_PRIORITY;
        }

        if (null === $ttr) {
            $ttr = $this->defaultTtr;
        }

        if (!is_numeric($delay)) {
            // Resolve the relative date string against a single clock reading,
            // so the computed delay cannot be off by one when the second ticks.
            $now = time();
            $timestamp = strtotime(sprintf('+ %s', $delay), $now);

            if (false === $timestamp) {
                throw new InvalidArgumentException(
                    sprintf('Could not parse delay "%s" as a date-diff string', $delay)
                );
            }

            $delay = $timestamp - $now;
        }

        if ($delay < 0) {
            throw new InvalidArgumentException(
                sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
            );
        }

        if ($priority < 0) {
            throw new InvalidArgumentException(
                sprintf('The priority for a job cannot be negative (was %d)', $priority)
            );
        }

        $encodedPayload = json_encode($payload);

        // json_encode() signals failure with false; never enqueue that as a job body.
        if (false === $encodedPayload) {
            throw new InvalidArgumentException(
                sprintf(
                    'The payload for action "%s" could not be JSON-encoded: %s',
                    $action,
                    json_last_error_msg()
                )
            );
        }

        $jobId = $this->pheanstalk->putInTube($action, $encodedPayload, $priority, $delay, $ttr);

        $this->logJob(
            $jobId,
            sprintf(
                'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
                $action,
                $encodedPayload,
                $priority,
                $delay,
                $ttr
            )
        );

        return $jobId;
    }

    /**
     * Adds a job to the queue for an object. The payload is obtained from the
     * action's executor, after which the job is added through add().
     *
     * @param string     $action   The action
     * @param object     $object   The object to add a job for
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \LogicException          If the executor does not accept objects as payloads
     * @throws InvalidArgumentException If the executor does not accept the given object
     * @throws InvalidArgumentException When the action is not defined
     *
     * @return int The job id
     */
    public function addForObject($action, $object, $delay = null, $priority = null, $ttr = null)
    {
        $executor = $this->executorPool->getExecutor($action);

        if (!$executor instanceof ObjectPayloadInterface) {
            throw new \LogicException(
                sprintf(
                    'The executor for action "%s" cannot be used for objects. Implement the ObjectPayloadInterface in class "%s" to enable this.',
                    $action,
                    $this->describeType($executor)
                )
            );
        }

        if (!$executor->supportsObject($object)) {
            throw new InvalidArgumentException(
                sprintf(
                    'The executor for action "%s" does not support %s objects',
                    $action,
                    $this->describeType($object)
                )
            );
        }

        $payload = $executor->getObjectPayload($object);

        return $this->add($action, $payload, $delay, $priority, $ttr);
    }

    /**
     * Reschedules a job by releasing it with a delay that lasts until `$date`.
     *
     * @param Job       $job
     * @param \DateTime $date
     * @param int       $priority
     *
     * @throws InvalidArgumentException When `$date` is in the past
     */
    public function reschedule(Job $job, \DateTime $date, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
    {
        $now = new \DateTime();

        if ($date < $now) {
            throw new InvalidArgumentException(
                sprintf(
                    'You cannot reschedule a job in the past (got %s, and the current date is %s)',
                    $date->format(DATE_ISO8601),
                    $now->format(DATE_ISO8601)
                )
            );
        }

        // The clock may tick past $date between the check above and this call;
        // clamp so that a negative delay is never sent.
        $delay = max(0, $date->getTimestamp() - time());

        $this->pheanstalk->release($job, $priority, $delay);

        $this->logJob($job->getId(), sprintf('Rescheduled job for %s', $date->format('Y-m-d H:i:s')));
    }

    /**
     * Starts watching the tube(s) of the given action(s).
     *
     * @param string|string[] $actions
     */
    public function watch($actions)
    {
        foreach ($this->normalizeActions($actions) as $action) {
            $this->pheanstalk->watch($action);

            $this->logger->debug(sprintf('Watching tube "%s"', $action));
        }
    }

    /**
     * Watches the given action(s) and stops watching every other tube that
     * was being watched before the call.
     *
     * @param string|string[] $actions
     */
    public function watchOnly($actions)
    {
        $actions = $this->normalizeActions($actions);
        $watching = $this->pheanstalk->listTubesWatched();

        // Watch first and ignore afterwards, and only ignore tubes that were
        // not requested: ignoring the complete previous watch list would also
        // drop a requested tube that happened to be watched already.
        $this->watch($actions);
        $this->ignore(array_values(array_diff($watching, $actions)));
    }

    /**
     * Stops watching the tube(s) of the given action(s).
     *
     * @param string|string[] $actions
     */
    public function ignore($actions)
    {
        foreach ($this->normalizeActions($actions) as $action) {
            $this->pheanstalk->ignore($action);

            $this->logger->debug(sprintf('Ignoring tube "%s"', $action));
        }
    }

    /**
     * Reserves the next job from the watched tubes.
     *
     * @param int $timeout
     *
     * @return Job|bool A job if there is one, false otherwise
     */
    public function get($timeout = null)
    {
        return $this->pheanstalk->reserve($timeout);
    }

    /**
     * Inspects the next job from the queue. Note that this does not reserve
     * the job, so it will still be given to a worker if/once it's ready.
     *
     * @param string $action The action to peek
     * @param string $state  The state to peek for, can be 'ready', 'delayed' or 'buried'
     *
     * @throws InvalidArgumentException When $action is not a defined action
     * @throws InvalidArgumentException When $state is not a valid state
     * @throws Exception                When Pheanstalk fails with anything other than NOT_FOUND
     *
     * @return Job|null The next job for the given state, or null if there is no next job
     */
    public function peek($action, $state = 'ready')
    {
        $this->assertActionDefined($action);

        $states = ['ready', 'delayed', 'buried'];

        // Strict comparison: loosely, values such as 0 or true would match a
        // state name and end up in the method name built below.
        if (!in_array($state, $states, true)) {
            throw new InvalidArgumentException(
                sprintf('$state must be one of %s, got %s', json_encode($states), json_encode($state))
            );
        }

        // Resolves to peekReady(), peekDelayed() or peekBuried().
        $peekMethod = sprintf('peek%s', ucfirst($state));

        try {
            return $this->pheanstalk->$peekMethod($action);
        } catch (Exception $exception) {
            if ($this->isNotFound($exception)) {
                return null;
            }

            throw $exception;
        }
    }

    /**
     * Permanently deletes a job.
     *
     * @param Job $job
     */
    public function delete(Job $job)
    {
        $this->pheanstalk->delete($job);

        $this->logJob($job->getId(), 'Job deleted');
    }

    /**
     * Puts a job into a 'buried' state, revived only by 'kick' command.
     *
     * @param Job $job
     */
    public function bury(Job $job)
    {
        $this->pheanstalk->bury($job);

        $this->logJob($job->getId(), 'Job buried');
    }

    /**
     * Kicks jobs of an action's tube back onto the ready queue.
     *
     * @param string $action
     * @param int    $max    Passed to Pheanstalk as the kick bound
     *
     * @return int The number of kicked jobs
     */
    public function kick($action, $max)
    {
        $this->pheanstalk->useTube($action);

        $kicked = $this->pheanstalk->kick($max);

        $this->logger->debug(
            sprintf('Kicked %d "%s" jobs back onto the ready queue', $kicked, $action)
        );

        return $kicked;
    }

    /**
     * Returns the statistics of a single job.
     *
     * @param Job $job
     *
     * @return array
     */
    public function getJobStats(Job $job)
    {
        return $this->pheanstalk->statsJob($job);
    }

    /**
     * CAUTION: this removes all items from an action's queue.
     * This is an irreversible action!
     *
     * @param string $action
     * @param array  $states The states to clear; all of 'ready', 'delayed'
     *                       and 'buried' when left empty
     */
    public function clear($action, array $states = [])
    {
        if (empty($states)) {
            $states = ['ready', 'delayed', 'buried'];
        }

        foreach ($states as $state) {
            $this->clearTube($action, $state);
        }
    }

    /**
     * Deletes jobs in the given state from a tube, one by one, until peeking
     * yields no more jobs.
     *
     * @param string $tube
     * @param string $state
     *
     * @throws Exception When deleting fails with anything other than NOT_FOUND
     */
    protected function clearTube($tube, $state = 'ready')
    {
        $this->logger->info(sprintf('Clearing all jobs with the "%s" state in tube "%s"', $state, $tube));

        while ($job = $this->peek($tube, $state)) {
            try {
                $this->delete($job);
            } catch (Exception $e) {
                // job could have been deleted by another process
                if (!$this->isNotFound($e)) {
                    throw $e;
                }
            }
        }
    }

    /**
     * Guards against actions for which the executor pool has no executor.
     *
     * @param string $action
     *
     * @throws InvalidArgumentException When the action is not defined
     */
    private function assertActionDefined($action)
    {
        if (false === $this->executorPool->hasExecutor($action)) {
            throw new InvalidArgumentException(
                sprintf(
                    'Action "%s" is not defined in QueueManager',
                    $action
                )
            );
        }
    }

    /**
     * Tells whether a Pheanstalk exception carries a NOT_FOUND message.
     *
     * @param Exception $exception
     *
     * @return bool
     */
    private function isNotFound(Exception $exception)
    {
        return false !== strpos($exception->getMessage(), 'NOT_FOUND');
    }

    /**
     * Wraps a single action in an array; arrays are returned untouched.
     *
     * @param string|string[] $actions
     *
     * @return string[]
     */
    private function normalizeActions($actions)
    {
        return is_array($actions) ? $actions : [$actions];
    }

    /**
     * Describes a value for use in error messages: the class name for
     * objects, the type name for anything else.
     *
     * @param mixed $value
     *
     * @return string
     */
    private function describeType($value)
    {
        return is_object($value) ? get_class($value) : gettype($value);
    }

    /**
     * Logs a message prefixed with the job id.
     *
     * @param int    $jobId
     * @param string $msg
     * @param string $level
     * @param array  $context
     */
    private function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
    {
        $this->logger->log($level, sprintf('[%s] %s', $jobId, $msg), $context);
    }
}
448
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.