1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\RabbitMQ; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
6
|
|
|
use Dtc\QueueBundle\Manager\JobIdTrait; |
7
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
8
|
|
|
use Dtc\QueueBundle\Model\JobTiming; |
9
|
|
|
use Dtc\QueueBundle\Manager\PriorityJobManager; |
10
|
|
|
use Dtc\QueueBundle\Exception\ArgumentsNotSetException; |
11
|
|
|
use Dtc\QueueBundle\Exception\ClassNotSubclassException; |
12
|
|
|
use Dtc\QueueBundle\Exception\PriorityException; |
13
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
14
|
|
|
use Dtc\QueueBundle\Manager\RunManager; |
15
|
|
|
use Dtc\QueueBundle\Manager\JobTimingManager; |
16
|
|
|
use Dtc\QueueBundle\Util\Util; |
17
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
18
|
|
|
use PhpAmqpLib\Connection\AbstractConnection; |
19
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
20
|
|
|
|
21
|
|
|
class JobManager extends PriorityJobManager |
22
|
|
|
{ |
23
|
|
|
use JobIdTrait; |
24
|
|
|
|
25
|
|
|
/** @var AMQPChannel */ |
26
|
|
|
protected $channel; |
27
|
|
|
|
28
|
|
|
/** @var AbstractConnection */ |
29
|
|
|
protected $connection; |
30
|
|
|
protected $queueArgs; |
31
|
|
|
protected $exchangeArgs; |
32
|
|
|
|
33
|
|
|
protected $channelSetup = false; |
34
|
|
|
|
35
|
|
|
protected $hostname; |
36
|
|
|
protected $pid; |
37
|
|
|
|
38
|
2 |
|
public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass) |
39
|
|
|
{ |
40
|
2 |
|
$this->hostname = gethostname() ?: ''; |
41
|
2 |
|
$this->pid = getmypid(); |
42
|
2 |
|
parent::__construct($runManager, $jobTimingManager, $jobClass); |
43
|
2 |
|
} |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* @param string $exchange |
47
|
|
|
* @param string $type |
48
|
|
|
* @param bool $passive |
49
|
|
|
* @param bool $durable |
50
|
|
|
* @param bool $autoDelete |
51
|
|
|
*/ |
52
|
1 |
|
public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete) |
53
|
|
|
{ |
54
|
1 |
|
$this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete]; |
55
|
1 |
|
} |
56
|
|
|
|
57
|
|
|
/** |
58
|
|
|
* @param string $queue |
59
|
|
|
* @param bool $passive |
60
|
|
|
* @param bool $durable |
61
|
|
|
* @param bool $exclusive |
62
|
|
|
* @param bool $autoDelete |
63
|
|
|
* |
64
|
|
|
* @throws PriorityException |
65
|
|
|
*/ |
66
|
1 |
|
public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete) |
67
|
|
|
{ |
68
|
1 |
|
$arguments = [$queue, $passive, $durable, $exclusive, $autoDelete]; |
69
|
|
|
|
70
|
1 |
|
$this->queueArgs = $arguments; |
71
|
1 |
|
if (!ctype_digit(strval($this->maxPriority))) { |
72
|
1 |
|
throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer'); |
73
|
|
|
} |
74
|
1 |
|
if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) { |
75
|
|
|
throw new PriorityException('Priority is higher than '.PHP_INT_MAX); |
76
|
|
|
} |
77
|
1 |
|
} |
78
|
|
|
|
79
|
1 |
|
public function setAMQPConnection(AbstractConnection $connection) |
80
|
|
|
{ |
81
|
1 |
|
$this->connection = $connection; |
82
|
1 |
|
$this->channel = $connection->channel(); |
83
|
1 |
|
} |
84
|
|
|
|
85
|
|
|
/** |
86
|
|
|
* @return AMQPChannel |
87
|
|
|
*/ |
88
|
|
|
public function getChannel() |
89
|
|
|
{ |
90
|
|
|
return $this->channel; |
91
|
|
|
} |
92
|
|
|
|
93
|
|
|
/** |
94
|
|
|
* @throws ArgumentsNotSetException |
95
|
|
|
*/ |
96
|
9 |
|
protected function checkChannelArgs() |
97
|
|
|
{ |
98
|
9 |
|
if (empty($this->queueArgs)) { |
99
|
1 |
|
throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)'); |
100
|
|
|
} |
101
|
9 |
|
if (empty($this->exchangeArgs)) { |
102
|
1 |
|
throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)'); |
103
|
|
|
} |
104
|
9 |
|
} |
105
|
|
|
|
106
|
1 |
|
protected function performChannelSetup() |
107
|
|
|
{ |
108
|
1 |
|
call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs); |
109
|
1 |
|
if ($this->maxPriority) { |
110
|
1 |
|
array_push($this->queueArgs, false); |
111
|
1 |
|
array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]); |
112
|
|
|
} |
113
|
1 |
|
call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs); |
114
|
1 |
|
$this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]); |
115
|
1 |
|
} |
116
|
|
|
|
117
|
|
|
/** |
118
|
|
|
* @throws ArgumentsNotSetException |
119
|
|
|
*/ |
120
|
9 |
|
public function setupChannel() |
121
|
|
|
{ |
122
|
9 |
|
$this->checkChannelArgs(); |
123
|
|
|
|
124
|
9 |
|
if (!$this->channelSetup) { |
125
|
1 |
|
$this->performChannelSetup(); |
126
|
1 |
|
$this->channelSetup = true; |
127
|
|
|
} |
128
|
9 |
|
} |
129
|
|
|
|
130
|
|
|
/** |
131
|
|
|
* @param \Dtc\QueueBundle\Model\Job $job |
132
|
|
|
* |
133
|
|
|
* @return \Dtc\QueueBundle\Model\Job |
134
|
|
|
* |
135
|
|
|
* @throws ClassNotSubclassException |
136
|
|
|
* @throws PriorityException |
137
|
|
|
* @throws ArgumentsNotSetException |
138
|
|
|
*/ |
139
|
8 |
|
public function prioritySave(\Dtc\QueueBundle\Model\Job $job) |
140
|
|
|
{ |
141
|
8 |
|
if (!$job instanceof Job) { |
142
|
|
|
throw new ClassNotSubclassException('Must be derived from '.Job::class); |
143
|
|
|
} |
144
|
|
|
|
145
|
8 |
|
$this->setupChannel(); |
146
|
|
|
|
147
|
8 |
|
$this->validateSaveable($job); |
148
|
8 |
|
$this->setJobId($job); |
149
|
|
|
|
150
|
8 |
|
$this->publishJob($job); |
151
|
|
|
|
152
|
8 |
|
return $job; |
153
|
|
|
} |
154
|
|
|
|
155
|
8 |
|
protected function publishJob(Job $job) |
156
|
|
|
{ |
157
|
8 |
|
$msg = new AMQPMessage($job->toMessage()); |
158
|
8 |
|
$this->setMsgPriority($msg, $job); |
159
|
|
|
|
160
|
8 |
|
$this->channel->basic_publish($msg, $this->exchangeArgs[0]); |
161
|
8 |
|
} |
162
|
|
|
|
163
|
|
|
/** |
164
|
|
|
* Sets the priority of the AMQPMessage. |
165
|
|
|
* |
166
|
|
|
* @param AMQPMessage $msg |
167
|
|
|
* @param \Dtc\QueueBundle\Model\Job $job |
168
|
|
|
*/ |
169
|
8 |
|
protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job) |
170
|
|
|
{ |
171
|
8 |
|
if (null !== $this->maxPriority) { |
172
|
8 |
|
$priority = $job->getPriority(); |
173
|
8 |
|
$msg->set('priority', $priority); |
174
|
|
|
} |
175
|
8 |
|
} |
176
|
|
|
|
177
|
8 |
|
protected function calculatePriority($priority) |
178
|
|
|
{ |
179
|
8 |
|
$priority = parent::calculatePriority($priority); |
180
|
8 |
|
if (null === $priority) { |
181
|
7 |
|
return 0; |
182
|
|
|
} |
183
|
|
|
|
184
|
3 |
|
return $priority; |
185
|
|
|
} |
186
|
|
|
|
187
|
|
|
/** |
188
|
|
|
* @param \Dtc\QueueBundle\Model\Job $job |
189
|
|
|
* |
190
|
|
|
* @throws PriorityException |
191
|
|
|
* @throws ClassNotSubclassException |
192
|
|
|
*/ |
193
|
8 |
View Code Duplication |
protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job) |
|
|
|
|
194
|
|
|
{ |
195
|
8 |
|
if (null !== $job->getPriority() && null === $this->maxPriority) { |
196
|
|
|
throw new PriorityException('This queue does not support priorities'); |
197
|
|
|
} |
198
|
|
|
|
199
|
8 |
|
if (!$job instanceof Job) { |
200
|
|
|
throw new ClassNotSubclassException('Job needs to be instance of '.Job::class); |
201
|
|
|
} |
202
|
8 |
|
} |
203
|
|
|
|
204
|
|
|
/** |
205
|
|
|
* @param string|null $workerName |
206
|
|
|
* @param string|null $methodName |
207
|
|
|
* @param bool $prioritize |
208
|
|
|
* |
209
|
|
|
* @throws UnsupportedException |
210
|
|
|
*/ |
211
|
7 |
|
protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true) |
212
|
|
|
{ |
213
|
7 |
View Code Duplication |
if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) { |
|
|
|
|
214
|
2 |
|
throw new UnsupportedException('Unsupported'); |
215
|
|
|
} |
216
|
6 |
|
} |
217
|
|
|
|
218
|
|
|
/** |
219
|
|
|
* @param string|null $workerName |
220
|
|
|
* @param string|null $methodName |
221
|
|
|
* |
222
|
|
|
* @throws UnsupportedException |
223
|
|
|
* @throws ArgumentsNotSetException |
224
|
|
|
*/ |
225
|
7 |
|
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
226
|
|
|
{ |
227
|
7 |
|
$this->verifyGetJobArgs($workerName, $methodName, $prioritize); |
228
|
6 |
|
$this->setupChannel(); |
229
|
|
|
|
230
|
|
|
do { |
231
|
6 |
|
$expiredJob = false; |
232
|
6 |
|
$job = $this->findJob($expiredJob, $runId); |
233
|
6 |
|
} while ($expiredJob); |
234
|
|
|
|
235
|
6 |
|
return $job; |
236
|
|
|
} |
237
|
|
|
|
238
|
|
|
/** |
239
|
|
|
* @param bool $expiredJob |
240
|
|
|
* @param $runId |
241
|
|
|
* |
242
|
|
|
* @return Job|null |
243
|
|
|
*/ |
244
|
6 |
|
protected function findJob(&$expiredJob, $runId) |
245
|
|
|
{ |
246
|
6 |
|
$message = $this->channel->basic_get($this->queueArgs[0]); |
247
|
6 |
|
if ($message) { |
248
|
6 |
|
$job = new Job(); |
249
|
6 |
|
$job->fromMessage($message->body); |
250
|
6 |
|
$job->setRunId($runId); |
251
|
|
|
|
252
|
6 |
|
if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) { |
253
|
1 |
|
$expiredJob = true; |
254
|
1 |
|
$this->channel->basic_nack($message->delivery_info['delivery_tag']); |
255
|
1 |
|
$this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED); |
256
|
|
|
|
257
|
1 |
|
return null; |
258
|
|
|
} |
259
|
6 |
|
$job->setDeliveryTag($message->delivery_info['delivery_tag']); |
260
|
|
|
|
261
|
6 |
|
return $job; |
262
|
|
|
} |
263
|
|
|
|
264
|
3 |
|
return null; |
265
|
|
|
} |
266
|
|
|
|
267
|
2 |
View Code Duplication |
protected function resetJob(RetryableJob $job) |
|
|
|
|
268
|
|
|
{ |
269
|
2 |
|
if (!$job instanceof Job) { |
270
|
|
|
throw new \InvalidArgumentException('$job must be instance of '.Job::class); |
271
|
|
|
} |
272
|
2 |
|
$job->setStatus(BaseJob::STATUS_NEW); |
273
|
2 |
|
$job->setMessage(null); |
274
|
2 |
|
$job->setStartedAt(null); |
275
|
2 |
|
$job->setRetries($job->getRetries() + 1); |
276
|
2 |
|
$job->setUpdatedAt(Util::getMicrotimeDateTime()); |
277
|
2 |
|
$this->publishJob($job); |
278
|
|
|
|
279
|
2 |
|
return true; |
280
|
|
|
} |
281
|
|
|
|
282
|
|
|
// Save History get called upon completion of the job |
283
|
3 |
|
protected function retryableSaveHistory(RetryableJob $job, $retry) |
284
|
|
|
{ |
285
|
3 |
|
if (!$job instanceof Job) { |
286
|
|
|
throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job)); |
287
|
|
|
} |
288
|
3 |
|
$deliveryTag = $job->getDeliveryTag(); |
289
|
3 |
|
$this->channel->basic_ack($deliveryTag); |
290
|
|
|
|
291
|
3 |
|
return; |
292
|
|
|
} |
293
|
|
|
|
294
|
2 |
|
public function __destruct() |
295
|
|
|
{ |
296
|
|
|
// There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point. |
297
|
2 |
|
} |
298
|
|
|
} |
299
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.