1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | /** |
||
6 | * TaskScheduler |
||
7 | * |
||
8 | * @author Raffael Sahli <[email protected]> |
||
9 | * @copyright Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
||
10 | * @license MIT https://opensource.org/licenses/MIT |
||
11 | */ |
||
12 | |||
13 | namespace TaskScheduler; |
||
14 | |||
15 | use Closure; |
||
16 | use Generator; |
||
17 | use MongoDB\BSON\ObjectId; |
||
18 | use MongoDB\BSON\UTCDateTime; |
||
19 | use MongoDB\Database; |
||
20 | use MongoDB\UpdateResult; |
||
21 | use Psr\Log\LoggerInterface; |
||
22 | use TaskScheduler\Exception\InvalidArgumentException; |
||
23 | use TaskScheduler\Exception\JobNotFoundException; |
||
24 | |||
class Scheduler
{
    /**
     * Job options (per-job, passed to addJob()/addJobOnce()).
     */
    public const OPTION_AT = 'at';
    public const OPTION_INTERVAL = 'interval';
    public const OPTION_RETRY = 'retry';
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
    public const OPTION_FORCE_SPAWN = 'force_spawn';
    public const OPTION_TIMEOUT = 'timeout';
    public const OPTION_ID = 'id';
    public const OPTION_IGNORE_DATA = 'ignore_data';

    /**
     * Operation options (bit flags, e.g. for waitFor()).
     */
    public const OPTION_THROW_EXCEPTION = 1;

    /**
     * Default job options (accepted by the constructor / setOptions()).
     */
    public const OPTION_DEFAULT_AT = 'default_at';
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
    public const OPTION_DEFAULT_RETRY = 'default_retry';
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';

    /**
     * Queue options (accepted by the constructor / setOptions()).
     */
    public const OPTION_JOB_QUEUE = 'job_queue';
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
    public const OPTION_EVENT_QUEUE = 'event_queue';
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';

    /**
     * MongoDB type map: decode documents, root documents and BSON arrays as PHP arrays.
     */
    public const TYPE_MAP = [
        'document' => 'array',
        'root' => 'array',
        'array' => 'array',
    ];

    /**
     * Database.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Job collection name.
     *
     * @var string
     */
    protected $job_queue = 'taskscheduler.jobs';

    /**
     * Event collection name.
     *
     * @var string
     */
    protected $event_queue = 'taskscheduler.events';

    /**
     * Default execution time (unix time).
     *
     * @var int
     */
    protected $default_at = 0;

    /**
     * Default interval (seconds).
     *
     * @var int
     */
    protected $default_interval = 0;

    /**
     * Default retry count.
     *
     * @var int
     */
    protected $default_retry = 0;

    /**
     * Default retry interval (seconds).
     *
     * @var int
     */
    protected $default_retry_interval = 300;

    /**
     * Default timeout.
     *
     * @var int
     */
    protected $default_timeout = 0;

    /**
     * Job queue size.
     *
     * @var int
     */
    protected $job_queue_size = 1000000;

    /**
     * Event queue size.
     *
     * @var int
     */
    protected $event_queue_size = 5000000;

    /**
     * Events queue.
     *
     * @var MessageQueue
     */
    protected $events;

    /**
     * Apply $config via setOptions() and set up the event message queue.
     *
     * @throws InvalidArgumentException if $config contains an unknown option
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->setOptions($config);
        $this->events = new MessageQueue($db, $this->getEventQueue(), $this->getEventQueueSize(), $logger);
    }

    /**
     * Set scheduler options.
     *
     * Queue names are cast to string; defaults and queue sizes are cast to int.
     * Each option key is written to the property of the same name.
     *
     * @throws InvalidArgumentException if an unknown option key is given
     */
    public function setOptions(array $config = []): self
    {
        foreach ($config as $option => $value) {
            switch ($option) {
                case self::OPTION_JOB_QUEUE:
                case self::OPTION_EVENT_QUEUE:
                    $this->{$option} = (string) $value;

                    break;
                case self::OPTION_DEFAULT_AT:
                case self::OPTION_DEFAULT_RETRY_INTERVAL:
                case self::OPTION_DEFAULT_INTERVAL:
                case self::OPTION_DEFAULT_RETRY:
                case self::OPTION_DEFAULT_TIMEOUT:
                case self::OPTION_JOB_QUEUE_SIZE:
                case self::OPTION_EVENT_QUEUE_SIZE:
                    $this->{$option} = (int) $value;

                    break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }

    /**
     * Get job queue size.
     */
    public function getJobQueueSize(): int
    {
        return $this->job_queue_size;
    }

    /**
     * Get event queue size.
     */
    public function getEventQueueSize(): int
    {
        return $this->event_queue_size;
    }

    /**
     * Get job collection name.
     */
    public function getJobQueue(): string
    {
        return $this->job_queue;
    }

    /**
     * Get event collection name.
     */
    public function getEventQueue(): string
    {
        return $this->event_queue;
    }

    /**
     * Get job by id.
     *
     * @throws JobNotFoundException if no job with that id exists
     */
    public function getJob(ObjectId $id): Process
    {
        $result = $this->db->{$this->job_queue}->findOne([
            '_id' => $id,
        ], [
            'typeMap' => self::TYPE_MAP,
        ]);

        if (null === $result) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        return new Process($result, $this, $this->events);
    }

    /**
     * Cancel a job: set its status to STATUS_CANCELED and record a CANCELED event.
     *
     * The job's current status is not checked.
     *
     * @throws JobNotFoundException if no job with that id exists
     */
    public function cancelJob(ObjectId $id): bool
    {
        $result = $this->updateJob($id, JobInterface::STATUS_CANCELED);

        if (1 !== $result->getMatchedCount()) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ]);

        return true;
    }

