 sfcod    /
                    jobqueue
                      sfcod    /
                    jobqueue
                
                            This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include, or for example
                                via PHP's auto-loading mechanism.
                                                    These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php | ||
| 2 | |||
| 3 | namespace SfCod\QueueBundle\Queue; | ||
| 4 | |||
| 5 | use DateInterval; | ||
| 6 | use DateTime; | ||
| 7 | use Predis\Client; | ||
| 8 | use Predis\Collection\Iterator\HashKey; | ||
| 9 | use SfCod\QueueBundle\Base\JobResolverInterface; | ||
| 10 | use SfCod\QueueBundle\Base\RandomizeTrait; | ||
| 11 | use SfCod\QueueBundle\Entity\Job; | ||
| 12 | use SfCod\QueueBundle\Job\JobContract; | ||
| 13 | use SfCod\QueueBundle\Job\JobContractInterface; | ||
| 14 | use SfCod\QueueBundle\Service\RedisDriver; | ||
| 15 | |||
| 16 | /** | ||
| 17 | * Class RedisQueue | ||
| 18 | * | ||
| 19 | * @author Virchenko Maksim <[email protected]> | ||
| 20 | * | ||
| 21 | * @package SfCod\QueueBundle\Queue | ||
| 22 | */ | ||
| 23 | class RedisQueue extends Queue | ||
| 24 | { | ||
| 25 | use RandomizeTrait; | ||
| 26 | |||
| 27 | /** | ||
| 28 | * Job resolver | ||
| 29 | * | ||
| 30 | * @var JobResolverInterface | ||
| 31 | */ | ||
| 32 | private $resolver; | ||
| 33 | |||
| 34 | /** | ||
| 35 | * @var RedisDriver | ||
| 36 | */ | ||
| 37 | private $redis; | ||
| 38 | |||
| 39 | /** | ||
| 40 | * The collection that holds the jobs. | ||
| 41 | * | ||
| 42 | * @var string | ||
| 43 | */ | ||
| 44 | private $collection; | ||
| 45 | |||
| 46 | /** | ||
| 47 | * The name of the default queue. | ||
| 48 | * | ||
| 49 | * @var string | ||
| 50 | */ | ||
| 51 | private $queue = 'default'; | ||
| 52 | |||
| 53 | /** | ||
| 54 | * The expiration time of a job. | ||
| 55 | * | ||
| 56 | * @var int|null | ||
| 57 | */ | ||
| 58 | private $expire = 60; | ||
| 59 | |||
| 60 | /** | ||
| 61 | * @var int | ||
| 62 | */ | ||
| 63 | private $limit = 15; | ||
| 64 | |||
| 65 | /** | ||
| 66 | * Create a new redis queue instance. | ||
| 67 | * | ||
| 68 | * @param JobResolverInterface $resolver | ||
| 69 | * @param RedisDriver $redis | ||
| 70 | * @param string $collection | ||
| 71 | * @param string $queue | ||
| 72 | * @param int $expire | ||
| 73 | * @param int $limit | ||
| 74 | */ | ||
| 75 | public function __construct( | ||
| 76 | JobResolverInterface $resolver, | ||
| 77 | RedisDriver $redis, | ||
| 78 | string $collection = 'queue_jobs', | ||
| 79 | string $queue = 'default', | ||
| 80 | int $expire = 60, | ||
| 81 | int $limit = 15 | ||
| 82 |     ) { | ||
| 83 | $this->resolver = $resolver; | ||
| 84 | $this->redis = $redis; | ||
| 85 | $this->collection = $collection; | ||
| 86 | $this->expire = $expire; | ||
| 87 | $this->queue = $queue; | ||
| 88 | $this->limit = $limit; | ||
| 89 | } | ||
| 90 | |||
| 91 | /** | ||
| 92 | * Get the size of the queue. | ||
| 93 | * | ||
| 94 | * @param string|null $queue | ||
| 95 | * | ||
| 96 | * @return int | ||
| 97 | */ | ||
| 98 | public function size(?string $queue = null): int | ||
| 99 |     { | ||
| 100 | return (int)$this->getClient()->zcount($this->buildKey($queue), '-inf', '+inf'); | ||
| 101 | } | ||
| 102 | |||
| 103 | /** | ||
| 104 | * Get redis client | ||
| 105 | * | ||
| 106 | * @return Client | ||
| 107 | */ | ||
| 108 | private function getClient(): Client | ||
| 109 |     { | ||
| 110 | return $this->redis->getClient(); | ||
| 111 | } | ||
| 112 | |||
| 113 | /** | ||
| 114 | * Build collection:queue:postfix key | ||
| 115 | * | ||
| 116 | * @param string|null $queue | ||
| 117 | * @param string|null $postfix | ||
| 118 | * | ||
| 119 | * @return string | ||
| 120 | */ | ||
| 121 | private function buildKey(?string $queue = 'default', ?string $postfix = null) | ||
| 122 |     { | ||
| 123 | return "$this->collection:$queue" . ($postfix ? ":$postfix" : ''); | ||
| 124 | } | ||
| 125 | |||
| 126 | /** | ||
| 127 | * Push a new job onto the queue. | ||
| 128 | * | ||
| 129 | * @param string $job | ||
| 130 | * @param mixed $data | ||
| 131 | * @param string|null $queue | ||
| 132 | * | ||
| 133 | * @return mixed | ||
| 134 | * | ||
| 135 | * @throws \Exception | ||
| 136 | */ | ||
| 137 | public function push(string $job, array $data = [], ?string $queue = null) | ||
| 138 |     { | ||
| 139 | return $this->pushRaw($this->createPayload($job, $data), $queue); | ||
| 140 | } | ||
| 141 | |||
| 142 | /** | ||
| 143 | * Push a raw payload onto the queue. | ||
| 144 | * | ||
| 145 | * @param string $payload | ||
| 146 | * @param string|null $queue | ||
| 147 | * @param array $options | ||
| 148 | * | ||
| 149 | * @return mixed | ||
| 150 | * | ||
| 151 | * @throws \Exception | ||
| 152 | */ | ||
| 153 | public function pushRaw(string $payload, ?string $queue = null, array $options = []) | ||
| 154 |     { | ||
| 155 | return $this->pushToDatabase(0, $queue, $payload); | ||
| 156 | } | ||
| 157 | |||
| 158 | /** | ||
| 159 | * Push job to database | ||
| 160 | * | ||
| 161 | * @param DateInterval|int $delay | ||
| 162 | * @param string|null $queue | ||
| 163 | * @param string $payload | ||
| 164 | * @param int $attempts | ||
| 165 | * | ||
| 166 | * @throws \Exception | ||
| 167 | */ | ||
| 168 | private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0) | ||
| 169 |     { | ||
| 170 | $id = $this->getRandomId(); | ||
| 171 | |||
| 172 | $pipeline = $this->getClient()->pipeline(['atomic' => true]) | ||
| 173 | ->hset( | ||
| 174 | $this->buildKey($queue, 'payload'), | ||
| 175 | $id, | ||
| 176 | $payload | ||
| 177 | ) | ||
| 178 | ->zadd($this->buildKey($queue), [ | ||
| 179 | $id => $this->getAvailableAt($delay), | ||
| 0 ignored issues–
                            show | |||
| 180 | ]); | ||
| 181 | |||
| 182 |         if ($attempts > 0) { | ||
| 183 | $pipeline->zadd($this->buildKey($queue, 'attempted'), [ | ||
| 184 | $id => $attempts, | ||
| 185 | ]); | ||
| 186 | } | ||
| 187 | |||
| 188 | $pipeline->execute(); | ||
| 189 | } | ||
| 190 | |||
| 191 | /** | ||
| 192 | * Get the "available at" UNIX timestamp. | ||
| 193 | * | ||
| 194 | * @param DateInterval|int $delay | ||
| 195 | * | ||
| 196 | * @return int | ||
| 197 | */ | ||
| 198 | private function getAvailableAt($delay = 0) | ||
| 199 |     { | ||
| 200 | return $delay instanceof DateInterval | ||
| 201 | ? (new DateTime())->add($delay)->getTimestamp() | ||
| 202 | : $this->currentTime() + $delay; | ||
| 203 | } | ||
| 204 | |||
| 205 | /** | ||
| 206 | * Push a new job onto the queue after a delay. | ||
| 207 | * | ||
| 208 | * @param DateInterval|int $delay | ||
| 209 | * @param string $job | ||
| 210 | * @param array $data | ||
| 211 | * @param string|null $queue | ||
| 212 | * | ||
| 213 | * @return mixed | ||
| 214 | * | ||
| 215 | * @throws \Exception | ||
| 216 | */ | ||
| 217 | public function later($delay, string $job, array $data = [], ?string $queue = null) | ||
| 218 |     { | ||
| 219 | return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data)); | ||
| 220 | } | ||
| 221 | |||
| 222 | /** | ||
| 223 | * Pop the next job off of the queue. | ||
| 224 | * | ||
| 225 | * @param string|null $queue | ||
| 226 | * | ||
| 227 | * @return JobContractInterface|null | ||
| 228 | */ | ||
| 229 | public function pop(?string $queue = null): ?JobContractInterface | ||
| 230 |     { | ||
| 231 | $ids = $this->getClient()->zrangebyscore($this->buildKey($queue), 0, $this->currentTime(), ['LIMIT' => [0, $this->limit]]); | ||
| 232 | |||
| 233 |         if (empty($ids)) { | ||
| 234 | return null; | ||
| 235 | } | ||
| 236 | |||
| 237 |         foreach ($ids as $id) { | ||
| 238 | $reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id); | ||
| 239 | $isAvailable = null === $reservedAt; | ||
| 240 | $isReservedButExpired = false === ($reservedAt > ($this->currentTime() - $this->expire)); | ||
| 241 | // Take first available or reserved but expired job | ||
| 242 |             if ($isAvailable || $isReservedButExpired) { | ||
| 243 | return $this->getJobById($queue, $id); | ||
| 244 | } | ||
| 245 | } | ||
| 246 | |||
| 247 | return null; | ||
| 248 | } | ||
| 249 | |||
| 250 | /** | ||
| 251 | * Get job by its id | ||
| 252 | * | ||
| 253 | * @param string $queue | ||
| 254 | * @param string $id | ||
| 255 | * | ||
| 256 | * @return JobContractInterface|null | ||
| 257 | */ | ||
| 258 | public function getJobById(string $queue, string $id): ?JobContractInterface | ||
| 259 |     { | ||
| 260 | $job = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id); | ||
| 261 | |||
| 262 |         if (!$job) { | ||
| 263 | return null; | ||
| 264 |         } else { | ||
| 265 | $reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id); | ||
| 266 | $attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id); | ||
| 267 | |||
| 268 | return new JobContract( | ||
| 269 | $this->resolver, | ||
| 270 | $this, | ||
| 271 | $this->buildJob($id, $queue, $attempts ?? 0, json_decode($job, true), $reservedAt) | ||
| 272 | ); | ||
| 273 | } | ||
| 274 | } | ||
| 275 | |||
| 276 | /** | ||
| 277 | * Build job from database record | ||
| 278 | * | ||
| 279 | * @param string $id | ||
| 280 | * @param string $queue | ||
| 281 | * @param int $attempts | ||
| 282 | * @param array $payload | ||
| 283 | * @param int|null $reservedAt | ||
| 284 | * | ||
| 285 | * @return Job | ||
| 286 | */ | ||
| 287 | private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job | ||
| 288 |     { | ||
| 289 | $job = new Job(); | ||
| 290 | $job->setId($id); | ||
| 291 | $job->setAttempts($attempts); | ||
| 292 | $job->setQueue($queue); | ||
| 293 | $job->setReserved((bool)$reservedAt); | ||
| 294 | $job->setReservedAt($reservedAt); | ||
| 295 | $job->setPayload($payload); | ||
| 296 | |||
| 297 | return $job; | ||
| 298 | } | ||
| 299 | |||
| 300 | /** | ||
| 301 | * Check if job exists in the queue. | ||
| 302 | * | ||
| 303 | * @param string $job | ||
| 304 | * @param array $data | ||
| 305 | * @param string|null $queue | ||
| 306 | * | ||
| 307 | * @return bool | ||
| 308 | */ | ||
| 309 | public function exists(string $job, array $data = [], ?string $queue = null): bool | ||
| 310 |     { | ||
| 311 | $cursor = new HashKey($this->getClient(), $this->buildKey($queue, 'payload')); | ||
| 312 | $payload = $this->createPayload($job, $data); | ||
| 313 | |||
| 314 |         foreach ($cursor as $key => $value) { | ||
| 315 |             if ($value === $payload) { | ||
| 316 | return true; | ||
| 317 | } | ||
| 318 | } | ||
| 319 | |||
| 320 | return false; | ||
| 321 | } | ||
| 322 | |||
| 323 | /** | ||
| 324 | * Check if can run process depend on limits | ||
| 325 | * | ||
| 326 | * @param JobContractInterface $job | ||
| 327 | * | ||
| 328 | * @return bool | ||
| 329 | */ | ||
| 330 | public function canRunJob(JobContractInterface $job): bool | ||
| 331 |     { | ||
| 332 | return $this->getClient()->zcount( | ||
| 333 | $this->buildKey($job->getQueue(), 'reserved'), | ||
| 334 | '-inf', | ||
| 335 | '+inf' | ||
| 336 | ) < $this->limit || $job->reserved(); | ||
| 337 | } | ||
| 338 | |||
| 339 | /** | ||
| 340 | * Mark the given job ID as reserved. | ||
| 341 | * | ||
| 342 | * @param JobContractInterface $job | ||
| 343 | * | ||
| 344 | * @throws \Exception | ||
| 345 | */ | ||
| 346 | public function markJobAsReserved(JobContractInterface $job) | ||
| 347 |     { | ||
| 348 | $this->getClient()->pipeline(['atomic' => true]) | ||
| 349 | ->zadd($this->buildKey($job->getQueue(), 'reserved'), [ | ||
| 350 | $job->getJobId() => $this->currentTime(), | ||
| 351 | ]) | ||
| 352 | ->zincrby($this->buildKey($job->getQueue(), 'attempted'), 1, $job->getJobId()) | ||
| 353 | ->execute(); | ||
| 354 | } | ||
| 355 | |||
| 356 | /** | ||
| 357 | * Delete a reserved job from the queue. | ||
| 358 | * | ||
| 359 | * @param string $queue | ||
| 360 | * @param string $id | ||
| 361 | * | ||
| 362 | * @return bool | ||
| 363 | * | ||
| 364 | * @throws \Exception | ||
| 365 | */ | ||
| 366 | public function deleteReserved(string $queue, string $id): bool | ||
| 367 |     { | ||
| 368 | $this->getClient()->pipeline(['atomic' => true]) | ||
| 369 | ->hdel($this->buildKey($queue, 'payload'), [$id]) | ||
| 370 | ->zrem($this->buildKey($queue, 'reserved'), $id) | ||
| 371 | ->zrem($this->buildKey($queue, 'attempted'), $id) | ||
| 372 | ->zrem($this->buildKey($queue), $id) | ||
| 373 | ->execute(); | ||
| 374 | |||
| 375 | return true; | ||
| 376 | } | ||
| 377 | |||
| 378 | /** | ||
| 379 | * Release a reserved job back onto the queue. | ||
| 380 | * | ||
| 381 | * @param JobContractInterface $job | ||
| 382 | * @param DateInterval|int $delay | ||
| 383 | * | ||
| 384 | * @return mixed | ||
| 385 | * | ||
| 386 | * @throws \Exception | ||
| 387 | */ | ||
| 388 | public function release(JobContractInterface $job, $delay) | ||
| 389 |     { | ||
| 390 | return $this->pushToDatabase($delay, $job->getQueue(), $job->getRawBody(), $job->attempts()); | ||
| 391 | } | ||
| 392 | } | ||
| 393 | 
 
                                
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.