Completed
Pull Request — master (#13)
by Marijan
17:08
created

QueueExecutor   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 188
Duplicated Lines 29.79 %

Coupling/Cohesion

Components 1
Dependencies 12

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 13
lcom 1
cbo 12
dl 56
loc 188
rs 10
c 0
b 0
f 0
ccs 73
cts 73
cp 1

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 11 1
A getDispatcher() 0 4 1
B executeJob() 9 59 6
B execute() 34 34 2
A getPayloadResolver() 13 13 2
A logJob() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
namespace TreeHouse\WorkerBundle;
4
5
use Exception;
6
use Pheanstalk\Job;
7
use Psr\Log\LoggerInterface;
8
use Psr\Log\LogLevel;
9
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
10
use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
11
use Symfony\Component\OptionsResolver\OptionsResolver;
12
use TreeHouse\WorkerBundle\Event\ExecutionEvent;
13
use TreeHouse\WorkerBundle\Event\JobBuriedEvent;
14
use TreeHouse\WorkerBundle\Event\JobEvent;
15
use TreeHouse\WorkerBundle\Exception\AbortException;
16
use TreeHouse\WorkerBundle\Exception\RescheduleException;
17
use TreeHouse\WorkerBundle\Executor\ExecutorInterface;
18
19
class QueueExecutor
20
{
21
    /**
22
     * @var Queue
23
     */
24
    private $queue;
25
    /**
26
     * @var ExecutorPool
27
     */
28
    private $executorPool;
29
    /**
30
     * @var EventDispatcherInterface
31
     */
32
    private $dispatcher;
33
    /**
34
     * @var LoggerInterface
35
     */
36
    private $logger;
37
    /**
38
     * @var OptionsResolver[]
39
     */
40
    private $resolvers = [];
41
42 9
    public function __construct(
43
        Queue $queue,
44
        ExecutorPool $executorPool,
45
        EventDispatcherInterface $dispatcher,
46
        LoggerInterface $logger
47
    ) {
48 9
        $this->queue = $queue;
49 9
        $this->executorPool = $executorPool;
50 9
        $this->dispatcher = $dispatcher;
51 9
        $this->logger = $logger;
52 9
    }
53
54
    /**
55
     * @return EventDispatcherInterface
56
     *
57
     * @deprecated Removed in next major version
58
     */
59
    public function getDispatcher()
60
    {
61
        return $this->dispatcher;
62
    }
63
64
    /**
65
     * @param Job $job The job to process
66
     * @param int $maxRetries The number of retries for this job
67
     *
68
     * @throws AbortException
69
     *
70
     * @return bool|mixed The executor result if successful, false otherwise
71
     */
72 6
    public function executeJob(Job $job, $maxRetries = 1)
73
    {
74 6
        $this->dispatcher->dispatch(WorkerEvents::EXECUTE_JOB, new JobEvent($job));
75
76 6
        $stats = $this->queue->getJobStats($job);
77 6
        $payload = (array) json_decode($job->getData(), true);
78 6
        $releases = intval($stats['releases']);
79 6
        $priority = intval($stats['pri']);
80
81
        // context for logging
82
        $context = [
83 6
            'tube'    => $stats['tube'],
84 6
            'payload' => $payload,
85 6
            'attempt' => $releases + 1,
86 6
        ];
87
88
        try {
89
            // execute command
90 6
            $result = $this->execute($stats['tube'], $payload);
91
92
            // delete job if it completed without exceptions
93 1
            $this->queue->delete($job);
94
95 1
            return $result;
96 5
        } catch (RescheduleException $re) {
97
            // Override priority if the RescheduleException provides a new one.
98 2
            if (!is_null($re->getReshedulePriority())) {
99 1
                $priority = $re->getReshedulePriority();
100 1
            }
101
            // reschedule the job
102 2
            $this->queue->reschedule($job, $re->getRescheduleDate(), $priority);
103 5
        } catch (AbortException $e) {
104
            // abort thrown from executor, rethrow it and let the caller handle it
105 1
            throw $e;
106 2
        } catch (Exception $e) {
107
            // some other exception occured
108 2
            $message = sprintf(
109 2
                'Exception occurred: %s in %s on line %d',
110 2
                $e->getMessage(),
111 2
                $e->getFile(),
112 2
                $e->getLine()
113 2
            );
114 2
            $this->logJob($job->getId(), $message, LogLevel::ERROR, $context);
115 2
            $this->logJob($job->getId(), $e->getTraceAsString(), LogLevel::DEBUG, $context);
116
117
            // see if we have any retries left
118 2 View Code Duplication
            if ($releases > $maxRetries) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
119
                // no more retries, bury job for manual inspection
120 1
                $this->queue->bury($job);
121
122 1
                $this->dispatcher->dispatch(WorkerEvents::JOB_BURIED_EVENT, new JobBuriedEvent($job, $e, $releases));
123 1
            } else {
124
                // try again, regardless of the error
125 1
                $this->queue->reschedule($job, new \DateTime('+10 minutes'), $priority);
126
            }
127
        }
128
129 4
        return false;
130
    }
131
132
    /**
133
     * Executes an action with a specific payload.
134
     *
135
     * @param string $action
136
     * @param array $payload
137
     *
138
     * @return mixed
139
     */
140 8 View Code Duplication
    public function execute($action, array $payload)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
141
    {
142 8
        $executor = $this->executorPool->getExecutor($action);
143
144
        // dispatch pre event, listeners may change the payload here
145 8
        $event = new ExecutionEvent($executor, $action, $payload);
146 8
        $this->dispatcher->dispatch(WorkerEvents::PRE_EXECUTE_ACTION, $event);
147
148
        try {
149 8
            $resolver = $this->getPayloadResolver($executor);
150 8
            $payload = $resolver->resolve($event->getPayload());
0 ignored issues
show
Coding Style introduced by
Consider using a different name than the parameter $payload. This often makes code more readable.
Loading history...
151 8
        } catch (ExceptionInterface $exception) {
152 1
            $this->logger->error(
153 1
                sprintf(
154 1
                    'Payload %s for "%s" is invalid: %s',
155 1
                    json_encode($payload, JSON_UNESCAPED_SLASHES),
156 1
                    $action,
157 1
                    $exception->getMessage()
158 1
                )
159 1
            );
160
161 1
            return false;
162
        }
163
164 7
        $result = $executor->execute($payload);
165
166 2
        $event = clone $event;
167
168
        // dispatch post event, listeners may change the result here
169 2
        $event->setResult($result);
170 2
        $this->dispatcher->dispatch(WorkerEvents::POST_EXECUTE_ACTION, $event);
171
172 2
        return $event->getResult();
173
    }
174
175
    /**
176
     * Returns a cached version of the payload resolver for an executor.
177
     *
178
     * @param ExecutorInterface $executor
179
     *
180
     * @return OptionsResolver
181
     */
182 8 View Code Duplication
    protected function getPayloadResolver(ExecutorInterface $executor)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
183
    {
184 8
        $key = $executor->getName();
185
186 8
        if (!array_key_exists($key, $this->resolvers)) {
187 8
            $resolver = new OptionsResolver();
188 8
            $executor->configurePayload($resolver);
189
190 8
            $this->resolvers[$key] = $resolver;
191 8
        }
192
193 8
        return $this->resolvers[$key];
194
    }
195
196
    /**
197
     * @param int $jobId
198
     * @param string $msg
199
     * @param string $level
200
     * @param array $context
201
     */
202 2
    private function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = [])
203
    {
204 2
        $this->logger->log($level, sprintf('[%s] %s', $jobId, $msg), $context);
205 2
    }
206
}
207