This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace SfCod\QueueBundle\Queue; |
||
4 | |||
5 | use DateInterval; |
||
6 | use DateTime; |
||
7 | use MongoDB\Collection; |
||
8 | use MongoDB\DeleteResult; |
||
9 | use SfCod\QueueBundle\Base\JobResolverInterface; |
||
10 | use SfCod\QueueBundle\Entity\Job; |
||
11 | use SfCod\QueueBundle\Job\JobContract; |
||
12 | use SfCod\QueueBundle\Job\JobContractInterface; |
||
13 | use SfCod\QueueBundle\Service\MongoDriver; |
||
14 | |||
/**
 * Queue implementation that stores its jobs as documents of a MongoDB collection.
 *
 * Every job document carries the queue name, the payload, the attempt counter,
 * the reservation flag/timestamp and the availability/creation timestamps
 * (see buildDatabaseRecord() for the exact shape).
 *
 * @author Alexey Orlov <[email protected]>
 *
 * @package SfCod\QueueBundle\Queue
 */
class MongoQueue extends Queue
{
    /**
     * Resolver handed over to every JobContract created by this queue.
     *
     * @var JobResolverInterface
     */
    protected $resolver;

    /**
     * The mongo connection instance.
     *
     * @var MongoDriver
     */
    protected $mongo;

    /**
     * Name of the mongo collection that holds the jobs.
     *
     * @var string
     */
    protected $collection;

    /**
     * The name of the default queue, used whenever no queue name is given.
     *
     * @var string
     */
    protected $queue = 'default';

    /**
     * Number of seconds after which a reserved job is treated as expired
     * and may be handed out again.
     *
     * @var int|null
     */
    protected $expire = 60;

    /**
     * Upper bound of reserved jobs per queue checked by canRunJob().
     *
     * @var int
     */
    protected $limit = 15;

    /**
     * Create a new mongo queue instance.
     *
     * @param JobResolverInterface $resolver
     * @param MongoDriver $mongo
     * @param string $collection Name of the collection that stores the jobs
     * @param string $queue Name of the default queue
     * @param int $expire Reservation expiration time in seconds
     * @param int $limit Maximum number of reserved jobs per queue
     */
    public function __construct(
        JobResolverInterface $resolver,
        MongoDriver $mongo,
        string $collection,
        string $queue = 'default',
        int $expire = 60,
        int $limit = 15
    ) {
        $this->resolver = $resolver;
        $this->mongo = $mongo;
        $this->collection = $collection;
        $this->queue = $queue;
        $this->expire = $expire;
        $this->limit = $limit;
    }

    /**
     * Push a new job onto the queue, available immediately.
     *
     * @param string $job
     * @param array $data
     * @param string|null $queue Queue name, the default queue when omitted
     *
     * @return mixed Result of the underlying insertOne() call
     */
    public function push(string $job, array $data = [], ?string $queue = null)
    {
        return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
    }

    /**
     * Fetch the next job of the queue that is available or whose reservation expired.
     *
     * The job is only looked up here; marking it as reserved is a separate
     * step (see markJobAsReserved()).
     *
     * @param string|null $queue Queue name, the default queue when omitted
     *
     * @return JobContractInterface|null Null when no job can be handed out
     */
    public function pop(?string $queue = null): ?JobContractInterface
    {
        return $this->getNextAvailableJob($this->getQueue($queue));
    }

    /**
     * Check if job exists in the queue.
     *
     * A job counts as existing when a document of the queue stores exactly
     * the payload that createPayload() builds for the given job and data.
     *
     * @param string $job
     * @param array $data
     * @param string|null $queue Queue name, the default queue when omitted
     *
     * @return bool
     */
    public function exists(string $job, array $data = [], ?string $queue = null): bool
    {
        $document = $this->getCollection()->findOne([
            'queue' => $this->getQueue($queue),
            'payload' => $this->createPayload($job, $data),
        ]);

        return null !== $document;
    }

    /**
     * Push a raw payload onto the queue, available immediately.
     *
     * @param string $payload
     * @param string|null $queue Queue name, the default queue when omitted
     * @param array $options Accepted for signature compatibility, not used here
     *
     * @return mixed Result of the underlying insertOne() call
     */
    public function pushRaw(string $payload, ?string $queue = null, array $options = [])
    {
        return $this->pushToDatabase(0, $queue, $payload);
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param DateInterval|int $delay Interval, or number of seconds, to wait
     * @param string $job
     * @param array $data
     * @param string|null $queue Queue name, the default queue when omitted
     *
     * @return mixed Result of the underlying insertOne() call
     */
    public function later($delay, string $job, array $data = [], ?string $queue = null)
    {
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
    }

    /**
     * Push an array of jobs onto the queue with a single insertMany() call.
     *
     * All jobs share the same $data and become available immediately.
     *
     * @param array $jobs Job names; array keys are ignored
     * @param array $data
     * @param string|null $queue Queue name, the default queue when omitted
     *
     * @return mixed Result of insertMany(), or null when $jobs is empty
     */
    public function bulk(array $jobs, array $data = [], ?string $queue = null)
    {
        // insertMany() refuses an empty document list, so there is nothing to insert.
        if ([] === $jobs) {
            return null;
        }

        $queue = $this->getQueue($queue);
        $availableAt = $this->getAvailableAt(0);

        // array_map() keeps the input keys while insertMany() only accepts a
        // list, hence the re-indexing with array_values().
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
            return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
        }, array_values($jobs));

