These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace TreeHouse\WorkerBundle; |
||
4 | |||
5 | use Pheanstalk\Exception; |
||
6 | use Pheanstalk\Job; |
||
7 | use Pheanstalk\PheanstalkInterface; |
||
8 | use Psr\Log\LoggerInterface; |
||
9 | use Psr\Log\LogLevel; |
||
10 | use Psr\Log\NullLogger; |
||
11 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
12 | use Symfony\Component\OptionsResolver\Exception\ExceptionInterface; |
||
13 | use Symfony\Component\OptionsResolver\OptionsResolver; |
||
14 | use TreeHouse\WorkerBundle\Event\ExecutionEvent; |
||
15 | use TreeHouse\WorkerBundle\Event\JobBuriedEvent; |
||
16 | use TreeHouse\WorkerBundle\Event\JobEvent; |
||
17 | use TreeHouse\WorkerBundle\Exception\AbortException; |
||
18 | use TreeHouse\WorkerBundle\Exception\RescheduleException; |
||
19 | use TreeHouse\WorkerBundle\Executor\ExecutorInterface; |
||
20 | use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface; |
||
21 | |||
/**
 * The QueueManager is a service which handles persistent scheduled actions.
 * It defines certain actions which can be added and processed.
 *
 * This is useful when you have a lot of actions which you want to perform
 * later on, e.g. when importing large amounts of items.
 *
 * Every action maps to a Pheanstalk tube with the same name, and to exactly
 * one registered executor which processes the jobs of that tube.
 *
 * NOTE: While the manager tries to prevent duplicate actions, there is no
 * guarantee of this. As a result of this, you should make all jobs idempotent,
 * meaning they can be processed more than once.
 */
class QueueManager
{
    /**
     * @var PheanstalkInterface
     */
    protected $pheanstalk;

    /**
     * @var EventDispatcherInterface
     */
    protected $dispatcher;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Registered executors, indexed by action name.
     *
     * @var array<string, ExecutorInterface>
     */
    protected $executors = [];

    /**
     * Cached payload resolvers for the executors, indexed by executor name.
     *
     * @var OptionsResolver[]
     */
    protected $resolvers = [];

    /**
     * Time-to-run (in seconds) applied to jobs that are added without an explicit ttr.
     *
     * @var int
     */
    protected $defaultTtr = PheanstalkInterface::DEFAULT_TTR;

    /**
     * @param PheanstalkInterface      $pheanstalk
     * @param EventDispatcherInterface $dispatcher
     * @param LoggerInterface          $logger     Optional; a NullLogger is used when omitted
     */
    public function __construct(PheanstalkInterface $pheanstalk, EventDispatcherInterface $dispatcher, LoggerInterface $logger = null)
    {
        $this->pheanstalk = $pheanstalk;
        $this->dispatcher = $dispatcher;
        $this->logger     = $logger ?: new NullLogger();
    }

    /**
     * Sets the time-to-run used by add() when no explicit ttr is given.
     *
     * @param int $defaultTtr
     *
     * @return $this
     */
    public function setDefaultTtr($defaultTtr)
    {
        $this->defaultTtr = $defaultTtr;

        return $this;
    }

    /**
     * @return PheanstalkInterface
     */
    public function getPheanstalk()
    {
        return $this->pheanstalk;
    }

    /**
     * @return EventDispatcherInterface
     */
    public function getDispatcher()
    {
        return $this->dispatcher;
    }

    /**
     * @return LoggerInterface
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * Tells whether an executor has been registered for the given action.
     *
     * @param string $action
     *
     * @return bool
     */
    public function hasExecutor($action)
    {
        return array_key_exists($action, $this->executors);
    }

    /**
     * Add an executor. The executor is registered under the name it reports
     * via getName(); only one executor per action is allowed.
     *
     * @param ExecutorInterface $executor
     *
     * @throws \InvalidArgumentException When an executor is already registered for the action
     */
    public function addExecutor(ExecutorInterface $executor)
    {
        $action = $executor->getName();

        if ($this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'There is already an executor registered for action "%s".',
                $action
            ));
        }

        $this->executors[$action] = $executor;
    }

    /**
     * Returns a registered executor for given action.
     *
     * @param string $action
     *
     * @throws \OutOfBoundsException When no executor is registered for the action
     *
     * @return ExecutorInterface
     */
    public function getExecutor($action)
    {
        if (!$this->hasExecutor($action)) {
            throw new \OutOfBoundsException(sprintf(
                'There is no executor registered for action "%s".',
                $action
            ));
        }

        return $this->executors[$action];
    }

    /**
     * @return array<string, ExecutorInterface>
     */
    public function getExecutors()
    {
        return $this->executors;
    }

    /**
     * Returns the tube statistics for an action.
     *
     * @param string $action
     *
     * @throws Exception When Pheanstalk fails with anything other than NOT_FOUND
     *
     * @return array|null The stats as returned by Pheanstalk, or null when
     *                    Pheanstalk reports NOT_FOUND for the tube
     */
    public function getActionStats($action)
    {
        try {
            return $this->pheanstalk->statsTube($action);
        } catch (Exception $exception) {
            if (false !== strpos($exception->getMessage(), 'NOT_FOUND')) {
                return null;
            }

            throw $exception;
        }
    }

    /**
     * Add a job to the queue.
     *
     * @param string     $action   The action
     * @param array      $payload  The job's payload
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \InvalidArgumentException When the action is not defined
     * @throws \InvalidArgumentException When `$delay` cannot be parsed
     * @throws \InvalidArgumentException When `$delay` or `$priority` is negative
     * @throws \InvalidArgumentException When `$payload` cannot be encoded as JSON
     *
     * @return int The job id
     */
    public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
    {
        if (false === $this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            ));
        }

        if (null === $delay) {
            $delay = PheanstalkInterface::DEFAULT_DELAY;
        }

        if (null === $priority) {
            $priority = PheanstalkInterface::DEFAULT_PRIORITY;
        }

        if (null === $ttr) {
            $ttr = $this->defaultTtr;
        }

        if (!is_numeric($delay)) {
            // a relative date string: convert it to a number of seconds from now
            $timestamp = strtotime(sprintf('+ %s', $delay));

            // strtotime() yields false for unparseable input; without this check
            // the subtraction below would silently produce a large negative delay
            if (false === $timestamp) {
                throw new \InvalidArgumentException(
                    sprintf('Unable to parse "%s" as a delay', $delay)
                );
            }

            $delay = $timestamp - time();
        }

        if ($delay < 0) {
            throw new \InvalidArgumentException(
                sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
            );
        }

        if ($priority < 0) {
            throw new \InvalidArgumentException(
                sprintf('The priority for a job cannot be negative (was %d)', $priority)
            );
        }

        $encoded = json_encode($payload);

        // never put a job on the queue whose payload could not be serialized
        if (false === $encoded) {
            throw new \InvalidArgumentException(
                sprintf('The payload for action "%s" could not be encoded as JSON (error code %d)', $action, json_last_error())
            );
        }

        $jobId = $this->pheanstalk->putInTube($action, $encoded, $priority, $delay, $ttr);

        $this->logJob(
            $jobId,
            sprintf(
                'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
                $action,
                $encoded,
                $priority,
                $delay,
                $ttr
            )
        );

        return $jobId;
    }

    /**
     * Adds a job to the queue for an object. The payload is obtained from the
     * action's executor, which must implement the ObjectPayloadInterface.
     *
     * @param string     $action   The action
     * @param object     $object   The object to add a job for
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \OutOfBoundsException     When no executor is registered for the action
     * @throws \LogicException           If the executor does not accept objects as payloads
     * @throws \InvalidArgumentException If the executor does not accept the given object
     * @throws \InvalidArgumentException When the job is rejected by add()
     *
     * @return int The job id
     */
    public function addForObject($action, $object, $delay = null, $priority = null, $ttr = null)
    {
        $executor = $this->getExecutor($action);

        if (!$executor instanceof ObjectPayloadInterface) {
            throw new \LogicException(
                sprintf(
                    'The executor for action "%s" cannot be used for objects. Implement the ObjectPayloadInterface in class "%s" to enable this.',
                    $action,
                    get_class($executor)
                )
            );
        }

        if (!$executor->supportsObject($object)) {
            throw new \InvalidArgumentException(
                sprintf(
                    'The executor for action "%s" does not support %s objects',
                    $action,
                    get_class($object)
                )
            );
        }

        $payload = $executor->getObjectPayload($object);

        return $this->add($action, $payload, $delay, $priority, $ttr);
    }

    /**
     * Reschedules a job by releasing it with a delay that ends at the given date.
     *
     * @param Job       $job
     * @param \DateTime $date
     * @param int       $priority
     *
     * @throws \InvalidArgumentException When `$date` is in the past
     */
    public function reschedule(Job $job, \DateTime $date, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
    {
        if ($date < new \DateTime()) {
            throw new \InvalidArgumentException(
                sprintf('You cannot reschedule a job in the past (got %s, and the current date is %s)', $date->format(DATE_ISO8601), date(DATE_ISO8601))
            );
        }

        // The clock may tick between the check above and this computation;
        // clamp so a negative delay is never sent to the queue.
        $delay = max(0, $date->getTimestamp() - time());

        $this->pheanstalk->release($job, $priority, $delay);

        $this->logJob($job->getId(), sprintf('Rescheduled job for %s', $date->format('Y-m-d H:i:s')));
    }

    /**
     * Starts watching the tube(s) of the given action(s).
     *
     * @param string|string[] $actions
     */
    public function watch($actions)
    {
        foreach ($this->normalizeActions($actions) as $action) {
            $this->pheanstalk->watch($action);

            $this->logger->debug(sprintf('Watching tube "%s"', $action));
        }
    }

    /**
     * Watches the given action(s) and stops watching every other tube that
     * was being watched before this call.
     *
     * @param string|string[] $actions
     */
    public function watchOnly($actions)
    {
        $actions  = $this->normalizeActions($actions);
        $watching = $this->pheanstalk->listTubesWatched();

        $this->watch($actions);

        // Only ignore previously watched tubes that were not requested:
        // ignoring the complete previous watch list would also drop any
        // requested tube that happened to be watched already.
        $this->ignore(array_values(array_diff($watching, $actions)));
    }

    /**
     * Stops watching the tube(s) of the given action(s).
     *
     * @param string|string[] $actions
     */
    public function ignore($actions)
    {
        foreach ($this->normalizeActions($actions) as $action) {
            $this->pheanstalk->ignore($action);

            $this->logger->debug(sprintf('Ignoring tube "%s"', $action));
        }
    }

    /**
     * Reserves the next job from the watched tubes.
     *
     * @param int $timeout Passed on to Pheanstalk's reserve command
     *
     * @return Job|bool A job if there is one, false otherwise
     */
    public function get($timeout = null)
    {
        return $this->pheanstalk->reserve($timeout);
    }

    /**
     * Inspects the next job from the queue. Note that this does not reserve
     * the job, so it will still be given to a worker if/once it's ready.
     *
     * @param string $action The action to peek
     * @param string $state  The state to peek for, can be 'ready', 'delayed' or 'buried'
     *
     * @throws \InvalidArgumentException When $action is not a defined action
     * @throws \InvalidArgumentException When $state is not a valid state
     * @throws Exception                 When Pheanstalk fails with anything other than NOT_FOUND
     *
     * @return Job|null The next job for the given state, or null if there is no next job
     */
    public function peek($action, $state = 'ready')
    {
        if (false === $this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            ));
        }

        $states = ['ready', 'delayed', 'buried'];

        // strict comparison: loosely matching values (e.g. true) must not pass,
        // since the state is used below to build a method name
        if (!in_array($state, $states, true)) {
            throw new \InvalidArgumentException(
                sprintf('$state must be one of %s, got %s', json_encode($states), json_encode($state))
            );
        }

        // resolves to peekReady(), peekDelayed() or peekBuried()
        $peekMethod = sprintf('peek%s', ucfirst($state));

        try {
            return $this->pheanstalk->$peekMethod($action);
        } catch (Exception $exception) {
            if (false !== strpos($exception->getMessage(), 'NOT_FOUND')) {
                return null;
            }

            throw $exception;
        }
    }

    /**
     * Permanently deletes a job.
     *
     * @param Job $job
     */
    public function delete(Job $job)
    {
        $this->pheanstalk->delete($job);

        $this->logJob($job->getId(), 'Job deleted');
    }

    /**
     * Puts a job into a 'buried' state, revived only by 'kick' command.
     *
     * @param Job $job
     */
    public function bury(Job $job)
    {
        $this->pheanstalk->bury($job);

        $this->logJob($job->getId(), 'Job buried');
    }

    /**
     * Kicks jobs of the given action back onto the ready queue.
     *
     * Note that this switches the tube that is currently in use on the
     * Pheanstalk connection to the action's tube.
     *
     * @param string $action
     * @param int    $max    Passed on to Pheanstalk's kick command
     *
     * @return int The number of kicked jobs
     */
    public function kick($action, $max)
    {
        $this->pheanstalk->useTube($action);

        $kicked = $this->pheanstalk->kick($max);

        $this->logger->debug(
            sprintf('Kicked %d "%s" jobs back onto the ready queue', $kicked, $action)
        );

        return $kicked;
    }

    /**
     * @param Job $job
     *
     * @return array
     */
    public function getJobStats(Job $job)
    {
        return $this->pheanstalk->statsJob($job);
    }

    /**
     * Executes a reserved job and settles it on the queue afterwards:
     *
     *  - when execution completes, the job is deleted and the result returned.
     *    This includes the case where execute() rejects the payload as
     *    invalid and returns false;
     *  - a RescheduleException releases the job for the requested date,
     *    optionally with a new priority;
     *  - an AbortException is rethrown untouched;
     *  - any other exception is logged, after which the job is either retried
     *    in 10 minutes, or buried when its release count exceeds `$maxRetries`.
     *
     * NOTE(review): because the comparison is `$releases > $maxRetries`, a job
     * is attempted `$maxRetries + 2` times before being buried; confirm that
     * this is the intended meaning of `$maxRetries`.
     *
     * @param Job $job        The job to process
     * @param int $maxRetries The number of retries for this job
     *
     * @throws AbortException
     *
     * @return bool|mixed The executor result if successful, false otherwise
     */
    public function executeJob(Job $job, $maxRetries = 1)
    {
        $this->dispatcher->dispatch(WorkerEvents::EXECUTE_JOB, new JobEvent($job));

        $stats    = $this->pheanstalk->statsJob($job);
        $payload  = (array) json_decode($job->getData(), true);
        $releases = intval($stats['releases']);
        $priority = intval($stats['pri']);

        // context for logging
        $context = [
            'tube'    => $stats['tube'],
            'payload' => $payload,
            'attempt' => $releases + 1,
        ];

        try {
            // execute command
            $result = $this->execute($stats['tube'], $payload);

            // delete job if it completed without exceptions
            $this->delete($job);

            return $result;
        } catch (RescheduleException $re) {
            // Override priority if the RescheduleException provides a new one.
            $newPriority = $re->getReshedulePriority();
            if (!is_null($newPriority)) {
                $priority = $newPriority;
            }

            // reschedule the job
            $this->reschedule($job, $re->getRescheduleDate(), $priority);
        } catch (AbortException $e) {
            // abort thrown from executor, rethrow it and let the caller handle it
            throw $e;
        } catch (\Exception $e) {
            // some other exception occurred
            $message = sprintf('Exception occurred: %s in %s on line %d', $e->getMessage(), $e->getFile(), $e->getLine());
            $this->logJob($job->getId(), $message, LogLevel::ERROR, $context);
            $this->logJob($job->getId(), $e->getTraceAsString(), LogLevel::DEBUG, $context);

            // see if we have any retries left
            if ($releases > $maxRetries) {
                // no more retries, bury job for manual inspection
                $this->bury($job);

                $this->dispatcher->dispatch(WorkerEvents::JOB_BURIED_EVENT, new JobBuriedEvent($job, $e, $releases));
            } else {
                // try again, regardless of the error
                $this->reschedule($job, new \DateTime('+10 minutes'), $priority);
            }
        }

        return false;
    }

    /**
     * Executes an action with a specific payload.
     *
     * A pre-execute event is dispatched first, whose listeners may change the
     * payload. The resulting payload is validated with the executor's payload
     * resolver before the executor runs; afterwards a post-execute event is
     * dispatched, whose listeners may change the result.
     *
     * @param string $action
     * @param array  $payload
     *
     * @throws \OutOfBoundsException When no executor is registered for the action
     *
     * @return mixed The (possibly altered) executor result, or false when the payload is invalid
     */
    public function execute($action, array $payload)
    {
        $executor = $this->getExecutor($action);

        // dispatch pre event, listeners may change the payload here
        $event = new ExecutionEvent($executor, $action, $payload);
        $this->dispatcher->dispatch(WorkerEvents::PRE_EXECUTE_ACTION, $event);

        // this, not the original argument, is the payload that gets validated
        $eventPayload = $event->getPayload();

        try {
            $resolver = $this->getPayloadResolver($executor);
            $payload  = $resolver->resolve($eventPayload);
        } catch (ExceptionInterface $exception) {
            $this->logger->error(
                sprintf(
                    'Payload %s for "%s" is invalid: %s',
                    json_encode($eventPayload, JSON_UNESCAPED_SLASHES),
                    $action,
                    $exception->getMessage()
                )
            );

            return false;
        }

        $result = $executor->execute($payload);

        // dispatch post event, listeners may change the result here
        $event->setResult($result);
        $this->dispatcher->dispatch(WorkerEvents::POST_EXECUTE_ACTION, $event);

        return $event->getResult();
    }

    /**
     * CAUTION: this removes all items from an action's queue.
     * This is an irreversible action!
     *
     * @param string $action
     * @param array  $states The states to clear; all of 'ready', 'delayed'
     *                       and 'buried' when left empty
     *
     * @throws \InvalidArgumentException When $action is not a defined action, or a state is invalid
     * @throws Exception                 When Pheanstalk fails with anything other than NOT_FOUND
     */
    public function clear($action, array $states = [])
    {
        if (empty($states)) {
            $states = ['ready', 'delayed', 'buried'];
        }

        foreach ($states as $state) {
            $this->clearTube($action, $state);
        }
    }

    /**
     * Deletes jobs with the given state from a tube, one at a time, until
     * peeking yields no more jobs.
     *
     * @param string $tube
     * @param string $state
     *
     * @throws Exception When Pheanstalk fails with anything other than NOT_FOUND
     */
    protected function clearTube($tube, $state = 'ready')
    {
        $this->logger->info(sprintf('Clearing all jobs with the "%s" state in tube "%s"', $state, $tube));

        while ($job = $this->peek($tube, $state)) {
            try {
                $this->delete($job);
            } catch (Exception $e) {
                // job could have been deleted by another process
                if (false === strpos($e->getMessage(), 'NOT_FOUND')) {
                    throw $e;
                }
            }
        }
    }

    /**
     * Returns a cached version of the payload resolver for an executor. The
     * resolver is configured by the executor itself the first time it is needed.
     *
     * @param ExecutorInterface $executor
     *
     * @return OptionsResolver
     */
    protected function getPayloadResolver(ExecutorInterface $executor)
    {
        $key = $executor->getName();

        if (!array_key_exists($key, $this->resolvers)) {
            $resolver = new OptionsResolver();
            $executor->configurePayload($resolver);

            $this->resolvers[$key] = $resolver;
        }

        return $this->resolvers[$key];
    }

    /**
     * Logs a message prefixed with the id of the job it concerns.
     *
     * @param int    $jobId
     * @param string $msg
     * @param string $level
     * @param array  $context
     */
    protected function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
    {
        $this->logger->log($level, sprintf('[%s] %s', $jobId, $msg), $context);
    }

    /**
     * Wraps a single action in an array; arrays are returned untouched.
     *
     * @param string|string[] $actions
     *
     * @return string[]
     */
    private function normalizeActions($actions)
    {
        return is_array($actions) ? $actions : [$actions];
    }
}
661 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.