    /**
     * Drop both the job and the event collection.
     */
    public function flush(): Scheduler
    {
        $this->db->{$this->job_queue}->drop();
        $this->db->{$this->event_queue}->drop();

        return $this;
    }

    /**
     * Get jobs matching $query.
     *
     * An empty $query returns every job in status WAITING, PROCESSING or POSTPONED.
     *
     * @return Generator<Process>
     */
    public function getJobs(array $query = []): Generator
    {
        if (0 === count($query)) {
            $query = ['status' => ['$in' => [
                JobInterface::STATUS_WAITING,
                JobInterface::STATUS_PROCESSING,
                JobInterface::STATUS_POSTPONED,
            ]]];
        }

        $result = $this->db->{$this->job_queue}->find($query, [
            'typeMap' => self::TYPE_MAP,
        ]);

        foreach ($result as $job) {
            yield new Process($job, $this, $this->events);
        }
    }

    /**
     * Add a job to the queue, record a WAITING event and return the stored job.
     *
     * @param mixed $data Job payload stored as-is in the job document
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        $document = $this->prepareInsert($class, $data, $options);

        $result = $this->db->{$this->job_queue}->insertOne($document);
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $result->getInsertedId(),
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getInsertedId()], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this, $this->events);
    }

    /**
     * Only add a job if no active (waiting/postponed/processing) job of $class exists yet.
     *
     * Matching includes $data unless OPTION_IGNORE_DATA is true. If a matching job exists but one of
     * the explicitly requested options differs, or its data differs while OPTION_IGNORE_DATA is set,
     * that job is canceled and a new one is scheduled.
     *
     * @param mixed $data Job payload
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
        ];

        // Keep what the caller asked for; prepareInsert() rewrites $options in place with the defaults.
        $requested = $options;
        $document = $this->prepareInsert($class, $data, $options);

        if (true !== $options[self::OPTION_IGNORE_DATA]) {
            $filter = ['data' => $data] + $filter;
        }

        // A single-document upsert is atomic on its own; no isolation flag is needed.
        $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
            'upsert' => true,
        ]);

        if ($result->getMatchedCount() > 0) {
            $document = $this->db->{$this->job_queue}->findOne($filter, [
                'typeMap' => self::TYPE_MAP,
            ]);

            $dataChanged = true === $options[self::OPTION_IGNORE_DATA] && $data !== $document['data'];

            if (self::optionsChanged($document['options'], $requested) || $dataChanged) {
                $this->logger->debug('job ['.$document['_id'].'] options/data changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($document['_id']);

                return $this->addJobOnce($class, $data, $options);
            }

            return new Process($document, $this, $this->events);
        }

        $this->logger->debug('queue job ['.$result->getUpsertedId().'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $result->getUpsertedId(),
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getUpsertedId()], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this, $this->events);
    }

    /**
     * Wait until every given job has emitted an event with status >= STATUS_DONE.
     *
     * @param Process[] $stack   Jobs to wait for; several entries for the same job are waited on once
     * @param int       $options Bit flags; with OPTION_THROW_EXCEPTION, a FAILED event carrying an
     *                           exception is rethrown as that exception class
     *
     * @throws InvalidArgumentException if $stack contains anything other than Process instances
     */
    public function waitFor(array $stack, int $options = 0): Scheduler
    {
        $jobs = array_map(function ($job) {
            if (!($job instanceof Process)) {
                throw new InvalidArgumentException('waitFor() requires a stack of Process[]');
            }

            return $job->getId();
        }, $stack);

        // Nothing to wait for: an `$in: []` filter never matches, so the loop below would block forever.
        if (0 === count($jobs)) {
            return $this;
        }

        // Track pending jobs by id instead of counting events. A single job may emit several terminal
        // events (cancelJob() does not check the current status), and those must not count twice.
        // Ids are keyed by their string form.
        $pending = [];
        foreach ($jobs as $id) {
            $pending[(string) $id] = true;
        }

        $cursor = $this->events->getCursor([
            'job' => ['$in' => $jobs],
            'status' => ['$gte' => JobInterface::STATUS_DONE],
        ]);

        while (true) {
            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->events->create();

                    return $this->waitFor($stack, $options);
                }

                $this->events->next($cursor, function () use ($stack, $options) {
                    $this->waitFor($stack, $options);
                });

                continue;
            }