        return $this->getCollection()->insertMany($records);
    }

    /**
     * Release a reserved job back onto the queue.
     *
     * A new record is inserted with the job's queue, its re-encoded payload
     * and its current attempt counter; the record the job was read from is
     * not touched by this method.
     *
     * @param JobContractInterface $job
     * @param DateInterval|int $delay Interval, or number of seconds, to wait
     *
     * @return mixed Result of the underlying insertOne() call
     */
    public function release(JobContractInterface $job, $delay)
    {
        $payload = json_encode($job->payload());

        return $this->pushToDatabase($delay, $job->getQueue(), $payload, $job->attempts());
    }

    /**
     * Find a job by its identifier.
     *
     * @param string $queue Queue name; currently not part of the lookup filter
     * @param string $id String form of the document's ObjectID
     *
     * @return JobContractInterface|null Null when no document has that id
     */
    public function getJobById(string $queue, string $id): ?JobContractInterface
    {
        $document = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        if (null === $document) {
            return null;
        }

        return new JobContract($this->resolver, $this, $this->buildJob($document));
    }

    /**
     * Delete a reserved job from the queue.
     *
     * @param string $queue Queue the job must belong to
     * @param string $id String form of the document's ObjectID
     *
     * @return bool Whether a document was actually deleted
     */
    public function deleteReserved(string $queue, string $id): bool
    {
        $result = $this->getCollection()->deleteOne([
            '_id' => new \MongoDB\BSON\ObjectID($id),
            'queue' => $queue,
        ]);

        if ($result instanceof DeleteResult) {
            return (bool)$result->getDeletedCount();
        }

        return true;
    }

    /**
     * Get the expiration time in seconds.
     *
     * @return int|null
     */
    public function getExpire()
    {
        return $this->expire;
    }

    /**
     * Set the expiration time in seconds.
     *
     * @param int $seconds
     */
    public function setExpire(int $seconds)
    {
        $this->expire = $seconds;
    }

    /**
     * Get the size of the queue.
     *
     * @param string|null $queue Queue name; when omitted (or empty) the jobs
     *                           of all queues in the collection are counted
     *
     * @return int
     */
    public function size(?string $queue = null): int
    {
        $filter = $queue ? ['queue' => $queue] : [];

        // countDocuments() replaces Collection::count(), deprecated since library version 1.4.
        return $this->getCollection()->countDocuments($filter);
    }

    /**
     * Check whether the job may run with respect to the reserved-jobs limit.
     *
     * The job may run while the number of reserved jobs of its queue is below
     * the limit, or when the job itself is not reserved.
     *
     * @param JobContractInterface $job
     *
     * @return bool
     */
    public function canRunJob(JobContractInterface $job): bool
    {
        // countDocuments() replaces Collection::count(), deprecated since library version 1.4.
        $reservedCount = $this->getCollection()->countDocuments([
            'reserved' => 1,
            'queue' => $job->getQueue(),
        ]);

        return $reservedCount < $this->limit || !$job->reserved();
    }

    /**
     * Mark the given job as reserved.
     *
     * Increments the stored attempt counter, sets the reserved flag and
     * records the current time as reservation time.
     *
     * @param JobContractInterface $job
     */
    public function markJobAsReserved(JobContractInterface $job)
    {
        $filter = ['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())];

        $this->getCollection()->updateOne($filter, [
            '$set' => [
                'attempts' => $job->attempts() + 1,
                'reserved' => 1,
                'reserved_at' => $this->currentTime(),
            ],
        ]);
    }

    /**
     * Push a raw payload to the mongo with a given delay.
     *
     * @param DateInterval|int $delay Interval, or number of seconds, to wait
     * @param string|null $queue Queue name, the default queue when omitted
     * @param string $payload
     * @param int $attempts Attempt counter to store with the record
     *
     * @return mixed Result of the insertOne() call
     */
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
    {
        $record = $this->buildDatabaseRecord(
            $this->getQueue($queue),
            $payload,
            $this->getAvailableAt($delay),
            $attempts
        );

        return $this->getCollection()->insertOne($record);
    }

    /**
     * Get the "available at" UNIX timestamp.
     *
     * @param DateInterval|int $delay Interval added to the current date, or
     *                                number of seconds added to the current time
     *
     * @return int
     */
    protected function getAvailableAt($delay = 0)
    {
        if ($delay instanceof DateInterval) {
            return (new DateTime())->add($delay)->getTimestamp();
        }

        return $this->currentTime() + $delay;
    }

    /**
     * Get the queue or return the default.
     *
     * @param string|null $queue
     *
     * @return string The given name, or the default queue when it is empty
     */
    protected function getQueue($queue)
    {
        return $queue ?: $this->queue;
    }

    /**
     * Get the next available job for the queue.
     *
     * Picks the document with the lowest _id that is either available
     * (see isAvailable()) or reserved but expired (see isReservedButExpired()).
     *
     * @param string|null $queue Queue name, the default queue when omitted
     *
     * @return JobContractInterface|null
     */
    protected function getNextAvailableJob($queue)
    {
        $filter = [
            'queue' => $this->getQueue($queue),
            '$or' => [
                $this->isAvailable(),
                $this->isReservedButExpired(),
            ],
        ];

        $document = $this->getCollection()->findOne($filter, [
            'sort' => ['_id' => 1],
        ]);

        if (!$document) {
            return null;
        }

        return new JobContract($this->resolver, $this, $this->buildJob($document));
    }

    /**
     * Create an array to insert for the given job.
     *
     * New records always start unreserved and are stamped with the current
     * time as creation time.
     *
     * @param string|null $queue
     * @param string $payload
     * @param int $availableAt
     * @param int $attempts
     *
     * @return array
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        return [
            'queue' => $queue,
            'payload' => $payload,
            'attempts' => $attempts,
            'reserved' => 0,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $this->currentTime(),
        ];
    }

    /**
     * Build the filter matching jobs that are not reserved and whose
     * availability time has been reached.
     *
     * @return array
     */
    protected function isAvailable()
    {
        return [
            'reserved_at' => null,
            'available_at' => ['$lte' => $this->currentTime()],
        ];
    }

    /**
     * Build the filter matching jobs that were reserved at least
     * "expire" seconds ago.
     *
     * @return array
     */
    protected function isReservedButExpired()
    {
        $expiredBefore = $this->currentTime() - $this->expire;

        return [
            'reserved_at' => ['$lte' => $expiredBefore],
        ];
    }

    /**
     * Get queue collection
     *
     * @return Collection Mongo collection instance
     */
    protected function getCollection(): Collection
    {
        return $this->mongo->getDatabase()->selectCollection($this->collection);
    }

    /**
     * Build job from database record
     *
     * @param object $data Document as returned by findOne(); its payload is
     *                     decoded from JSON into an associative array
     *
     * @return Job
     */
    protected function buildJob($data): Job
    {
        $job = new Job();
        $job->setId($data->_id);
        $job->setQueue($data->queue);
        $job->setAttempts($data->attempts);
        $job->setReserved($data->reserved);
        $job->setReservedAt($data->reserved_at);
        $job->setPayload(json_decode($data->payload, true));

        return $job;
    }
}
||
460 |
In PHP, under loose comparison (like `==`, or `!=`, or `switch` conditions), values of different types might be equal. For `string` values, the empty string `''` is a special case; in particular, the following results might be unexpected: