|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\RabbitMQ; |
|
4
|
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Manager\SaveableTrait; |
|
6
|
|
|
use Dtc\QueueBundle\Manager\VerifyTrait; |
|
7
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
|
8
|
|
|
use Dtc\QueueBundle\Manager\JobIdTrait; |
|
9
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
|
10
|
|
|
use Dtc\QueueBundle\Model\JobTiming; |
|
11
|
|
|
use Dtc\QueueBundle\Manager\PriorityJobManager; |
|
12
|
|
|
use Dtc\QueueBundle\Exception\ArgumentsNotSetException; |
|
13
|
|
|
use Dtc\QueueBundle\Exception\ClassNotSubclassException; |
|
14
|
|
|
use Dtc\QueueBundle\Exception\PriorityException; |
|
15
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
|
16
|
|
|
use Dtc\QueueBundle\Manager\RunManager; |
|
17
|
|
|
use Dtc\QueueBundle\Manager\JobTimingManager; |
|
18
|
|
|
use Dtc\QueueBundle\Util\Util; |
|
19
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
|
20
|
|
|
use PhpAmqpLib\Connection\AbstractConnection; |
|
21
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
|
22
|
|
|
|
|
23
|
|
|
class JobManager extends PriorityJobManager |
|
24
|
|
|
{ |
|
25
|
|
|
use JobIdTrait; |
|
26
|
|
|
use VerifyTrait; |
|
27
|
|
|
use SaveableTrait; |
|
28
|
|
|
|
|
29
|
|
|
/** @var AMQPChannel */ |
|
30
|
|
|
protected $channel; |
|
31
|
|
|
|
|
32
|
|
|
/** @var AbstractConnection */ |
|
33
|
|
|
protected $connection; |
|
34
|
|
|
protected $queueArgs; |
|
35
|
|
|
protected $exchangeArgs; |
|
36
|
|
|
|
|
37
|
|
|
protected $channelSetup = false; |
|
38
|
|
|
|
|
39
|
|
|
protected $hostname; |
|
40
|
|
|
protected $pid; |
|
41
|
|
|
|
|
42
|
2 |
|
public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass) |
|
43
|
|
|
{ |
|
44
|
2 |
|
$this->hostname = gethostname() ?: ''; |
|
45
|
2 |
|
$this->pid = getmypid(); |
|
46
|
2 |
|
parent::__construct($runManager, $jobTimingManager, $jobClass); |
|
47
|
2 |
|
} |
|
48
|
|
|
|
|
49
|
|
|
/** |
|
50
|
|
|
* @param string $exchange |
|
51
|
|
|
* @param string $type |
|
52
|
|
|
* @param bool $passive |
|
53
|
|
|
* @param bool $durable |
|
54
|
|
|
* @param bool $autoDelete |
|
55
|
|
|
*/ |
|
56
|
1 |
|
public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete) |
|
57
|
|
|
{ |
|
58
|
1 |
|
$this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete]; |
|
59
|
1 |
|
} |
|
60
|
|
|
|
|
61
|
|
|
/** |
|
62
|
|
|
* @param string $queue |
|
63
|
|
|
* @param bool $passive |
|
64
|
|
|
* @param bool $durable |
|
65
|
|
|
* @param bool $exclusive |
|
66
|
|
|
* @param bool $autoDelete |
|
67
|
|
|
* |
|
68
|
|
|
* @throws PriorityException |
|
69
|
|
|
*/ |
|
70
|
1 |
|
public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete) |
|
71
|
|
|
{ |
|
72
|
1 |
|
$arguments = [$queue, $passive, $durable, $exclusive, $autoDelete]; |
|
73
|
|
|
|
|
74
|
1 |
|
$this->queueArgs = $arguments; |
|
75
|
1 |
|
if (!ctype_digit(strval($this->maxPriority))) { |
|
76
|
1 |
|
throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer'); |
|
77
|
|
|
} |
|
78
|
1 |
|
if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) { |
|
79
|
|
|
throw new PriorityException('Priority is higher than '.PHP_INT_MAX); |
|
80
|
|
|
} |
|
81
|
1 |
|
} |
|
82
|
|
|
|
|
83
|
1 |
|
public function setAMQPConnection(AbstractConnection $connection) |
|
84
|
|
|
{ |
|
85
|
1 |
|
$this->connection = $connection; |
|
86
|
1 |
|
$this->channel = $connection->channel(); |
|
87
|
1 |
|
} |
|
88
|
|
|
|
|
89
|
|
|
/** |
|
90
|
|
|
* @return AMQPChannel |
|
91
|
|
|
*/ |
|
92
|
1 |
|
public function getChannel() |
|
93
|
|
|
{ |
|
94
|
1 |
|
return $this->channel; |
|
95
|
|
|
} |
|
96
|
|
|
|
|
97
|
|
|
/** |
|
98
|
|
|
* @throws ArgumentsNotSetException |
|
99
|
|
|
*/ |
|
100
|
11 |
|
protected function checkChannelArgs() |
|
101
|
|
|
{ |
|
102
|
11 |
|
if (empty($this->queueArgs)) { |
|
103
|
1 |
|
throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)'); |
|
104
|
|
|
} |
|
105
|
11 |
|
if (empty($this->exchangeArgs)) { |
|
106
|
1 |
|
throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)'); |
|
107
|
|
|
} |
|
108
|
11 |
|
} |
|
109
|
|
|
|
|
110
|
1 |
|
protected function performChannelSetup() |
|
111
|
|
|
{ |
|
112
|
1 |
|
call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs); |
|
113
|
1 |
|
if ($this->maxPriority) { |
|
114
|
1 |
|
array_push($this->queueArgs, false); |
|
115
|
1 |
|
array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]); |
|
116
|
|
|
} |
|
117
|
1 |
|
call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs); |
|
118
|
1 |
|
$this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]); |
|
119
|
1 |
|
} |
|
120
|
|
|
|
|
121
|
|
|
/** |
|
122
|
|
|
* @throws ArgumentsNotSetException |
|
123
|
|
|
*/ |
|
124
|
11 |
|
public function setupChannel() |
|
125
|
|
|
{ |
|
126
|
11 |
|
$this->checkChannelArgs(); |
|
127
|
|
|
|
|
128
|
11 |
|
if (!$this->channelSetup) { |
|
129
|
1 |
|
$this->performChannelSetup(); |
|
130
|
1 |
|
$this->channelSetup = true; |
|
131
|
|
|
} |
|
132
|
11 |
|
} |
|
133
|
|
|
|
|
134
|
|
|
/** |
|
135
|
|
|
* @param \Dtc\QueueBundle\Model\Job $job |
|
136
|
|
|
* |
|
137
|
|
|
* @return \Dtc\QueueBundle\Model\Job |
|
138
|
|
|
* |
|
139
|
|
|
* @throws ClassNotSubclassException |
|
140
|
|
|
* @throws PriorityException |
|
141
|
|
|
* @throws ArgumentsNotSetException |
|
142
|
|
|
*/ |
|
143
|
9 |
|
public function prioritySave(\Dtc\QueueBundle\Model\Job $job) |
|
144
|
|
|
{ |
|
145
|
9 |
|
if (!$job instanceof Job) { |
|
146
|
|
|
throw new ClassNotSubclassException('Must be derived from '.Job::class); |
|
147
|
|
|
} |
|
148
|
|
|
|
|
149
|
9 |
|
$this->setupChannel(); |
|
150
|
|
|
|
|
151
|
9 |
|
$this->validateSaveable($job); |
|
152
|
9 |
|
$this->setJobId($job); |
|
153
|
|
|
|
|
154
|
9 |
|
$this->publishJob($job); |
|
155
|
|
|
|
|
156
|
9 |
|
return $job; |
|
157
|
|
|
} |
|
158
|
|
|
|
|
159
|
9 |
|
protected function publishJob(Job $job) |
|
160
|
|
|
{ |
|
161
|
9 |
|
$msg = new AMQPMessage($job->toMessage()); |
|
162
|
9 |
|
$this->setMsgPriority($msg, $job); |
|
163
|
|
|
|
|
164
|
9 |
|
$this->channel->basic_publish($msg, $this->exchangeArgs[0]); |
|
165
|
9 |
|
} |
|
166
|
|
|
|
|
167
|
|
|
/** |
|
168
|
|
|
* Sets the priority of the AMQPMessage. |
|
169
|
|
|
* |
|
170
|
|
|
* @param AMQPMessage $msg |
|
171
|
|
|
* @param \Dtc\QueueBundle\Model\Job $job |
|
172
|
|
|
*/ |
|
173
|
9 |
|
protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job) |
|
174
|
|
|
{ |
|
175
|
9 |
|
if (null !== $this->maxPriority) { |
|
176
|
9 |
|
$priority = $job->getPriority(); |
|
177
|
9 |
|
$msg->set('priority', $priority); |
|
178
|
|
|
} |
|
179
|
9 |
|
} |
|
180
|
|
|
|
|
181
|
9 |
|
protected function calculatePriority($priority) |
|
182
|
|
|
{ |
|
183
|
9 |
|
$priority = parent::calculatePriority($priority); |
|
184
|
9 |
|
if (null === $priority) { |
|
185
|
8 |
|
return 0; |
|
186
|
|
|
} |
|
187
|
|
|
|
|
188
|
3 |
|
return $priority; |
|
189
|
|
|
} |
|
190
|
|
|
|
|
191
|
|
|
/** |
|
192
|
|
|
* @param string|null $workerName |
|
193
|
|
|
* @param string|null $methodName |
|
194
|
|
|
* |
|
195
|
|
|
* @throws UnsupportedException |
|
196
|
|
|
* @throws ArgumentsNotSetException |
|
197
|
|
|
*/ |
|
198
|
9 |
|
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
|
199
|
|
|
{ |
|
200
|
9 |
|
$this->verifyGetJobArgs($workerName, $methodName, $prioritize); |
|
201
|
8 |
|
$this->setupChannel(); |
|
202
|
|
|
|
|
203
|
|
|
do { |
|
204
|
8 |
|
$expiredJob = false; |
|
205
|
8 |
|
$job = $this->findJob($expiredJob, $runId); |
|
206
|
8 |
|
} while ($expiredJob); |
|
207
|
|
|
|
|
208
|
8 |
|
return $job; |
|
209
|
|
|
} |
|
210
|
|
|
|
|
211
|
|
|
/** |
|
212
|
|
|
* @param bool $expiredJob |
|
213
|
|
|
* @param $runId |
|
214
|
|
|
* |
|
215
|
|
|
* @return Job|null |
|
216
|
|
|
*/ |
|
217
|
8 |
|
protected function findJob(&$expiredJob, $runId) |
|
218
|
|
|
{ |
|
219
|
8 |
|
$message = $this->channel->basic_get($this->queueArgs[0]); |
|
220
|
8 |
|
if ($message) { |
|
221
|
8 |
|
$job = new Job(); |
|
222
|
8 |
|
$job->fromMessage($message->body); |
|
223
|
8 |
|
$job->setRunId($runId); |
|
224
|
|
|
|
|
225
|
8 |
|
if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) { |
|
226
|
1 |
|
$expiredJob = true; |
|
227
|
1 |
|
$this->channel->basic_nack($message->delivery_info['delivery_tag']); |
|
228
|
1 |
|
$this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED); |
|
229
|
|
|
|
|
230
|
1 |
|
return null; |
|
231
|
|
|
} |
|
232
|
8 |
|
$job->setDeliveryTag($message->delivery_info['delivery_tag']); |
|
233
|
|
|
|
|
234
|
8 |
|
return $job; |
|
235
|
|
|
} |
|
236
|
|
|
|
|
237
|
6 |
|
return null; |
|
238
|
|
|
} |
|
239
|
|
|
|
|
240
|
2 |
View Code Duplication |
protected function resetJob(RetryableJob $job) |
|
|
|
|
|
|
241
|
|
|
{ |
|
242
|
2 |
|
if (!$job instanceof Job) { |
|
243
|
|
|
throw new \InvalidArgumentException('$job must be instance of '.Job::class); |
|
244
|
|
|
} |
|
245
|
2 |
|
$job->setStatus(BaseJob::STATUS_NEW); |
|
246
|
2 |
|
$job->setMessage(null); |
|
247
|
2 |
|
$job->setStartedAt(null); |
|
248
|
2 |
|
$job->setDeliveryTag(null); |
|
249
|
2 |
|
$job->setRetries($job->getRetries() + 1); |
|
250
|
2 |
|
$job->setUpdatedAt(Util::getMicrotimeDateTime()); |
|
251
|
2 |
|
$this->publishJob($job); |
|
252
|
|
|
|
|
253
|
2 |
|
return true; |
|
254
|
|
|
} |
|
255
|
|
|
|
|
256
|
|
|
// Save History get called upon completion of the job |
|
257
|
3 |
|
protected function retryableSaveHistory(RetryableJob $job, $retry) |
|
258
|
|
|
{ |
|
259
|
3 |
|
if (!$job instanceof Job) { |
|
260
|
|
|
throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job)); |
|
261
|
|
|
} |
|
262
|
3 |
|
$deliveryTag = $job->getDeliveryTag(); |
|
263
|
3 |
|
if (null !== $deliveryTag) { |
|
264
|
3 |
|
$this->channel->basic_ack($deliveryTag); |
|
265
|
|
|
} |
|
266
|
|
|
|
|
267
|
3 |
|
return; |
|
268
|
|
|
} |
|
269
|
|
|
|
|
270
|
3 |
|
public function getWaitingJobCount($workerName = null, $methodName = null) |
|
271
|
|
|
{ |
|
272
|
3 |
|
$this->setupChannel(); |
|
273
|
|
|
|
|
274
|
3 |
|
if ($workerName) { |
|
275
|
1 |
|
throw new UnsupportedException('Waiting Job Count by $workerName is not supported'); |
|
276
|
|
|
} |
|
277
|
3 |
|
if ($methodName) { |
|
278
|
1 |
|
throw new UnsupportedException('Waiting Job Count by $methodName is not supported'); |
|
279
|
|
|
} |
|
280
|
|
|
|
|
281
|
3 |
|
$count = call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs); |
|
282
|
|
|
|
|
283
|
3 |
|
return isset($count[1]) ? $count[1] : 0; |
|
284
|
|
|
} |
|
285
|
|
|
|
|
286
|
2 |
|
public function __destruct() |
|
287
|
|
|
{ |
|
288
|
|
|
// There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point. |
|
289
|
2 |
|
} |
|
290
|
|
|
} |
|
291
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.