Completed
Push — master ( 61225e...ca5196 )
by Peter
03:57
created

src/TreeHouse/WorkerBundle/QueueManager.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace TreeHouse\WorkerBundle;
4
5
use Pheanstalk\Exception;
6
use Pheanstalk\Job;
7
use Pheanstalk\PheanstalkInterface;
8
use Psr\Log\LoggerInterface;
9
use Psr\Log\LogLevel;
10
use Psr\Log\NullLogger;
11
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
12
use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
13
use Symfony\Component\OptionsResolver\OptionsResolver;
14
use TreeHouse\WorkerBundle\Event\ExecutionEvent;
15
use TreeHouse\WorkerBundle\Event\JobBuriedEvent;
16
use TreeHouse\WorkerBundle\Event\JobEvent;
17
use TreeHouse\WorkerBundle\Exception\AbortException;
18
use TreeHouse\WorkerBundle\Exception\RescheduleException;
19
use TreeHouse\WorkerBundle\Executor\ExecutorInterface;
20
use TreeHouse\WorkerBundle\Executor\ObjectPayloadInterface;
21
22
/**
23
 * The QueueManager is a service which handles persistent scheduled actions.
24
 * It defines certain actions which can be added and processed.
25
 *
26
 * This is useful when you have a lot of actions which you want to perform
27
 * later on, e.g. when importing large amounts of items.
28
 *
29
 * NOTE: While the manager tries to prevent duplicate actions, there is no
30
 * guarantee of this. As a result of this, you should make all jobs idempotent,
31
 * meaning they can be processed more than once.
32
 */
33
class QueueManager
34
{
35
    /**
     * Queue client through which all tube and job operations are performed.
     *
     * @var PheanstalkInterface
     */
    protected $pheanstalk;

    /**
     * Dispatcher used to announce job and execution events.
     *
     * @var EventDispatcherInterface
     */
    protected $dispatcher;

    /**
     * Receives all diagnostic output of the manager.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Executors that have been registered, indexed by the action name they report.
     *
     * @var array<string, ExecutorInterface>
     */
    protected $executors = [];

    /**
     * Payload resolvers that were already configured, indexed by executor name.
     *
     * @var OptionsResolver[]
     */
    protected $resolvers = [];

    /**
     * Time-to-run applied to new jobs when the caller does not pass one.
     *
     * @var int
     */
    protected $defaultTtr = PheanstalkInterface::DEFAULT_TTR;
68
69
    /**
     * Wires the manager to its queue client, event dispatcher and logger.
     *
     * @param PheanstalkInterface      $pheanstalk The queue client
     * @param EventDispatcherInterface $dispatcher The event dispatcher
     * @param LoggerInterface          $logger     Optional; a NullLogger is used when omitted
     */
    public function __construct(PheanstalkInterface $pheanstalk, EventDispatcherInterface $dispatcher, LoggerInterface $logger = null)
    {
        if (!$logger) {
            $logger = new NullLogger();
        }

        $this->logger     = $logger;
        $this->dispatcher = $dispatcher;
        $this->pheanstalk = $pheanstalk;
    }
80
81
    /**
     * Changes the time-to-run that add() falls back to when none is given.
     *
     * @param int $defaultTtr The new default, in seconds
     *
     * @return $this The manager itself, to allow call chaining
     */
    public function setDefaultTtr($defaultTtr)
    {
        $this->defaultTtr = $defaultTtr;

        return $this;
    }
92
93
    /**
     * Exposes the queue client this manager was constructed with.
     *
     * @return PheanstalkInterface
     */
    public function getPheanstalk()
    {
        return $this->pheanstalk;
    }
100
101
    /**
     * Exposes the event dispatcher this manager was constructed with.
     *
     * @return EventDispatcherInterface
     */
    public function getDispatcher()
    {
        return $this->dispatcher;
    }
108
109
    /**
     * Tells whether an executor has been registered under the given action name.
     *
     * @param string $action The action name to look up
     *
     * @return bool True when an executor is registered for the action
     */
    public function hasExecutor($action)
    {
        $registered = array_key_exists($action, $this->executors);

        return $registered;
    }
118
119
    /**
     * Registers an executor under the action name it reports via getName().
     *
     * @param ExecutorInterface $executor The executor to register
     *
     * @throws \InvalidArgumentException When that action name is already taken
     */
    public function addExecutor(ExecutorInterface $executor)
    {
        $name = $executor->getName();

        if (!$this->hasExecutor($name)) {
            $this->executors[$name] = $executor;

            return;
        }

        throw new \InvalidArgumentException(
            sprintf('There is already an executor registered for action "%s".', $name)
        );
    }
139
140
    /**
     * Looks up the executor that was registered for an action.
     *
     * @param string $action The action name
     *
     * @throws \OutOfBoundsException When nothing is registered for the action
     *
     * @return ExecutorInterface
     */
    public function getExecutor($action)
    {
        if ($this->hasExecutor($action)) {
            return $this->executors[$action];
        }

        throw new \OutOfBoundsException(
            sprintf('There is no executor registered for action "%s".', $action)
        );
    }
160
161
    /**
     * Returns every registered executor, keyed by action name.
     *
     * @return array<string, ExecutorInterface>
     */
    public function getExecutors()
    {
        return $this->executors;
    }
168
169
    /**
     * Fetches the tube statistics for an action from the queue client.
     *
     * @param string $action The action (tube) name
     *
     * @throws Exception Any client exception whose message does not contain "NOT_FOUND"
     *
     * @return array|null The statistics, or null when the client reports NOT_FOUND
     */
    public function getActionStats($action)
    {
        try {
            $stats = $this->pheanstalk->statsTube($action);
        } catch (Exception $failure) {
            $notFound = false !== strpos($failure->getMessage(), 'NOT_FOUND');

            if (!$notFound) {
                throw $failure;
            }

            return null;
        }

        return $stats;
    }
188
189
    /**
     * Add a job to the queue.
     *
     * @param string     $action   The action
     * @param array      $payload  The job's payload, stored as JSON
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \InvalidArgumentException When the action is not defined
     * @throws \InvalidArgumentException When `$delay` cannot be parsed
     * @throws \InvalidArgumentException When `$delay` or `$priority` is negative
     * @throws \InvalidArgumentException When the payload cannot be encoded as JSON
     *
     * @return int The job id
     */
    public function add($action, array $payload, $delay = null, $priority = null, $ttr = null)
    {
        if (false === $this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            ));
        }

        if (null === $delay) {
            $delay = PheanstalkInterface::DEFAULT_DELAY;
        }

        if (null === $priority) {
            $priority = PheanstalkInterface::DEFAULT_PRIORITY;
        }

        if (null === $ttr) {
            $ttr = $this->defaultTtr;
        }

        if (!is_numeric($delay)) {
            // Resolve the relative date against one fixed "now", so the clock
            // cannot tick between parsing and subtracting.
            $now    = time();
            $target = strtotime(sprintf('+ %s', $delay), $now);

            // strtotime() yields false for unparsable input; without this check
            // the subtraction below would produce a large negative number and a
            // misleading "in the past" error.
            if (false === $target) {
                throw new \InvalidArgumentException(
                    sprintf('Could not interpret "%s" as a delay', $delay)
                );
            }

            $delay = $target - $now;
        }

        if ($delay < 0) {
            throw new \InvalidArgumentException(
                sprintf('You cannot schedule a job in the past (delay was %d)', $delay)
            );
        }

        if ($priority < 0) {
            throw new \InvalidArgumentException(
                sprintf('The priority for a job cannot be negative (was %d)', $priority)
            );
        }

        $payload = json_encode($payload);

        // json_encode() returns false on failure (e.g. malformed UTF-8); never
        // hand that to the queue as if it were job data.
        if (false === $payload) {
            throw new \InvalidArgumentException(sprintf(
                'The payload for action "%s" could not be encoded as JSON: %s',
                $action,
                json_last_error_msg()
            ));
        }

        $jobId = $this->pheanstalk->putInTube($action, $payload, $priority, $delay, $ttr);

        $this->logJob(
            $jobId,
            sprintf(
                'Added job in tube "%s" with: payload: %s, priority: %d, delay: %ds, ttr: %s',
                $action,
                $payload,
                $priority,
                $delay,
                $ttr
            )
        );

        return $jobId;
    }
259
260
    /**
     * Adds a job to the queue for an object.
     *
     * The payload is obtained from the executor via getObjectPayload() and then
     * queued through add().
     *
     * @param string     $action   The action
     * @param object     $object   The object to add a job for
     * @param string|int $delay    The delay after which the job can be reserved.
     *                             Can be a number of seconds, or a date-diff
     *                             string relative from now, like "10 seconds".
     * @param int        $priority From 0 (most urgent) to 0xFFFFFFFF (least urgent)
     * @param int        $ttr      Time To Run: seconds a job can be reserved for
     *
     * @throws \OutOfBoundsException     When no executor is registered for the action
     * @throws \LogicException           If the executor does not accept objects as payloads
     * @throws \InvalidArgumentException If the executor does not accept the given object
     * @throws \InvalidArgumentException When add() rejects the resulting job
     *
     * @return int The job id
     */
    public function addForObject($action, $object, $delay = null, $priority = null, $ttr = null)
    {
        $executor = $this->getExecutor($action);

        if (!$executor instanceof ObjectPayloadInterface) {
            throw new \LogicException(
                sprintf(
                    'The executor for action "%s" cannot be used for objects. Implement the ObjectPayloadInterface in class "%s" to enable this.',
                    $action,
                    get_class($executor)
                )
            );
        }

        if (!$executor->supportsObject($object)) {
            // get_class() fails on non-objects, which would mask the intended
            // exception; describe such values by their type instead.
            $described = is_object($object) ? get_class($object) : gettype($object);

            throw new \InvalidArgumentException(
                sprintf(
                    'The executor for action "%s" does not support %s objects',
                    $action,
                    $described
                )
            );
        }

        $payload = $executor->getObjectPayload($object);

        return $this->add($action, $payload, $delay, $priority, $ttr);
    }
305
306
    /**
     * Reschedules a job by releasing it back to the queue with a delay.
     *
     * @param Job       $job      The job to release
     * @param \DateTime $date     The moment from which the job may run again
     * @param integer   $priority The priority to release the job with
     *
     * @throws \InvalidArgumentException When `$date` is in the past
     */
    public function reschedule(Job $job, \DateTime $date, $priority = PheanstalkInterface::DEFAULT_PRIORITY)
    {
        if ($date < new \DateTime()) {
            throw new \InvalidArgumentException(
                sprintf('You cannot reschedule a job in the past (got %s, and the current date is %s)', $date->format(DATE_ISO8601), date(DATE_ISO8601))
            );
        }

        // The clock may tick between the check above and this subtraction, so
        // a date that is "now" could otherwise yield a negative delay.
        $delay = max(0, $date->getTimestamp() - time());

        $this->pheanstalk->release($job, $priority, $delay);

        $this->logJob($job->getId(), sprintf('Rescheduled job for %s', $date->format('Y-m-d H:i:s')));
    }
327
328
    /**
     * Starts watching one or more tubes, logging each at debug level.
     *
     * @param string|string[] $actions A single action name or a list of them
     */
    public function watch($actions)
    {
        $tubes = is_array($actions) ? $actions : [$actions];

        foreach ($tubes as $tube) {
            $this->pheanstalk->watch($tube);

            $this->logger->debug(sprintf('Watching tube "%s"', $tube));
        }
    }
