1 | <?php |
||
2 | |||
3 | namespace Dtc\QueueBundle\RabbitMQ; |
||
4 | |||
5 | use Dtc\QueueBundle\Exception\ArgumentsNotSetException; |
||
6 | use Dtc\QueueBundle\Exception\ClassNotSubclassException; |
||
7 | use Dtc\QueueBundle\Exception\PriorityException; |
||
8 | use Dtc\QueueBundle\Exception\UnsupportedException; |
||
9 | use Dtc\QueueBundle\Manager\JobIdTrait; |
||
10 | use Dtc\QueueBundle\Manager\JobTimingManager; |
||
11 | use Dtc\QueueBundle\Manager\PriorityJobManager; |
||
12 | use Dtc\QueueBundle\Manager\RunManager; |
||
13 | use Dtc\QueueBundle\Manager\SaveableTrait; |
||
14 | use Dtc\QueueBundle\Manager\VerifyTrait; |
||
15 | use Dtc\QueueBundle\Model\BaseJob; |
||
16 | use Dtc\QueueBundle\Model\JobTiming; |
||
17 | use Dtc\QueueBundle\Model\RetryableJob; |
||
18 | use Dtc\QueueBundle\Util\Util; |
||
19 | use PhpAmqpLib\Channel\AMQPChannel; |
||
20 | use PhpAmqpLib\Connection\AbstractConnection; |
||
21 | use PhpAmqpLib\Message\AMQPMessage; |
||
0 ignored issues
–
show
|
|||
22 | |||
23 | class JobManager extends PriorityJobManager |
||
24 | { |
||
25 | use JobIdTrait; |
||
26 | use SaveableTrait; |
||
27 | use VerifyTrait; |
||
28 | |||
29 | /** @var AMQPChannel */ |
||
30 | protected $channel; |
||
31 | |||
32 | /** @var AbstractConnection */ |
||
33 | protected $connection; |
||
34 | protected $queueArgs; |
||
35 | protected $exchangeArgs; |
||
36 | |||
37 | protected $channelSetup = false; |
||
38 | |||
39 | protected $hostname; |
||
40 | protected $pid; |
||
41 | |||
42 | 2 | public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass) |
|
43 | { |
||
44 | 2 | $this->hostname = gethostname() ?: ''; |
|
45 | 2 | $this->pid = getmypid(); |
|
46 | 2 | parent::__construct($runManager, $jobTimingManager, $jobClass); |
|
47 | 2 | } |
|
48 | |||
49 | /** |
||
50 | * @param string $exchange |
||
51 | * @param string $type |
||
52 | * @param bool $passive |
||
53 | * @param bool $durable |
||
54 | * @param bool $autoDelete |
||
55 | */ |
||
56 | 1 | public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete) |
|
57 | { |
||
58 | 1 | $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete]; |
|
59 | 1 | } |
|
60 | |||
61 | /** |
||
62 | * @param string $queue |
||
63 | * @param bool $passive |
||
64 | * @param bool $durable |
||
65 | * @param bool $exclusive |
||
66 | * @param bool $autoDelete |
||
67 | * |
||
68 | * @throws PriorityException |
||
69 | */ |
||
70 | 1 | public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete) |
|
71 | { |
||
72 | 1 | $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete]; |
|
73 | |||
74 | 1 | $this->queueArgs = $arguments; |
|
75 | 1 | if (!ctype_digit(strval($this->maxPriority))) { |
|
76 | 1 | throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer'); |
|
77 | } |
||
78 | 1 | if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) { |
|
79 | throw new PriorityException('Priority is higher than '.PHP_INT_MAX); |
||
80 | } |
||
81 | 1 | } |
|
82 | |||
83 | 1 | public function setAMQPConnection(AbstractConnection $connection) |
|
84 | { |
||
85 | 1 | $this->connection = $connection; |
|
86 | 1 | $this->channel = $connection->channel(); |
|
87 | 1 | } |
|
88 | |||
89 | /** |
||
90 | * @return AMQPChannel |
||
91 | */ |
||
92 | 1 | public function getChannel() |
|
93 | { |
||
94 | 1 | return $this->channel; |
|
95 | } |
||
96 | |||
97 | /** |
||
98 | * @throws ArgumentsNotSetException |
||
99 | */ |
||
100 | 11 | protected function checkChannelArgs() |
|
101 | { |
||
102 | 11 | if (empty($this->queueArgs)) { |
|
103 | 1 | throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)'); |
|
104 | } |
||
105 | 11 | if (empty($this->exchangeArgs)) { |
|
106 | 1 | throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)'); |
|
107 | } |
||
108 | 11 | } |
|
109 | |||
110 | 1 | protected function performChannelSetup() |
|
111 | { |
||
112 | 1 | call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs); |
|
113 | 1 | if ($this->maxPriority) { |
|
114 | 1 | array_push($this->queueArgs, false); |
|
115 | 1 | array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]); |
|
116 | } |
||
117 | 1 | call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs); |
|
118 | 1 | $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]); |
|
119 | 1 | } |
|
120 | |||
121 | /** |
||
122 | * @throws ArgumentsNotSetException |
||
123 | */ |
||
124 | 11 | public function setupChannel() |
|
125 | { |
||
126 | 11 | $this->checkChannelArgs(); |
|
127 | |||
128 | 11 | if (!$this->channelSetup) { |
|
129 | 1 | $this->performChannelSetup(); |
|
130 | 1 | $this->channelSetup = true; |
|
131 | } |
||
132 | 11 | } |
|
133 | |||
134 | /** |
||
135 | * @return \Dtc\QueueBundle\Model\Job |
||
136 | * |
||
137 | * @throws ClassNotSubclassException |
||
138 | * @throws PriorityException |
||
139 | * @throws ArgumentsNotSetException |
||
140 | */ |
||
141 | 9 | public function prioritySave(\Dtc\QueueBundle\Model\Job $job) |
|
142 | { |
||
143 | 9 | if (!$job instanceof Job) { |
|
144 | throw new ClassNotSubclassException('Must be derived from '.Job::class); |
||
145 | } |
||
146 | |||
147 | 9 | $this->setupChannel(); |
|
148 | |||
149 | 9 | $this->validateSaveable($job); |
|
150 | 9 | $this->setJobId($job); |
|
151 | |||
152 | 9 | $this->publishJob($job); |
|
153 | |||
154 | 9 | return $job; |
|
155 | } |
||
156 | |||
157 | 9 | protected function publishJob(Job $job) |
|
158 | { |
||
159 | 9 | $msg = new AMQPMessage($job->toMessage()); |
|
160 | 9 | $this->setMsgPriority($msg, $job); |
|
161 | |||
162 | 9 | $this->channel->basic_publish($msg, $this->exchangeArgs[0]); |
|
163 | 9 | } |
|
164 | |||
165 | /** |
||
166 | * Sets the priority of the AMQPMessage. |
||
167 | */ |
||
168 | 9 | protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job) |
|
169 | { |
||
170 | 9 | if (null !== $this->maxPriority) { |
|
171 | 9 | $priority = $job->getPriority(); |
|
172 | 9 | $msg->set('priority', $priority); |
|
173 | } |
||
174 | 9 | } |
|
175 | |||
176 | 9 | protected function calculatePriority($priority) |
|
177 | { |
||
178 | 9 | $priority = parent::calculatePriority($priority); |
|
179 | 9 | if (null === $priority) { |
|
180 | 8 | return 0; |
|
181 | } |
||
182 | |||
183 | 3 | return $priority; |
|
184 | } |
||
185 | |||
186 | /** |
||
187 | * @param string|null $workerName |
||
188 | * @param string|null $methodName |
||
189 | * |
||
190 | * @throws UnsupportedException |
||
191 | * @throws ArgumentsNotSetException |
||
192 | */ |
||
193 | 9 | public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
|
194 | { |
||
195 | 9 | $this->verifyGetJobArgs($workerName, $methodName, $prioritize); |
|
196 | 8 | $this->setupChannel(); |
|
197 | |||
198 | do { |
||
199 | 8 | $expiredJob = false; |
|
200 | 8 | $job = $this->findJob($expiredJob, $runId); |
|
201 | 8 | } while ($expiredJob); |
|
202 | |||
203 | 8 | return $job; |
|
204 | } |
||
205 | |||
206 | /** |
||
207 | * @param bool $expiredJob |
||
208 | * @param $runId |
||
209 | * |
||
210 | * @return Job|null |
||
211 | */ |
||
212 | 8 | protected function findJob(&$expiredJob, $runId) |
|
213 | { |
||
214 | 8 | $message = $this->channel->basic_get($this->queueArgs[0]); |
|
215 | 8 | if ($message) { |
|
216 | 8 | $job = new Job(); |
|
217 | 8 | $job->fromMessage($message->body); |
|
218 | 8 | $job->setRunId($runId); |
|
219 | |||
220 | 8 | if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) { |
|
221 | 1 | $expiredJob = true; |
|
222 | 1 | $this->channel->basic_nack($message->delivery_info['delivery_tag']); |
|
223 | 1 | $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED); |
|
224 | |||
225 | 1 | return null; |
|
226 | } |
||
227 | 8 | $job->setDeliveryTag($message->delivery_info['delivery_tag']); |
|
228 | |||
229 | 8 | return $job; |
|
230 | } |
||
231 | |||
232 | 6 | return null; |
|
233 | } |
||
234 | |||
235 | 2 | protected function resetJob(RetryableJob $job) |
|
236 | { |
||
237 | 2 | if (!$job instanceof Job) { |
|
238 | throw new \InvalidArgumentException('$job must be instance of '.Job::class); |
||
239 | } |
||
240 | 2 | $job->setStatus(BaseJob::STATUS_NEW); |
|
241 | 2 | $job->setMessage(null); |
|
242 | 2 | $job->setStartedAt(null); |
|
243 | 2 | $job->setDeliveryTag(null); |
|
244 | 2 | $job->setRetries($job->getRetries() + 1); |
|
245 | 2 | $job->setUpdatedAt(Util::getMicrotimeDateTime()); |
|
246 | 2 | $this->publishJob($job); |
|
247 | |||
248 | 2 | return true; |
|
249 | } |
||
250 | |||
251 | // Save History get called upon completion of the job |
||
252 | 3 | protected function retryableSaveHistory(RetryableJob $job, $retry) |
|
253 | { |
||
254 | 3 | if (!$job instanceof Job) { |
|
255 | throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job)); |
||
256 | } |
||
257 | 3 | $deliveryTag = $job->getDeliveryTag(); |
|
258 | 3 | if (null !== $deliveryTag) { |
|
259 | 3 | $this->channel->basic_ack($deliveryTag); |
|
260 | } |
||
261 | |||
262 | 3 | return; |
|
263 | } |
||
264 | |||
265 | 3 | public function getWaitingJobCount($workerName = null, $methodName = null) |
|
266 | { |
||
267 | 3 | $this->setupChannel(); |
|
268 | |||
269 | 3 | if ($workerName) { |
|
270 | 1 | throw new UnsupportedException('Waiting Job Count by $workerName is not supported'); |
|
271 | } |
||
272 | 3 | if ($methodName) { |
|
273 | 1 | throw new UnsupportedException('Waiting Job Count by $methodName is not supported'); |
|
274 | } |
||
275 | |||
276 | 3 | $count = call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs); |
|
277 | |||
278 | 3 | return isset($count[1]) ? $count[1] : 0; |
|
279 | } |
||
280 | |||
281 | 2 | public function __destruct() |
|
282 | { |
||
283 | // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point. |
||
284 | 2 | } |
|
285 | } |
||
286 |
The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g.
excluded_paths: ["lib/*"]
, you can move it to the dependency path list as follows:For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths