gyselroth /
mongodb-php-task-scheduler
| 1 | <?php |
||
| 2 | |||
| 3 | declare(strict_types=1); |
||
| 4 | |||
| 5 | /** |
||
| 6 | * TaskScheduler |
||
| 7 | * |
||
| 8 | * @author Raffael Sahli <[email protected]> |
||
| 9 | * @copyright Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
||
| 10 | * @license MIT https://opensource.org/licenses/MIT |
||
| 11 | */ |
||
| 12 | |||
| 13 | namespace TaskScheduler; |
||
| 14 | |||
| 15 | use Closure; |
||
| 16 | use Generator; |
||
| 17 | use MongoDB\BSON\ObjectId; |
||
| 18 | use MongoDB\BSON\UTCDateTime; |
||
| 19 | use MongoDB\Database; |
||
| 20 | use MongoDB\UpdateResult; |
||
| 21 | use Psr\Log\LoggerInterface; |
||
| 22 | use TaskScheduler\Exception\InvalidArgumentException; |
||
| 23 | use TaskScheduler\Exception\JobNotFoundException; |
||
| 24 | |||
| 25 | class Scheduler |
||
| 26 | { |
||
| 27 | /** |
||
| 28 | * Job options. |
||
| 29 | */ |
||
| 30 | public const OPTION_AT = 'at'; |
||
| 31 | public const OPTION_INTERVAL = 'interval'; |
||
| 32 | public const OPTION_RETRY = 'retry'; |
||
| 33 | public const OPTION_RETRY_INTERVAL = 'retry_interval'; |
||
| 34 | public const OPTION_FORCE_SPAWN = 'force_spawn'; |
||
| 35 | public const OPTION_TIMEOUT = 'timeout'; |
||
| 36 | public const OPTION_ID = 'id'; |
||
| 37 | public const OPTION_IGNORE_DATA = 'ignore_data'; |
||
| 38 | |||
| 39 | /** |
||
| 40 | * Operation options: |
||
| 41 | */ |
||
| 42 | public const OPTION_THROW_EXCEPTION = 1; |
||
| 43 | |||
| 44 | /** |
||
| 45 | * Default job options. |
||
| 46 | */ |
||
| 47 | public const OPTION_DEFAULT_AT = 'default_at'; |
||
| 48 | public const OPTION_DEFAULT_INTERVAL = 'default_interval'; |
||
| 49 | public const OPTION_DEFAULT_RETRY = 'default_retry'; |
||
| 50 | public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval'; |
||
| 51 | public const OPTION_DEFAULT_TIMEOUT = 'default_timeout'; |
||
| 52 | |||
| 53 | /** |
||
| 54 | * Queue options. |
||
| 55 | */ |
||
| 56 | public const OPTION_JOB_QUEUE = 'job_queue'; |
||
| 57 | public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size'; |
||
| 58 | public const OPTION_EVENT_QUEUE = 'event_queue'; |
||
| 59 | public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size'; |
||
| 60 | |||
| 61 | /** |
||
| 62 | * MongoDB type map. |
||
| 63 | */ |
||
| 64 | public const TYPE_MAP = [ |
||
| 65 | 'document' => 'array', |
||
| 66 | 'root' => 'array', |
||
| 67 | 'array' => 'array', |
||
| 68 | ]; |
||
| 69 | |||
| 70 | /** |
||
| 71 | * Database. |
||
| 72 | * |
||
| 73 | * @var Database |
||
| 74 | */ |
||
| 75 | protected $db; |
||
| 76 | |||
| 77 | /** |
||
| 78 | * LoggerInterface. |
||
| 79 | * |
||
| 80 | * @var LoggerInterface |
||
| 81 | */ |
||
| 82 | protected $logger; |
||
| 83 | |||
| 84 | /** |
||
| 85 | * Job Collection name. |
||
| 86 | * |
||
| 87 | * @var string |
||
| 88 | */ |
||
| 89 | protected $job_queue = 'taskscheduler.jobs'; |
||
| 90 | |||
| 91 | /** |
||
| 92 | * Event Collection name. |
||
| 93 | * |
||
| 94 | * @var string |
||
| 95 | */ |
||
| 96 | protected $event_queue = 'taskscheduler.events'; |
||
| 97 | |||
| 98 | /** |
||
| 99 | * Unix time. |
||
| 100 | * |
||
| 101 | * @var int |
||
| 102 | */ |
||
| 103 | protected $default_at = 0; |
||
| 104 | |||
| 105 | /** |
||
| 106 | * Default interval (seconds). |
||
| 107 | * |
||
| 108 | * @var int |
||
| 109 | */ |
||
| 110 | protected $default_interval = 0; |
||
| 111 | |||
| 112 | /** |
||
| 113 | * Default retry. |
||
| 114 | * |
||
| 115 | * @var int |
||
| 116 | */ |
||
| 117 | protected $default_retry = 0; |
||
| 118 | |||
| 119 | /** |
||
| 120 | * Default retry interval (seconds). |
||
| 121 | * |
||
| 122 | * @var int |
||
| 123 | */ |
||
| 124 | protected $default_retry_interval = 300; |
||
| 125 | |||
| 126 | /** |
||
| 127 | * Default timeout. |
||
| 128 | * |
||
| 129 | * @var int |
||
| 130 | */ |
||
| 131 | protected $default_timeout = 0; |
||
| 132 | |||
| 133 | /** |
||
| 134 | * Job Queue size. |
||
| 135 | * |
||
| 136 | * @var int |
||
| 137 | */ |
||
| 138 | protected $job_queue_size = 1000000; |
||
| 139 | |||
| 140 | /** |
||
| 141 | * Event Queue size. |
||
| 142 | * |
||
| 143 | * @var int |
||
| 144 | */ |
||
| 145 | protected $event_queue_size = 5000000; |
||
| 146 | |||
| 147 | /** |
||
| 148 | * Events queue. |
||
| 149 | * |
||
| 150 | * @var MessageQueue |
||
| 151 | */ |
||
| 152 | protected $events; |
||
| 153 | |||
/**
 * Create a scheduler for the given database.
 *
 * Stores the database and logger, applies $config through setOptions() and
 * then creates the event MessageQueue from the resulting event queue name
 * and size.
 *
 * @param array $config scheduler options, see setOptions() for the accepted keys
 */
public function __construct(Database $db, LoggerInterface $logger, array $config = [])
{
    $this->db = $db;
    $this->logger = $logger;
    $this->setOptions($config);

    // Read name and size only after setOptions() so that custom values are honoured.
    $queue_name = $this->getEventQueue();
    $queue_size = $this->getEventQueueSize();

    $this->events = new MessageQueue($db, $queue_name, $queue_size, $logger);
}
|
| 164 | |||
/**
 * Set options.
 *
 * Queue names (OPTION_JOB_QUEUE, OPTION_EVENT_QUEUE) are cast to string,
 * every OPTION_DEFAULT_* value and both queue sizes are cast to int.
 * Options are applied in iteration order, so options preceding an invalid
 * one are already set when the exception is thrown.
 *
 * @param array $config option name => value
 *
 * @throws InvalidArgumentException if $config contains an unknown option name
 */
public function setOptions(array $config = []): self
{
    $string_options = [
        self::OPTION_JOB_QUEUE,
        self::OPTION_EVENT_QUEUE,
    ];

    $int_options = [
        self::OPTION_DEFAULT_AT,
        self::OPTION_DEFAULT_RETRY_INTERVAL,
        self::OPTION_DEFAULT_INTERVAL,
        self::OPTION_DEFAULT_RETRY,
        self::OPTION_DEFAULT_TIMEOUT,
        self::OPTION_JOB_QUEUE_SIZE,
        self::OPTION_EVENT_QUEUE_SIZE,
    ];

    foreach ($config as $option => $value) {
        // Strict lookups on purpose: a loosely comparing switch treats integer
        // keys as equal to the option names on PHP 7 (0 == 'job_queue'), which
        // lets a list-style config through and creates a property named "0".
        if (in_array($option, $string_options, true)) {
            $this->{$option} = (string) $value;
        } elseif (in_array($option, $int_options, true)) {
            $this->{$option} = (int) $value;
        } else {
            throw new InvalidArgumentException('invalid option '.$option.' given');
        }
    }

    return $this;
}
||
| 194 | |||
/**
 * Return the configured job queue size (option OPTION_JOB_QUEUE_SIZE).
 */
public function getJobQueueSize(): int
{
    return $this->job_queue_size;
}
||
| 202 | |||
/**
 * Return the configured event queue size (option OPTION_EVENT_QUEUE_SIZE).
 */
public function getEventQueueSize(): int
{
    return $this->event_queue_size;
}
||
| 210 | |||
/**
 * Return the name of the collection jobs are stored in.
 */
public function getJobQueue(): string
{
    return $this->job_queue;
}
||
| 218 | |||
/**
 * Return the name of the collection events are stored in.
 */
public function getEventQueue(): string
{
    return $this->event_queue;
}
||
| 226 | |||
/**
 * Load a single job from the job collection by its id.
 *
 * @throws JobNotFoundException if no job document with this _id exists
 */
public function getJob(ObjectId $id): Process
{
    $document = $this->db->{$this->job_queue}->findOne(
        ['_id' => $id],
        ['typeMap' => self::TYPE_MAP]
    );

    if (null !== $document) {
        return new Process($document, $this, $this->events);
    }

    throw new JobNotFoundException('job '.$id.' was not found');
}
||
| 244 | |||
/**
 * Cancel a job.
 *
 * Sets the job status to JobInterface::STATUS_CANCELED and records a
 * corresponding event in the event collection.
 *
 * @return bool always true, a missing job is reported by exception
 *
 * @throws JobNotFoundException if the status update did not match exactly one job
 */
public function cancelJob(ObjectId $id): bool
{
    $update = $this->updateJob($id, JobInterface::STATUS_CANCELED);

    if (1 !== $update->getMatchedCount()) {
        throw new JobNotFoundException('job '.$id.' was not found');
    }

    $event = [
        'job' => $id,
        'status' => JobInterface::STATUS_CANCELED,
        'timestamp' => new UTCDateTime(),
    ];

    $this->db->{$this->event_queue}->insertOne($event);

    return true;
}
||
| 264 | |||
/**
 * Flush.
 *
 * Drops the job collection first and the event collection afterwards.
 */
public function flush(): Scheduler
{
    foreach ([$this->job_queue, $this->event_queue] as $collection) {
        $this->db->{$collection}->drop();
    }

    return $this;
}
||
| 275 | |||
/**
 * Get jobs.
 *
 * Yields one Process per job document matching $query. An empty query is
 * replaced by a filter on the statuses waiting, processing and postponed.
 * The query is only sent once the generator is iterated.
 *
 * @param array $query MongoDB filter applied to the job collection
 *
 * @return Generator|Process[]
 */
public function getJobs(array $query = []): Generator
{
    if ([] === $query) {
        $active = [
            JobInterface::STATUS_WAITING,
            JobInterface::STATUS_PROCESSING,
            JobInterface::STATUS_POSTPONED,
        ];

        $query = ['status' => ['$in' => $active]];
    }

    $cursor = $this->db->{$this->job_queue}->find($query, ['typeMap' => self::TYPE_MAP]);

    foreach ($cursor as $document) {
        yield new Process($document, $this, $this->events);
    }
}
|
| 297 | |||
/**
 * Add job to queue.
 *
 * Inserts the job document, writes a STATUS_WAITING event for it and
 * returns the job as it is stored in the job collection.
 *
 * @param string $class   value stored in the job's class field
 * @param mixed  $data    payload stored in the job's data field
 * @param array  $options job options, merged with the scheduler defaults by prepareInsert()
 */
public function addJob(string $class, $data, array $options = []): Process
{
    // prepareInsert() receives $options by reference; from here on it holds the merged options.
    $job = $this->prepareInsert($class, $data, $options);

    $id = $this->db->{$this->job_queue}->insertOne($job)->getInsertedId();

    $context = [
        'category' => get_class($this),
        'params' => $options,
        'data' => $data,
    ];
    $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', $context);

    $this->db->{$this->event_queue}->insertOne([
        'job' => $id,
        'status' => JobInterface::STATUS_WAITING,
        'timestamp' => new UTCDateTime(),
    ]);

    // Read the job back so that the Process is built from the stored representation.
    $stored = $this->db->{$this->job_queue}->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

    return new Process($stored, $this, $this->events);
}
||
| 326 | |||
/**
 * Only add job if not in queue yet.
 *
 * Looks for a job of the same class in status waiting, postponed or
 * processing; unless OPTION_IGNORE_DATA is true the job data has to match
 * as well. If no such job exists a new one is inserted through an upsert.
 * If one exists and was queued with the requested options it is returned,
 * otherwise it is canceled and a new job is queued.
 *
 * @param string $class   value stored in the job's class field
 * @param mixed  $data    payload stored in the job's data field
 * @param array  $options job options, merged with the scheduler defaults by prepareInsert()
 */
public function addJobOnce(string $class, $data, array $options = []): Process
{
    $filter = [
        'class' => $class,
        '$or' => [
            ['status' => JobInterface::STATUS_WAITING],
            ['status' => JobInterface::STATUS_POSTPONED],
            ['status' => JobInterface::STATUS_PROCESSING],
        ],
    ];

    // prepareInsert() rewrites $options by reference (defaults merged in,
    // OPTION_ID removed), so keep what the caller actually asked for.
    $requested = $options;
    $document = $this->prepareInsert($class, $data, $options);

    if (true !== $options[self::OPTION_IGNORE_DATA]) {
        $filter = ['data' => $data] + $filter;
    }

    // '$isolated' is not an updateOne() option and has been dropped here.
    $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
        'upsert' => true,
    ]);

    if ($result->getMatchedCount() > 0) {
        $document = $this->db->{$this->job_queue}->findOne($filter, [
            'typeMap' => self::TYPE_MAP,
        ]);

        // Compare only the options the caller asked for. OPTION_ID is never
        // part of the stored options (prepareInsert() moves it to _id), and
        // === on arrays is order sensitive, hence the key sort on both sides.
        $expected = $requested;
        unset($expected[self::OPTION_ID]);
        $stored = array_intersect_key($document['options'], $expected);
        ksort($expected);
        ksort($stored);

        $options_changed = $stored !== $expected;
        $data_changed = $data !== $document['data'] && true === $options[self::OPTION_IGNORE_DATA];

        if ($options_changed || $data_changed) {
            $this->logger->debug('job ['.$document['_id'].'] options/data changed, reschedule new job', [
                'category' => get_class($this),
                'data' => $data,
            ]);

            $this->cancelJob($document['_id']);

            // $options no longer carries OPTION_ID, so the replacement job
            // cannot collide with the _id of the job that was just canceled.
            return $this->addJobOnce($class, $data, $options);
        }

        return new Process($document, $this, $this->events);
    }

    $id = $result->getUpsertedId();

    $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
        'category' => get_class($this),
        'params' => $options,
        'data' => $data,
    ]);

    $this->db->{$this->event_queue}->insertOne([
        'job' => $id,
        'status' => JobInterface::STATUS_WAITING,
        'timestamp' => new UTCDateTime(),
    ]);

    $document = $this->db->{$this->job_queue}->findOne(['_id' => $id], [
        'typeMap' => self::TYPE_MAP,
    ]);

    return new Process($document, $this, $this->events);
}
||
| 390 | |||
| 391 | |||
/**
 * Wait for jobs being executed.
 *
 * Blocks until as many events with a status >= JobInterface::STATUS_DONE
 * have been read for the given jobs as there are entries in $stack.
 * Returns immediately if $stack is empty.
 *
 * @param Process[] $stack
 * @param int       $options bitmask; with OPTION_THROW_EXCEPTION set, the exception
 *                           recorded in a STATUS_FAILED event is re-thrown
 *
 * @throws InvalidArgumentException if $stack contains anything but Process instances
 */
public function waitFor(array $stack, int $options = 0): Scheduler
{
    // Nothing to wait for. Without this guard the loop below would block
    // forever because no event can ever match an empty $in list.
    if ([] === $stack) {
        return $this;
    }

    $jobs = array_map(static function ($job) {
        if (!($job instanceof Process)) {
            throw new InvalidArgumentException('waitFor() requires a stack of Process[]');
        }

        return $job->getId();
    }, $stack);

    $cursor = $this->events->getCursor([
        // array_map() keeps the keys of $stack; $in requires a list.
        'job' => ['$in' => array_values($jobs)],
        'status' => ['$gte' => JobInterface::STATUS_DONE],
    ]);

    $expected = count($stack);
    $done = 0;

    while (true) {
        if (null === $cursor->current()) {
            if ($cursor->getInnerIterator()->isDead()) {
                // Recreate the event queue and start waiting from scratch.
                $this->events->create();

                return $this->waitFor($stack, $options);
            }

            $this->events->next($cursor, function () use ($stack, $options) {
                $this->waitFor($stack, $options);
            });

            continue;
        }

        $event = $cursor->current();
        $this->events->next($cursor, function () use ($stack, $options) {
            $this->waitFor($stack, $options);
        });

        if (JobInterface::STATUS_FAILED === $event['status'] && isset($event['exception']) && $options & self::OPTION_THROW_EXCEPTION) {
            throw new $event['exception']['class'](
                $event['exception']['message'],
                $event['exception']['code']
            );
        }

        ++$done;

        if ($done >= $expected) {
            return $this;
        }
    }
}
||
| 449 | |||
/**
 * Listen for events.
 *
 * Reads events matching $query and passes each one to $callback wrapped in
 * a Process. Listening stops as soon as the callback returns exactly true.
 * An empty query is replaced by a filter on events whose timestamp is not
 * older than the moment of the call.
 *
 * @param Closure $callback invoked with a Process for every event read
 * @param array   $query    MongoDB filter applied to the event queue
 */
public function listen(Closure $callback, array $query = []): self
{
    if ([] === $query) {
        $query = ['timestamp' => ['$gte' => new UTCDateTime()]];
    }

    $cursor = $this->events->getCursor($query);

    while (true) {
        $event = $cursor->current();

        if (null !== $event) {
            $this->events->next($cursor, function () use ($callback, $query) {
                $this->listen($callback, $query);
            });

            $process = new Process($event, $this, $this->events);

            if (true === $callback($process)) {
                return $this;
            }

            continue;
        }

        // No event available right now.
        if ($cursor->getInnerIterator()->isDead()) {
            $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                'category' => get_class($this),
            ]);

            // Recreate the event queue and start listening again with the same arguments.
            $this->events->create();

            return $this->listen($callback, $query);
        }

        $this->events->next($cursor, function () use ($callback, $query) {
            return $this->listen($callback, $query);
        });
    }
}
||
| 493 | |||
/**
 * Prepare insert.
 *
 * Builds the job document for the given class and data. $options is passed
 * by reference: on return it holds the scheduler defaults merged with the
 * given options as returned by SchedulerValidator::validateOptions(), minus
 * OPTION_ID, which is moved to the document's _id field.
 *
 * @param string $class   value stored in the class field
 * @param mixed  $data    payload stored in the data field
 * @param array  $options job options (modified in place)
 *
 * @return array job document in status JobInterface::STATUS_WAITING
 */
protected function prepareInsert(string $class, $data, array &$options = []): array
{
    $options = array_merge([
        self::OPTION_AT => $this->default_at,
        self::OPTION_INTERVAL => $this->default_interval,
        self::OPTION_RETRY => $this->default_retry,
        self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        self::OPTION_FORCE_SPAWN => false,
        self::OPTION_TIMEOUT => $this->default_timeout,
        self::OPTION_IGNORE_DATA => false,
    ], $options);
    $options = SchedulerValidator::validateOptions($options);

    $job = [
        'class' => $class,
        'status' => JobInterface::STATUS_WAITING,
        'created' => new UTCDateTime(),
        'started' => new UTCDateTime(),
        'ended' => new UTCDateTime(),
        'worker' => new ObjectId(),
        'data' => $data,
    ];

    // A caller supplied id becomes the document id and is not stored as an option.
    if (isset($options[self::OPTION_ID])) {
        $job['_id'] = $options[self::OPTION_ID];
        unset($options[self::OPTION_ID]);
    }

    $job['options'] = $options;

    return $job;
}
||
| 532 | |||
/**
 * Update job status.
 *
 * Sets the status field of the job with the given id. The current status
 * of the job is not checked.
 *
 * @param int $status one of the JobInterface::STATUS_* values
 *
 * @return UpdateResult result of the update, callers use its matched count to detect a missing job
 */
protected function updateJob(ObjectId $id, int $status): UpdateResult
{
    // The filter used to carry '$isolated' => true. That operator was removed
    // in MongoDB 4.0 and has no use for an update addressing a single _id.
    return $this->db->{$this->job_queue}->updateMany(
        ['_id' => $id],
        ['$set' => ['status' => $status]]
    );
}
||
| 549 | } |
||
| 550 |