Completed
Pull Request — master (#13)
by Marijan
17:08
created

QueueExecutor::executeJob()   B

Complexity

Conditions 6
Paths 11

Size

Total Lines 59
Code Lines 34

Duplication

Lines 9
Ratio 15.25 %

Code Coverage

Tests 35
CRAP Score 6

Importance

Changes 0
Metric Value
dl 9
loc 59
rs 8.7117
c 0
b 0
f 0
ccs 35
cts 35
cp 1
cc 6
eloc 34
nc 11
nop 2
crap 6

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

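Commonly applied refactorings include:

- Extract Method
- If many parameters or temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object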

1
<?php
2
3
namespace TreeHouse\WorkerBundle;
4
5
use Exception;
6
use Pheanstalk\Job;
7
use Psr\Log\LoggerInterface;
8
use Psr\Log\LogLevel;
9
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
10
use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
11
use Symfony\Component\OptionsResolver\OptionsResolver;
12
use TreeHouse\WorkerBundle\Event\ExecutionEvent;
13
use TreeHouse\WorkerBundle\Event\JobBuriedEvent;
14
use TreeHouse\WorkerBundle\Event\JobEvent;
15
use TreeHouse\WorkerBundle\Exception\AbortException;
16
use TreeHouse\WorkerBundle\Exception\RescheduleException;
17
use TreeHouse\WorkerBundle\Executor\ExecutorInterface;
18
19
/**
 * Runs queued jobs through the matching executor and decides what happens to
 * the job afterwards: delete it on success, reschedule it on request or on an
 * unexpected failure, or bury it once its retries are used up.
 */
class QueueExecutor
{
    /**
     * @var Queue
     */
    private $queue;

    /**
     * @var ExecutorPool
     */
    private $executorPool;

    /**
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Payload resolvers that were already configured, keyed by executor name.
     *
     * @var OptionsResolver[]
     */
    private $resolvers = [];

    /**
     * @param Queue                    $queue        Used to read job stats and to delete, reschedule or bury jobs
     * @param ExecutorPool             $executorPool Provides the executor for an action
     * @param EventDispatcherInterface $dispatcher   Receives the job and execution events
     * @param LoggerInterface          $logger       Receives failure and validation messages
     */
    public function __construct(
        Queue $queue,
        ExecutorPool $executorPool,
        EventDispatcherInterface $dispatcher,
        LoggerInterface $logger
    ) {
        $this->queue = $queue;
        $this->executorPool = $executorPool;
        $this->dispatcher = $dispatcher;
        $this->logger = $logger;
    }

    /**
     * @return EventDispatcherInterface
     *
     * @deprecated Removed in next major version
     */
    public function getDispatcher()
    {
        return $this->dispatcher;
    }

    /**
     * Executes a single job and settles it on the queue.
     *
     * The job's tube name is used as the action and its JSON-decoded data as
     * the payload. When execution completes without an exception the job is
     * deleted and the executor result is returned; note that this includes the
     * `false` that execute() returns for an invalid payload.
     *
     * @param Job $job        The job to process
     * @param int $maxRetries The number of retries for this job; the job is
     *                        buried once its release count exceeds this value
     *
     * @throws AbortException When the executor aborts; it is rethrown untouched
     *
     * @return bool|mixed The executor result if successful, false otherwise
     */
    public function executeJob(Job $job, $maxRetries = 1)
    {
        $this->dispatcher->dispatch(WorkerEvents::EXECUTE_JOB, new JobEvent($job));

        $stats = $this->queue->getJobStats($job);
        $payload = (array) json_decode($job->getData(), true);
        $releases = (int) $stats['releases'];
        $priority = (int) $stats['pri'];

        try {
            $result = $this->execute($stats['tube'], $payload);

            // delete job if it completed without exceptions
            $this->queue->delete($job);

            return $result;
        } catch (RescheduleException $re) {
            $this->rescheduleOnRequest($job, $re, $priority);
        } catch (AbortException $e) {
            // abort thrown from executor, rethrow it and let the caller handle it
            throw $e;
        } catch (Exception $e) {
            // some other exception occurred
            $context = [
                'tube'    => $stats['tube'],
                'payload' => $payload,
                'attempt' => $releases + 1,
            ];

            $this->handleFailedJob($job, $e, $releases, $maxRetries, $priority, $context);
        }

        return false;
    }

    /**
     * Executes an action with a specific payload.
     *
     * Listeners of the pre-execute event may replace the payload; the payload
     * taken from the event is validated with the executor's resolver before it
     * is handed to the executor. Listeners of the post-execute event may
     * replace the result.
     *
     * @param string $action
     * @param array  $payload
     *
     * @return mixed The (possibly listener-modified) executor result, or false
     *               when the payload did not pass validation
     */
    public function execute($action, array $payload)
    {
        $executor = $this->executorPool->getExecutor($action);

        // dispatch pre event, listeners may change the payload here
        $event = new ExecutionEvent($executor, $action, $payload);
        $this->dispatcher->dispatch(WorkerEvents::PRE_EXECUTE_ACTION, $event);

        // this is the payload that actually gets validated, so it is also the
        // one to report when validation fails
        $submittedPayload = $event->getPayload();

        try {
            $resolvedPayload = $this->getPayloadResolver($executor)->resolve($submittedPayload);
        } catch (ExceptionInterface $exception) {
            $this->logger->error(
                sprintf(
                    'Payload %s for "%s" is invalid: %s',
                    json_encode($submittedPayload, JSON_UNESCAPED_SLASHES),
                    $action,
                    $exception->getMessage()
                )
            );

            return false;
        }

        $result = $executor->execute($resolvedPayload);

        // dispatch post event on a copy of the pre event, listeners may change
        // the result here
        $postEvent = clone $event;
        $postEvent->setResult($result);
        $this->dispatcher->dispatch(WorkerEvents::POST_EXECUTE_ACTION, $postEvent);

        return $postEvent->getResult();
    }

    /**
     * Returns a cached version of the payload resolver for an executor.
     *
     * The resolver is configured by the executor the first time it is
     * requested and reused for every later call with the same executor name.
     *
     * @param ExecutorInterface $executor
     *
     * @return OptionsResolver
     */
    protected function getPayloadResolver(ExecutorInterface $executor)
    {
        $key = $executor->getName();

        if (!array_key_exists($key, $this->resolvers)) {
            $resolver = new OptionsResolver();
            $executor->configurePayload($resolver);

            $this->resolvers[$key] = $resolver;
        }

        return $this->resolvers[$key];
    }

    /**
     * Reschedules a job because the executor asked for it.
     *
     * @param Job                 $job
     * @param RescheduleException $exception       Carries the new date and, optionally, a new priority
     * @param int                 $defaultPriority Priority to keep when the exception does not provide one
     */
    private function rescheduleOnRequest(Job $job, RescheduleException $exception, $defaultPriority)
    {
        // Override priority if the RescheduleException provides a new one.
        $priority = $exception->getReshedulePriority();
        if (null === $priority) {
            $priority = $defaultPriority;
        }

        $this->queue->reschedule($job, $exception->getRescheduleDate(), $priority);
    }

    /**
     * Logs an unexpected failure and either buries the job or schedules a retry.
     *
     * @param Job       $job
     * @param Exception $exception  The exception that made the job fail
     * @param int       $releases   How often the job was released so far
     * @param int       $maxRetries The job is buried once $releases exceeds this value
     * @param int       $priority   Priority to use when the job is retried
     * @param array     $context    Context for logging
     */
    private function handleFailedJob(
        Job $job,
        Exception $exception,
        $releases,
        $maxRetries,
        $priority,
        array $context
    ) {
        $message = sprintf(
            'Exception occurred: %s in %s on line %d',
            $exception->getMessage(),
            $exception->getFile(),
            $exception->getLine()
        );
        $this->logJob($job->getId(), $message, LogLevel::ERROR, $context);
        $this->logJob($job->getId(), $exception->getTraceAsString(), LogLevel::DEBUG, $context);

        // see if we have any retries left
        if ($releases > $maxRetries) {
            // no more retries, bury job for manual inspection
            $this->queue->bury($job);

            $this->dispatcher->dispatch(
                WorkerEvents::JOB_BURIED_EVENT,
                new JobBuriedEvent($job, $exception, $releases)
            );

            return;
        }

        // try again, regardless of the error
        $this->queue->reschedule($job, new \DateTime('+10 minutes'), $priority);
    }

    /**
     * Writes a log record prefixed with the job id.
     *
     * @param int    $jobId
     * @param string $msg
     * @param string $level
     * @param array  $context
     */
    private function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
    {
        $this->logger->log($level, sprintf('[%s] %s', $jobId, $msg), $context);
    }
}
207