343
344
    /**
     * Watches the given tubes and stops watching every other tube that was
     * being watched before the call.
     *
     * Tubes that were already watched and are requested again stay watched;
     * only the remaining ones are ignored.
     *
     * @param string|string[] $actions A single action name or a list of them
     */
    public function watchOnly($actions)
    {
        $wanted   = is_array($actions) ? $actions : [$actions];
        $watching = (array) $this->pheanstalk->listTubesWatched();

        // Watch first, so the watch list is not emptied while switching.
        $this->watch($wanted);

        // Ignoring the full previous list would also drop any requested tube
        // that was already being watched.
        $this->ignore(array_values(array_diff($watching, $wanted)));
    }
354
355
    /**
     * Stops watching one or more tubes, logging each at debug level.
     *
     * @param string|string[] $actions A single action name or a list of them
     */
    public function ignore($actions)
    {
        $tubes = is_array($actions) ? $actions : [$actions];

        foreach ($tubes as $tube) {
            $this->pheanstalk->ignore($tube);

            $this->logger->debug(sprintf('Ignoring tube "%s"', $tube));
        }
    }
370
371
    /**
     * Reserves the next job from the queue client.
     *
     * @param int $timeout Passed straight through to the client's reserve() call
     *
     * @return Job|bool A job if there is one, false otherwise
     */
    public function get($timeout = null)
    {
        $reserved = $this->pheanstalk->reserve($timeout);

        return $reserved;
    }
380
381
    /**
     * Inspects the next job from the queue. Note that this does not reserve
     * the job, so it will still be given to a worker if/once it's ready.
     *
     * @param string $action The action to peek
     * @param string $state  The state to peek for, can be 'ready', 'delayed' or 'buried'
     *
     * @throws \InvalidArgumentException When $action is not a defined action
     * @throws \InvalidArgumentException When $state is not a valid state
     * @throws Exception                 Any client exception whose message does not contain "NOT_FOUND"
     *
     * @return Job|null The next job for the given state, or null if there is no next job
     */
    public function peek($action, $state = 'ready')
    {
        if (false === $this->hasExecutor($action)) {
            throw new \InvalidArgumentException(sprintf(
                'Action "%s" is not defined in QueueManager',
                $action
            ));
        }

        $states = ['ready', 'delayed', 'buried'];

        // Compare strictly: with loose comparison a value such as `true` matches
        // any state name and would become a call to a non-existent "peek1" method.
        if (!in_array($state, $states, true)) {
            throw new \InvalidArgumentException(
                sprintf('$state must be one of %s, got %s', json_encode($states), json_encode($state))
            );
        }

        // Resolves to peekReady(), peekDelayed() or peekBuried() on the client.
        $peekMethod = sprintf('peek%s', ucfirst($state));

        try {
            return $this->pheanstalk->$peekMethod($action);
        } catch (Exception $exception) {
            // The client signals "nothing to peek" with a NOT_FOUND error.
            if (false !== strpos($exception->getMessage(), 'NOT_FOUND')) {
                return null;
            }

            throw $exception;
        }
    }
422
423
    /**
     * Permanently removes a job from the queue and logs the removal.
     *
     * @param Job $job The job to delete
     */
    public function delete(Job $job)
    {
        $this->pheanstalk->delete($job);

        $jobId = $job->getId();
        $this->logJob($jobId, 'Job deleted');
    }
434
435
    /**
     * Moves a job to the 'buried' state (revived only by the 'kick' command)
     * and logs it.
     *
     * @param Job $job The job to bury
     */
    public function bury(Job $job)
    {
        $this->pheanstalk->bury($job);

        $jobId = $job->getId();
        $this->logJob($jobId, 'Job buried');
    }
446
447
    /**
     * Kicks jobs of an action back onto the ready queue.
     *
     * Selects the action's tube on the client, asks it to kick up to `$max`
     * jobs and logs how many were kicked.
     *
     * @param string $action The action (tube) whose jobs should be kicked
     * @param int    $max    Upper bound handed to the client's kick() call
     *
     * @return int The number of kicked jobs
     */
    public function kick($action, $max)
    {
        $this->pheanstalk->useTube($action);

        $count = $this->pheanstalk->kick($max);

        $message = sprintf('Kicked %d "%s" jobs back onto the ready queue', $count, $action);
        $this->logger->debug($message);

        return $count;
    }
467
468
    /**
     * Fetches the statistics the queue client holds for a job.
     *
     * @param Job $job The job to inspect
     *
     * @return array
     */
    public function getJobStats(Job $job)
    {
        $stats = $this->pheanstalk->statsJob($job);

        return $stats;
    }
477
478
    /**
     * Runs a reserved job through its executor and settles it afterwards.
     *
     * On success the job is deleted and the executor result returned. A
     * RescheduleException releases the job for the date it carries, an
     * AbortException is passed on to the caller, and any other exception is
     * logged, after which the job is either released for another attempt in
     * ten minutes or buried.
     *
     * NOTE(review): the job is buried only once its release count is strictly
     * greater than `$maxRetries` - confirm that this matches the intended
     * meaning of "number of retries".
     *
     * @param Job $job        The job to process
     * @param int $maxRetries The number of retries for this job
     *
     * @throws AbortException
     *
     * @return bool|mixed The executor result if successful, false otherwise
     */
    public function executeJob(Job $job, $maxRetries = 1)
    {
        $this->dispatcher->dispatch(WorkerEvents::EXECUTE_JOB, new JobEvent($job));

        $jobStats = $this->pheanstalk->statsJob($job);
        $data     = (array) json_decode($job->getData(), true);
        $released = (int) $jobStats['releases'];
        $priority = (int) $jobStats['pri'];
        $tube     = $jobStats['tube'];

        // extra information attached to the failure log records
        $logContext = [
            'tube'    => $tube,
            'payload' => $data,
            'attempt' => $released + 1,
        ];

        try {
            $outcome = $this->execute($tube, $data);

            // no exception was raised, so the job is finished
            $this->delete($job);

            return $outcome;
        } catch (RescheduleException $reschedule) {
            // a priority carried by the exception takes precedence
            if (null !== $reschedule->getReshedulePriority()) {
                $priority = $reschedule->getReshedulePriority();
            }

            $this->reschedule($job, $reschedule->getRescheduleDate(), $priority);
        } catch (AbortException $abort) {
            // the executor asked to abort: leave the decision to the caller
            throw $abort;
        } catch (\Exception $failure) {
            $jobId   = $job->getId();
            $summary = sprintf(
                'Exception occurred: %s in %s on line %d',
                $failure->getMessage(),
                $failure->getFile(),
                $failure->getLine()
            );

            $this->logJob($jobId, $summary, LogLevel::ERROR, $logContext);
            $this->logJob($jobId, $failure->getTraceAsString(), LogLevel::DEBUG, $logContext);

            if ($released > $maxRetries) {
                // out of retries: park the job for manual inspection
                $this->bury($job);

                $this->dispatcher->dispatch(
                    WorkerEvents::JOB_BURIED_EVENT,
                    new JobBuriedEvent($job, $failure, $released)
                );
            } else {
                // give it another go later, whatever the error was
                $this->reschedule($job, new \DateTime('+10 minutes'), $priority);
            }
        }

        return false;
    }