            $event = $cursor->current();
            $this->events->next($cursor, function () use ($stack, $options) {
                $this->waitFor($stack, $options);
            });

            if (JobInterface::STATUS_FAILED === $event['status']
                && isset($event['exception'])
                && ($options & self::OPTION_THROW_EXCEPTION)
            ) {
                $exceptionClass = $event['exception']['class'];

                throw new $exceptionClass(
                    $event['exception']['message'],
                    $event['exception']['code']
                );
            }

            unset($pending[(string) $event['job']]);

            if (0 === count($pending)) {
                return $this;
            }
        }
    }

    /**
     * Listen for events and pass each one, wrapped in a Process, to $callback.
     *
     * An empty $query listens for events with a timestamp from now on. Returns once $callback
     * returns true.
     */
    public function listen(Closure $callback, array $query = []): self
    {
        if (0 === count($query)) {
            $query = [
                'timestamp' => ['$gte' => new UTCDateTime()],
            ];
        }

        $cursor = $this->events->getCursor($query);

        while (true) {
            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    $this->events->create();

                    return $this->listen($callback, $query);
                }

                $this->events->next($cursor, function () use ($callback, $query) {
                    return $this->listen($callback, $query);
                });

                continue;
            }

            $result = $cursor->current();
            $this->events->next($cursor, function () use ($callback, $query) {
                $this->listen($callback, $query);
            });

            $process = new Process($result, $this, $this->events);
            if (true === $callback($process)) {
                return $this;
            }
        }
    }

    /**
     * Build the job document to insert.
     *
     * $options is merged over the scheduler defaults and validated by SchedulerValidator. It is
     * passed by reference and is left holding the merged, validated options. If OPTION_ID is
     * given, it is removed from $options and used as the document `_id`.
     *
     * @param mixed $data Job payload
     */
    protected function prepareInsert(string $class, $data, array &$options = []): array
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_FORCE_SPAWN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
            self::OPTION_IGNORE_DATA => false,
        ];

        $options = array_merge($defaults, $options);
        $options = SchedulerValidator::validateOptions($options);

        // NOTE(review): started/ended are initialised to the creation time here; presumably they are
        // overwritten when the job actually runs. TODO confirm.
        $document = [
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(),
            'ended' => new UTCDateTime(),
            'worker' => new ObjectId(),
            'data' => $data,
        ];

        if (isset($options[self::OPTION_ID])) {
            $id = $options[self::OPTION_ID];
            unset($options[self::OPTION_ID]);
            $document['_id'] = $id;
        }

        $document['options'] = $options;

        return $document;
    }

    /**
     * Update the status of a single job.
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        // `_id` is unique, so this touches at most one document and is atomic without `$isolated`.
        // MongoDB 4.0 removed `$isolated` and rejects it as an unknown top-level operator.
        return $this->db->{$this->job_queue}->updateOne(
            ['_id' => $id],
            ['$set' => ['status' => $status]]
        );
    }

    /**
     * Tell whether any explicitly requested option differs from the options stored on a job.
     *
     * Keys are compared one by one with strict equality, so the order in which options were
     * passed does not matter. A whole-array `!==` treats a mere reordering as a change.
     * A requested key that is missing from $stored counts as a change.
     */
    private static function optionsChanged(array $stored, array $requested): bool
    {
        foreach ($requested as $key => $value) {
            if (!array_key_exists($key, $stored) || $stored[$key] !== $value) {
                return true;
            }
        }

        return false;
    }
}
||
550 |