1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace TreeHouse\WorkerBundle; |
4
|
|
|
|
5
|
|
|
use Exception; |
6
|
|
|
use Pheanstalk\Job; |
7
|
|
|
use Psr\Log\LoggerInterface; |
8
|
|
|
use Psr\Log\LogLevel; |
9
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
10
|
|
|
use Symfony\Component\OptionsResolver\Exception\ExceptionInterface; |
11
|
|
|
use Symfony\Component\OptionsResolver\OptionsResolver; |
12
|
|
|
use TreeHouse\WorkerBundle\Event\ExecutionEvent; |
13
|
|
|
use TreeHouse\WorkerBundle\Event\JobBuriedEvent; |
14
|
|
|
use TreeHouse\WorkerBundle\Event\JobEvent; |
15
|
|
|
use TreeHouse\WorkerBundle\Exception\AbortException; |
16
|
|
|
use TreeHouse\WorkerBundle\Exception\RescheduleException; |
17
|
|
|
use TreeHouse\WorkerBundle\Executor\ExecutorInterface; |
18
|
|
|
|
19
|
|
|
class QueueExecutor |
20
|
|
|
{ |
21
|
|
|
/** |
22
|
|
|
* @var Queue |
23
|
|
|
*/ |
24
|
|
|
private $queue; |
25
|
|
|
/** |
26
|
|
|
* @var ExecutorPool |
27
|
|
|
*/ |
28
|
|
|
private $executorPool; |
29
|
|
|
/** |
30
|
|
|
* @var EventDispatcherInterface |
31
|
|
|
*/ |
32
|
|
|
private $dispatcher; |
33
|
|
|
/** |
34
|
|
|
* @var LoggerInterface |
35
|
|
|
*/ |
36
|
|
|
private $logger; |
37
|
|
|
/** |
38
|
|
|
* @var OptionsResolver[] |
39
|
|
|
*/ |
40
|
|
|
private $resolvers = []; |
41
|
|
|
|
42
|
9 |
|
public function __construct( |
43
|
|
|
Queue $queue, |
44
|
|
|
ExecutorPool $executorPool, |
45
|
|
|
EventDispatcherInterface $dispatcher, |
46
|
|
|
LoggerInterface $logger |
47
|
|
|
) { |
48
|
9 |
|
$this->queue = $queue; |
49
|
9 |
|
$this->executorPool = $executorPool; |
50
|
9 |
|
$this->dispatcher = $dispatcher; |
51
|
9 |
|
$this->logger = $logger; |
52
|
9 |
|
} |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* @return EventDispatcherInterface |
56
|
|
|
* |
57
|
|
|
* @deprecated Removed in next major version |
58
|
|
|
*/ |
59
|
|
|
public function getDispatcher() |
60
|
|
|
{ |
61
|
|
|
return $this->dispatcher; |
62
|
|
|
} |
63
|
|
|
|
64
|
|
|
/** |
65
|
|
|
* @param Job $job The job to process |
66
|
|
|
* @param int $maxRetries The number of retries for this job |
67
|
|
|
* |
68
|
|
|
* @throws AbortException |
69
|
|
|
* |
70
|
|
|
* @return bool|mixed The executor result if successful, false otherwise |
71
|
|
|
*/ |
72
|
6 |
|
public function executeJob(Job $job, $maxRetries = 1) |
73
|
|
|
{ |
74
|
6 |
|
$this->dispatcher->dispatch(WorkerEvents::EXECUTE_JOB, new JobEvent($job)); |
75
|
|
|
|
76
|
6 |
|
$stats = $this->queue->getJobStats($job); |
77
|
6 |
|
$payload = (array) json_decode($job->getData(), true); |
78
|
6 |
|
$releases = intval($stats['releases']); |
79
|
6 |
|
$priority = intval($stats['pri']); |
80
|
|
|
|
81
|
|
|
// context for logging |
82
|
|
|
$context = [ |
83
|
6 |
|
'tube' => $stats['tube'], |
84
|
6 |
|
'payload' => $payload, |
85
|
6 |
|
'attempt' => $releases + 1, |
86
|
6 |
|
]; |
87
|
|
|
|
88
|
|
|
try { |
89
|
|
|
// execute command |
90
|
6 |
|
$result = $this->execute($stats['tube'], $payload); |
91
|
|
|
|
92
|
|
|
// delete job if it completed without exceptions |
93
|
1 |
|
$this->queue->delete($job); |
94
|
|
|
|
95
|
1 |
|
return $result; |
96
|
5 |
|
} catch (RescheduleException $re) { |
97
|
|
|
// Override priority if the RescheduleException provides a new one. |
98
|
2 |
|
if (!is_null($re->getReshedulePriority())) { |
99
|
1 |
|
$priority = $re->getReshedulePriority(); |
100
|
1 |
|
} |
101
|
|
|
// reschedule the job |
102
|
2 |
|
$this->queue->reschedule($job, $re->getRescheduleDate(), $priority); |
103
|
5 |
|
} catch (AbortException $e) { |
104
|
|
|
// abort thrown from executor, rethrow it and let the caller handle it |
105
|
1 |
|
throw $e; |
106
|
2 |
|
} catch (Exception $e) { |
107
|
|
|
// some other exception occured |
108
|
2 |
|
$message = sprintf( |
109
|
2 |
|
'Exception occurred: %s in %s on line %d', |
110
|
2 |
|
$e->getMessage(), |
111
|
2 |
|
$e->getFile(), |
112
|
2 |
|
$e->getLine() |
113
|
2 |
|
); |
114
|
2 |
|
$this->logJob($job->getId(), $message, LogLevel::ERROR, $context); |
115
|
2 |
|
$this->logJob($job->getId(), $e->getTraceAsString(), LogLevel::DEBUG, $context); |
116
|
|
|
|
117
|
|
|
// see if we have any retries left |
118
|
2 |
View Code Duplication |
if ($releases > $maxRetries) { |
|
|
|
|
119
|
|
|
// no more retries, bury job for manual inspection |
120
|
1 |
|
$this->queue->bury($job); |
121
|
|
|
|
122
|
1 |
|
$this->dispatcher->dispatch(WorkerEvents::JOB_BURIED_EVENT, new JobBuriedEvent($job, $e, $releases)); |
123
|
1 |
|
} else { |
124
|
|
|
// try again, regardless of the error |
125
|
1 |
|
$this->queue->reschedule($job, new \DateTime('+10 minutes'), $priority); |
126
|
|
|
} |
127
|
|
|
} |
128
|
|
|
|
129
|
4 |
|
return false; |
130
|
|
|
} |
131
|
|
|
|
132
|
|
|
/** |
133
|
|
|
* Executes an action with a specific payload. |
134
|
|
|
* |
135
|
|
|
* @param string $action |
136
|
|
|
* @param array $payload |
137
|
|
|
* |
138
|
|
|
* @return mixed |
139
|
|
|
*/ |
140
|
8 |
View Code Duplication |
public function execute($action, array $payload) |
|
|
|
|
141
|
|
|
{ |
142
|
8 |
|
$executor = $this->executorPool->getExecutor($action); |
143
|
|
|
|
144
|
|
|
// dispatch pre event, listeners may change the payload here |
145
|
8 |
|
$event = new ExecutionEvent($executor, $action, $payload); |
146
|
8 |
|
$this->dispatcher->dispatch(WorkerEvents::PRE_EXECUTE_ACTION, $event); |
147
|
|
|
|
148
|
|
|
try { |
149
|
8 |
|
$resolver = $this->getPayloadResolver($executor); |
150
|
8 |
|
$payload = $resolver->resolve($event->getPayload()); |
|
|
|
|
151
|
8 |
|
} catch (ExceptionInterface $exception) { |
152
|
1 |
|
$this->logger->error( |
153
|
1 |
|
sprintf( |
154
|
1 |
|
'Payload %s for "%s" is invalid: %s', |
155
|
1 |
|
json_encode($payload, JSON_UNESCAPED_SLASHES), |
156
|
1 |
|
$action, |
157
|
1 |
|
$exception->getMessage() |
158
|
1 |
|
) |
159
|
1 |
|
); |
160
|
|
|
|
161
|
1 |
|
return false; |
162
|
|
|
} |
163
|
|
|
|
164
|
7 |
|
$result = $executor->execute($payload); |
165
|
|
|
|
166
|
2 |
|
$event = clone $event; |
167
|
|
|
|
168
|
|
|
// dispatch post event, listeners may change the result here |
169
|
2 |
|
$event->setResult($result); |
170
|
2 |
|
$this->dispatcher->dispatch(WorkerEvents::POST_EXECUTE_ACTION, $event); |
171
|
|
|
|
172
|
2 |
|
return $event->getResult(); |
173
|
|
|
} |
174
|
|
|
|
175
|
|
|
/** |
176
|
|
|
* Returns a cached version of the payload resolver for an executor. |
177
|
|
|
* |
178
|
|
|
* @param ExecutorInterface $executor |
179
|
|
|
* |
180
|
|
|
* @return OptionsResolver |
181
|
|
|
*/ |
182
|
8 |
View Code Duplication |
protected function getPayloadResolver(ExecutorInterface $executor) |
|
|
|
|
183
|
|
|
{ |
184
|
8 |
|
$key = $executor->getName(); |
185
|
|
|
|
186
|
8 |
|
if (!array_key_exists($key, $this->resolvers)) { |
187
|
8 |
|
$resolver = new OptionsResolver(); |
188
|
8 |
|
$executor->configurePayload($resolver); |
189
|
|
|
|
190
|
8 |
|
$this->resolvers[$key] = $resolver; |
191
|
8 |
|
} |
192
|
|
|
|
193
|
8 |
|
return $this->resolvers[$key]; |
194
|
|
|
} |
195
|
|
|
|
196
|
|
|
/** |
197
|
|
|
* @param int $jobId |
198
|
|
|
* @param string $msg |
199
|
|
|
* @param string $level |
200
|
|
|
* @param array $context |
201
|
|
|
*/ |
202
|
2 |
|
private function logJob($jobId, $msg, $level = LogLevel::DEBUG, array $context = []) |
203
|
|
|
{ |
204
|
2 |
|
$this->logger->log($level, sprintf('[%s] %s', $jobId, $msg), $context); |
205
|
2 |
|
} |
206
|
|
|
} |
207
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.