540
541
    /**
     * Executes an action with a specific payload.
     *
     * Dispatches PRE_EXECUTE_ACTION (listeners may replace the payload),
     * validates the resulting payload against the executor's resolver, runs the
     * executor and dispatches POST_EXECUTE_ACTION (listeners may replace the
     * result).
     *
     * @param string $action  The action to execute
     * @param array  $payload The payload to hand to the executor
     *
     * @throws \OutOfBoundsException When no executor is registered for the action
     *
     * @return mixed The result carried by the post-execute event, or false when
     *               the payload did not pass validation
     */
    public function execute($action, array $payload)
    {
        $executor = $this->getExecutor($action);

        // dispatch pre event, listeners may change the payload here
        $event = new ExecutionEvent($executor, $action, $payload);
        $this->dispatcher->dispatch(WorkerEvents::PRE_EXECUTE_ACTION, $event);

        // Validate (and on failure, report) the payload as listeners left it,
        // not the one originally passed in.
        $candidate = $event->getPayload();

        try {
            $resolver = $this->getPayloadResolver($executor);
            $payload  = $resolver->resolve($candidate);
        } catch (ExceptionInterface $exception) {
            $this->logger->error(
                sprintf(
                    'Payload %s for "%s" is invalid: %s',
                    json_encode($candidate, JSON_UNESCAPED_SLASHES),
                    $action,
                    $exception->getMessage()
                )
            );

            return false;
        }

        $result = $executor->execute($payload);

        // dispatch post event, listeners may change the result here
        $event->setResult($result);
        $this->dispatcher->dispatch(WorkerEvents::POST_EXECUTE_ACTION, $event);

        return $event->getResult();
    }
580
581
    /**
     * CAUTION: removes every job from an action's queue. This cannot be undone!
     *
     * @param string $action The action (tube) to empty
     * @param array  $states The job states to clear; when empty, 'ready',
     *                       'delayed' and 'buried' are all cleared
     */
    public function clear($action, array $states = [])
    {
        $targets = empty($states) ? ['ready', 'delayed', 'buried'] : $states;

        foreach ($targets as $target) {
            $this->clearTube($action, $target);
        }
    }
598
599
    /**
     * Exposes the logger in use (a NullLogger when none was injected).
     *
     * @return LoggerInterface
     */
    public function getLogger()
    {
        return $this->logger;
    }
606
607
    /**
     * Deletes jobs of one state from a tube until peek() finds no more.
     *
     * @param string $tube  The tube (action) to empty
     * @param string $state The job state to clear
     *
     * @throws Exception Any client exception whose message does not contain "NOT_FOUND"
     */
    protected function clearTube($tube, $state = 'ready')
    {
        $this->logger->info(sprintf('Clearing all jobs with the "%s" state in tube "%s"', $state, $tube));

        $job = $this->peek($tube, $state);

        while ($job) {
            try {
                $this->delete($job);
            } catch (Exception $failure) {
                // a NOT_FOUND here means somebody else removed the job already
                $alreadyGone = false !== strpos($failure->getMessage(), 'NOT_FOUND');

                if (!$alreadyGone) {
                    throw $failure;
                }
            }

            $job = $this->peek($tube, $state);
        }
    }
628
629
    /**
     * Returns the payload resolver for an executor, creating and configuring
     * it on first use and reusing it afterwards.
     *
     * @param ExecutorInterface $executor The executor whose payload is resolved
     *
     * @return OptionsResolver
     */
    protected function getPayloadResolver(ExecutorInterface $executor)
    {
        $name = $executor->getName();

        if (array_key_exists($name, $this->resolvers)) {
            return $this->resolvers[$name];
        }

        $fresh = new OptionsResolver();
        $executor->configurePayload($fresh);

        $this->resolvers[$name] = $fresh;

        return $this->resolvers[$name];
    }
649
650
    /**
     * Writes a log record whose message is prefixed with the job id in brackets.
     *
     * @param int    $jobId   The id of the job the message is about
     * @param string $msg     The message
     * @param string $level   The log level, debug by default
     * @param array  $context Context passed along to the logger
     */
    protected function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
    {
        $line = sprintf('[%s] %s', $jobId, $msg);

        $this->logger->log($level, $line, $context);
    }
660
}
661