These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace TreeHouse\WorkerBundle; |
||
4 | |||
5 | use Pheanstalk\Exception; |
||
6 | use Pheanstalk\Job; |
||
7 | use Pheanstalk\PheanstalkInterface; |
||
8 | use Psr\Log\LoggerInterface; |
||
9 | use Psr\Log\LogLevel; |
||
10 | use Psr\Log\NullLogger; |
||
11 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
12 | use Symfony\Component\OptionsResolver\Exception\ExceptionInterface; |
||
13 | use Symfony\Component\OptionsResolver\OptionsResolver; |
||
14 | use TreeHouse\WorkerBundle\Event\ExecutionEvent; |
||
15 | use TreeHouse\WorkerBundle\Event\JobBuriedEvent; |
||
16 | use TreeHouse\WorkerBundle\Event\JobEvent; |
||
17 | use TreeHouse\WorkerBundle\Exception\AbortException; |
||
18 | use TreeHouse\WorkerBundle\Exception\RescheduleException; |
||
19 | use TreeHouse\WorkerBundle\Executor\ExecutorInterface; |
||
20 | use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface; |
||
21 | |||
/**
 * The QueueManager is a service which handles persistent scheduled actions.
 * It defines certain actions which can be added and processed.
 *
 * This is useful when you have a lot of actions which you want to perform
 * later on, e.g. when importing large amounts of items.
 *
 * Every action maps to a tube with the same name in the work queue, and to
 * exactly one registered executor that knows how to process its jobs.
 *
 * NOTE: While the manager tries to prevent duplicate actions, there is no
 * guarantee of this. As a result of this, you should make all jobs idempotent,
 * meaning they can be processed more than once.
 */
class QueueManager
{
    /**
     * @var PheanstalkInterface
     */
    protected $pheanstalk;

    /**
     * @var EventDispatcherInterface
     */
    protected $dispatcher;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Registered executors, keyed by action name.
     *
     * @var array<string, ExecutorInterface>
     */
    protected $executors = [];

    /**
     * Cached payload resolvers for the executors, keyed by executor name.
     *
     * @var OptionsResolver[]
     */
    protected $resolvers = [];

    /**
     * Time-to-run applied to new jobs when add() is called without one.
     *
     * @var int
     */
    protected $defaultTtr = PheanstalkInterface::DEFAULT_TTR;

    /**
     * @param PheanstalkInterface      $pheanstalk
     * @param EventDispatcherInterface $dispatcher
     * @param LoggerInterface|null     $logger     Falls back to a NullLogger when omitted
     */
    public function __construct(PheanstalkInterface $pheanstalk, EventDispatcherInterface $dispatcher, LoggerInterface $logger = null)
    {
        $this->pheanstalk = $pheanstalk;
        $this->dispatcher = $dispatcher;
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * Sets the time-to-run used by add() when no explicit ttr is given.
     *
     * @param int $defaultTtr
     *
     * @return $this
     */
    public function setDefaultTtr($defaultTtr)
    {
        $this->defaultTtr = $defaultTtr;

        return $this;
    }

    /**
     * @return PheanstalkInterface
     */
    public function getPheanstalk()
    {
        return $this->pheanstalk;
    }

    /**
     * @return EventDispatcherInterface
     */
    public function getDispatcher()
    {
        return $this->dispatcher;
    }

    /**
     * Tells whether an executor has been registered for the given action.
     *
     * @param string $action
     *
     * @return bool
     */
    public function hasExecutor($action)
    {
        return array_key_exists($action, $this->executors);
    }

    /**
     * Add an executor. It is registered under the name it reports via getName().
     *
     * @param ExecutorInterface $executor
     *
     * @throws \InvalidArgumentException When an executor with the same name is already registered
     */
    public function addExecutor(ExecutorInterface $executor)
    {
        $action = $executor->getName();

        if ($this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'There is already an executor registered for action "%s".',
                $action
            ));
        }

        $this->executors[$action] = $executor;
    }

    /**
     * Returns a registered executor for given action.
     *
     * @param string $action
     *
     * @throws \OutOfBoundsException When no executor is registered for the action
     *
     * @return ExecutorInterface
     */
    public function getExecutor($action)
    {
        if (!$this->hasExecutor($action)) {
            throw new \OutOfBoundsException(sprintf(
                'There is no executor registered for action "%s".',
                $action
            ));
        }

        return $this->executors[$action];
    }

    /**
     * @return array<string, ExecutorInterface>
     */
    public function getExecutors()
    {
        return $this->executors;
    }

    /**
     * Returns the statistics of the tube belonging to an action.
     *
     * @param string $action
     *
     * @throws Exception When Pheanstalk fails for a reason other than "not found"
     *
     * @return array|null The result of PheanstalkInterface::statsTube(), or
     *                    null when Pheanstalk reports the tube as NOT_FOUND
     */
    public function getActionStats($action)
    {
        try {
            return $this->pheanstalk->statsTube($action);
        } catch (Exception $exception) {
            if ($this->isNotFound($exception)) {
                return null;
            }

            throw $exception;
        }
    }

    /**
     * Add a job to the queue.
     *
     * @param string     $action   The action
     * @param array      $payload  The job's payload
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \InvalidArgumentException When the action is not defined
     * @throws \InvalidArgumentException When `$delay` is a string that cannot be parsed
     * @throws \InvalidArgumentException When `$delay` or `$priority` is negative
     * @throws \InvalidArgumentException When the payload cannot be encoded as JSON
     *
     * @return int The job id
     */
    public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
    {
        if (false === $this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            ));
        }

        if (null === $delay) {
            $delay = PheanstalkInterface::DEFAULT_DELAY;
        }

        if (null === $priority) {
            $priority = PheanstalkInterface::DEFAULT_PRIORITY;
        }

        if (null === $ttr) {
            $ttr = $this->defaultTtr;
        }

        if (!is_numeric($delay)) {
            // Non-numeric delays are relative date strings, e.g. "10 seconds".
            $scheduledAt = strtotime(sprintf('+ %s', $delay));

            // strtotime() returns false for unparsable input; without this
            // check that would surface as a confusing "job in the past" error.
            if (false === $scheduledAt) {
                throw new \InvalidArgumentException(
                    sprintf('The delay %s is neither a number of seconds nor a valid relative date string', json_encode($delay))
                );
            }

            $delay = $scheduledAt - time();
        }

        if ($delay < 0) {
            throw new \InvalidArgumentException(
                sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
            );
        }

        if ($priority < 0) {
            throw new \InvalidArgumentException(
                sprintf('The priority for a job cannot be negative (was %d)', $priority)
            );
        }

        $encoded = json_encode($payload);

        // json_encode() returns false on failure (e.g. malformed UTF-8); never
        // push such a job, since its payload would be lost.
        if (false === $encoded) {
            throw new \InvalidArgumentException(
                sprintf('The payload for action "%s" cannot be encoded as JSON: %s', $action, json_last_error_msg())
            );
        }

        $jobId = $this->pheanstalk->putInTube($action, $encoded, $priority, $delay, $ttr);

        $this->logJob(
            $jobId,
            sprintf(
                'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
                $action,
                $encoded,
                $priority,
                $delay,
                $ttr
            )
        );

        return $jobId;
    }

    /**
     * Adds a job to the queue for an object. The executor converts the object
     * into a payload, after which the job is added via add().
     *
     * @param string     $action   The action
     * @param object     $object   The object to add a job for
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \OutOfBoundsException     When no executor is registered for the action
     * @throws \LogicException           If the executor does not accept objects as payloads
     * @throws \InvalidArgumentException If the executor does not accept the given object
     * @throws \InvalidArgumentException For the same reasons as add()
     *
     * @return int The job id
     */
    public function addForObject($action, $object, $delay = null, $priority = null, $ttr = null)
    {
        $executor = $this->getExecutor($action);

        if (!$executor instanceof ObjectPayloadInterface) {
            throw new \LogicException(
                sprintf(
                    'The executor for action "%s" cannot be used for objects. Implement the ObjectPayloadInterface in class "%s" to enable this.',
                    $action,
                    get_class($executor)
                )
            );
        }

        if (!$executor->supportsObject($object)) {
            // $object is not type-hinted, so guard get_class() against non-objects
            // while building the error message.
            $type = is_object($object) ? get_class($object) : gettype($object);

            throw new \InvalidArgumentException(
                sprintf(
                    'The executor for action "%s" does not support %s objects',
                    $action,
                    $type
                )
            );
        }

        $payload = $executor->getObjectPayload($object);

        return $this->add($action, $payload, $delay, $priority, $ttr);
    }

    /**
     * Reschedules a job by releasing it with a delay that lasts until the given date.
     *
     * @param Job       $job
     * @param \DateTime $date
     * @param int       $priority
     *
     * @throws \InvalidArgumentException When `$date` is in the past
     */
    public function reschedule(Job $job, \DateTime $date, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
    {
        if ($date < new \DateTime()) {
            throw new \InvalidArgumentException(
                sprintf('You cannot reschedule a job in the past (got %s, and the current date is %s)', $date->format(DATE_ISO8601), date(DATE_ISO8601))
            );
        }

        // The clock may advance between the check above and this call; clamp
        // so a date that was "now" never produces a negative delay.
        $delay = max(0, $date->getTimestamp() - time());

        $this->pheanstalk->release($job, $priority, $delay);

        $this->logJob($job->getId(), sprintf('Rescheduled job for %s', $date->format('Y-m-d H:i:s')));
    }

    /**
     * Starts watching the tubes of one or more actions.
     *
     * @param string|string[] $actions
     */
    public function watch($actions)
    {
        foreach ($this->toActionList($actions) as $action) {
            $this->pheanstalk->watch($action);

            $this->logger->debug(sprintf('Watching tube "%s"', $action));
        }
    }

    /**
     * Watches the given actions and stops watching every other tube that was
     * being watched before the call.
     *
     * @param string|string[] $actions
     */
    public function watchOnly($actions)
    {
        $actions = $this->toActionList($actions);
        $watching = (array) $this->pheanstalk->listTubesWatched();

        $this->watch($actions);

        // Only drop tubes that were not requested: ignoring everything that was
        // watched before would also un-watch a requested tube that was already
        // on the watch list.
        $this->ignore(array_diff($watching, $actions));
    }

    /**
     * Stops watching the tubes of one or more actions.
     *
     * @param string|string[] $actions
     */
    public function ignore($actions)
    {
        foreach ($this->toActionList($actions) as $action) {
            $this->pheanstalk->ignore($action);

            $this->logger->debug(sprintf('Ignoring tube "%s"', $action));
        }
    }

    /**
     * Reserves the next job from the watched tubes.
     *
     * @param int $timeout Passed on to PheanstalkInterface::reserve()
     *
     * @return Job|bool A job if there is one, false otherwise
     */
    public function get($timeout = null)
    {
        return $this->pheanstalk->reserve($timeout);
    }

    /**
     * Inspects the next job from the queue. Note that this does not reserve
     * the job, so it will still be given to a worker if/once it's ready.
     *
     * @param string $action The action to peek
     * @param string $state  The state to peek for, can be 'ready', 'delayed' or 'buried'
     *
     * @throws \InvalidArgumentException When $action is not a defined action
     * @throws \InvalidArgumentException When $state is not a valid state
     * @throws Exception                 When Pheanstalk fails for a reason other than "not found"
     *
     * @return Job|null The next job for the given state, or null if there is no next job
     */
    public function peek($action, $state = 'ready')
    {
        if (false === $this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            ));
        }

        $states = ['ready', 'delayed', 'buried'];

        // Strict comparison: a loose check lets values such as 0 or true through
        // on older PHP versions, which would yield a non-existent peek method.
        if (!in_array($state, $states, true)) {
            throw new \InvalidArgumentException(
                sprintf('$state must be one of %s, got %s', json_encode($states), json_encode($state))
            );
        }

        // Resolves to peekReady(), peekDelayed() or peekBuried().
        $peekMethod = sprintf('peek%s', ucfirst($state));

        try {
            return $this->pheanstalk->$peekMethod($action);
        } catch (Exception $exception) {
            if ($this->isNotFound($exception)) {
                return null;
            }

            throw $exception;
        }
    }

    /**
     * Permanently deletes a job.
     *
     * @param Job $job
     */
    public function delete(Job $job)
    {
        $this->pheanstalk->delete($job);

        $this->logJob($job->getId(), 'Job deleted');
    }

    /**
     * Puts a job into a 'buried' state, revived only by 'kick' command.
     *
     * @param Job $job
     */
    public function bury(Job $job)
    {
        $this->pheanstalk->bury($job);

        $this->logJob($job->getId(), 'Job buried');
    }

    /**
     * Kicks jobs of an action back onto the ready queue. This is the
     * counterpart of bury().
     *
     * Note that this switches the connection's "used" tube to the action's tube.
     *
     * @param string $action
     * @param int    $max    The maximum number of jobs to kick
     *
     * @return int The number of kicked jobs
     */
    public function kick($action, $max)
    {
        $this->pheanstalk->useTube($action);

        $kicked = $this->pheanstalk->kick($max);

        $this->logger->debug(
            sprintf('Kicked %d "%s" jobs back onto the ready queue', $kicked, $action)
        );

        return $kicked;
    }

    /**
     * Returns the statistics of a single job.
     *
     * @param Job $job
     *
     * @return array The result of PheanstalkInterface::statsJob()
     */
    public function getJobStats(Job $job)
    {
        return $this->pheanstalk->statsJob($job);
    }

    /**
     * Executes a reserved job and settles it in the queue afterwards:
     *
     *  - on success (no exception) the job is deleted and the result returned;
     *  - on a RescheduleException the job is released until the requested date;
     *  - on an AbortException the exception is rethrown and the job is left untouched;
     *  - on any other error the job is retried in 10 minutes, or buried once
     *    its retries are exhausted.
     *
     * Be aware that execute() returns false rather than throwing when the payload
     * is invalid, so such a job counts as completed and is deleted.
     *
     * @param Job $job        The job to process
     * @param int $maxRetries The number of retries for this job
     *
     * @throws AbortException
     *
     * @return bool|mixed The executor result if successful, false otherwise
     */
    public function executeJob(Job $job, $maxRetries = 1)
    {
        $this->dispatcher->dispatch(WorkerEvents::EXECUTE_JOB, new JobEvent($job));

        $stats = $this->pheanstalk->statsJob($job);
        $payload = (array) json_decode($job->getData(), true);
        $releases = intval($stats['releases']);
        $priority = intval($stats['pri']);

        // context for logging
        $context = [
            'tube' => $stats['tube'],
            'payload' => $payload,
            'attempt' => $releases + 1,
        ];

        try {
            // the tube name doubles as the action name
            $result = $this->execute($stats['tube'], $payload);

            // delete job if it completed without exceptions
            $this->delete($job);

            return $result;
        } catch (RescheduleException $re) {
            // Override priority if the RescheduleException provides a new one.
            if (!is_null($re->getReshedulePriority())) {
                $priority = $re->getReshedulePriority();
            }

            $this->reschedule($job, $re->getRescheduleDate(), $priority);
        } catch (AbortException $e) {
            // abort thrown from executor, rethrow it and let the caller handle it
            throw $e;
        } catch (\Throwable $e) {
            // some other error occurred: log the message, and the trace at debug level
            $message = sprintf('Exception occurred: %s in %s on line %d', $e->getMessage(), $e->getFile(), $e->getLine());
            $this->logJob($job->getId(), $message, LogLevel::ERROR, $context);
            $this->logJob($job->getId(), $e->getTraceAsString(), LogLevel::DEBUG, $context);

            // NOTE(review): every release counts, including explicit reschedules,
            // and with `>` a job is retried $maxRetries + 1 times before it is
            // buried - confirm this is the intended threshold.
            if ($releases > $maxRetries) {
                // no more retries, bury job for manual inspection
                $this->bury($job);

                $this->dispatcher->dispatch(WorkerEvents::JOB_BURIED_EVENT, new JobBuriedEvent($job, $e, $releases));
            } else {
                // try again, regardless of the error
                $this->reschedule($job, new \DateTime('+10 minutes'), $priority);
            }
        }

        return false;
    }

    /**
     * Executes an action with a specific payload.
     *
     * A pre-execute event is dispatched first, whose listeners may alter the
     * payload. The resulting payload is validated with the executor's payload
     * resolver and handed to the executor. Afterwards a post-execute event is
     * dispatched, whose listeners may alter the result.
     *
     * @param string $action
     * @param array  $payload
     *
     * @throws \OutOfBoundsException When no executor is registered for the action
     *
     * @return mixed The (possibly altered) executor result, or false when the
     *               payload is invalid
     */
    public function execute($action, array $payload)
    {
        $executor = $this->getExecutor($action);

        // dispatch pre event, listeners may change the payload here
        $event = new ExecutionEvent($executor, $action, $payload);
        $this->dispatcher->dispatch(WorkerEvents::PRE_EXECUTE_ACTION, $event);

        try {
            $resolver = $this->getPayloadResolver($executor);
            $payload = $resolver->resolve($event->getPayload());
        } catch (ExceptionInterface $exception) {
            // Log the payload that was actually validated, which may differ
            // from the one passed in if a listener changed it.
            $this->logger->error(
                sprintf(
                    'Payload %s for "%s" is invalid: %s',
                    json_encode($event->getPayload(), JSON_UNESCAPED_SLASHES),
                    $action,
                    $exception->getMessage()
                )
            );

            return false;
        }

        $result = $executor->execute($payload);

        // dispatch post event, listeners may change the result here
        $event->setResult($result);
        $this->dispatcher->dispatch(WorkerEvents::POST_EXECUTE_ACTION, $event);

        return $event->getResult();
    }

    /**
     * CAUTION: this removes all items from an action's queue.
     * This is an irreversible action!
     *
     * @param string   $action
     * @param string[] $states The states to clear; defaults to all of
     *                         'ready', 'delayed' and 'buried' when empty
     *
     * @throws \InvalidArgumentException When $action is not a defined action or a state is invalid
     * @throws Exception                 When Pheanstalk fails for a reason other than "not found"
     */
    public function clear($action, array $states = [])
    {
        if (empty($states)) {
            $states = ['ready', 'delayed', 'buried'];
        }

        foreach ($states as $state) {
            $this->clearTube($action, $state);
        }
    }

    /**
     * @return LoggerInterface
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * Deletes jobs in the given state from a tube, one by one, until peek()
     * no longer returns a job.
     *
     * @param string $tube
     * @param string $state
     *
     * @throws \InvalidArgumentException When peek() rejects the tube or state
     * @throws Exception                 When Pheanstalk fails for a reason other than "not found"
     */
    protected function clearTube($tube, $state = 'ready')
    {
        $this->logger->info(sprintf('Clearing all jobs with the "%s" state in tube "%s"', $state, $tube));

        while ($job = $this->peek($tube, $state)) {
            try {
                $this->delete($job);
            } catch (Exception $e) {
                // job could have been deleted by another process
                if (!$this->isNotFound($e)) {
                    throw $e;
                }
            }
        }
    }

    /**
     * Returns a cached version of the payload resolver for an executor. The
     * resolver is configured by the executor on first use.
     *
     * @param ExecutorInterface $executor
     *
     * @return OptionsResolver
     */
    protected function getPayloadResolver(ExecutorInterface $executor)
    {
        $key = $executor->getName();

        if (!array_key_exists($key, $this->resolvers)) {
            $resolver = new OptionsResolver();
            $executor->configurePayload($resolver);

            $this->resolvers[$key] = $resolver;
        }

        return $this->resolvers[$key];
    }

    /**
     * Logs a message prefixed with the id of the job it concerns.
     *
     * @param int    $jobId
     * @param string $msg
     * @param string $level
     * @param array  $context
     */
    protected function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
    {
        $this->logger->log($level, sprintf('[%s] %s', $jobId, $msg), $context);
    }

    /**
     * Tells whether a Pheanstalk exception reports a NOT_FOUND condition, based
     * on the exception message.
     *
     * @param Exception $exception
     *
     * @return bool
     */
    private function isNotFound(Exception $exception)
    {
        return false !== strpos($exception->getMessage(), 'NOT_FOUND');
    }

    /**
     * Wraps a single action in an array, leaving arrays of actions untouched.
     *
     * @param string|string[] $actions
     *
     * @return string[]
     */
    private function toActionList($actions)
    {
        return is_array($actions) ? $actions : [$actions];
    }
}
||
661 |
This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e. an implementation of an interface or a subclass.
Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.