Completed
Branch dev (11269e)
by Raffael
04:12
created

Scheduler::addJobOnce()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 35
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 35
c 0
b 0
f 0
ccs 17
cts 17
cp 1
rs 9.6
cc 4
nc 4
nop 3
crap 4
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use InvalidArgumentException;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\UpdateResult;
20
use Psr\Log\LoggerInterface;
21
use Traversable;
22
23
class Scheduler
24
{
25
    /**
26
     * Job options.
27
     */
28
    public const OPTION_AT = 'at';
29
    public const OPTION_INTERVAL = 'interval';
30
    public const OPTION_RETRY = 'retry';
31
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
32
    public const OPTION_IGNORE_MAX_CHILDREN = 'ignore_max_children';
33
    public const OPTION_DEFAULT_AT = 'default_at';
34
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
35
    public const OPTION_DEFAULT_RETRY = 'default_retry';
36
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
37
    public const OPTION_COLLECTION_NAME = 'collection_name';
38
    public const OPTION_QUEUE_SIZE = 'queue_size';
39
40
    /**
41
     * MongoDB type map.
42
     */
43
    public const TYPE_MAP = [
44
        'document' => 'array',
45
        'root' => 'array',
46
        'array' => 'array',
47
    ];
48
49
    /**
50
     * Database.
51
     *
52
     * @var Database
53
     */
54
    protected $db;
55
56
    /**
57
     * LoggerInterface.
58
     *
59
     * @var LoggerInterface
60
     */
61
    protected $logger;
62
63
    /**
64
     * Collection name.
65
     *
66
     * @var string
67
     */
68
    protected $collection_name = 'queue';
69
70
    /**
71
     * Default at (Secconds from now).
72
     *
73
     * @var int
74
     */
75
    protected $default_at = 0;
76
77
    /**
78
     * Default interval (secconds).
79
     *
80
     * @var int
81
     */
82
    protected $default_interval = -1;
83
84
    /**
85
     * Default retry.
86
     *
87
     * @var int
88
     */
89
    protected $default_retry = -1;
90
91
    /**
92
     * Default retry interval (secconds).
93
     *
94
     * @var int
95
     */
96
    protected $default_retry_interval = 300;
97
98
    /**
99
     * Queue size.
100
     *
101
     * @var int
102
     */
103
    protected $queue_size = 100000;
104
105
    /**
106
     * Init queue.
107
     *
108
     * @param Database        $db
109
     * @param LoggerInterface $logger
110
     * @param array           $config
111
     */
112 53
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
113
    {
114 53
        $this->db = $db;
115 53
        $this->logger = $logger;
116 53
        $this->setOptions($config);
117 53
    }
118
119
    /**
120
     * Set options.
121
     *
122
     * @param array $config
123
     *
124
     * @return Scheduler
125
     */
126 53
    public function setOptions(array $config = []): self
127
    {
128 53
        foreach ($config as $option => $value) {
129
            switch ($option) {
130 1
                case self::OPTION_COLLECTION_NAME:
131 1
                    $this->{$option} = (string) $value;
132
133 1
                break;
134 1
                case self::OPTION_DEFAULT_AT:
135 1
                case self::OPTION_DEFAULT_RETRY_INTERVAL:
136 1
                case self::OPTION_DEFAULT_INTERVAL:
137 1
                case self::OPTION_DEFAULT_RETRY:
138 1
                case self::OPTION_QUEUE_SIZE:
139 1
                    $this->{$option} = (int) $value;
140
141 1
                break;
142
                default:
143 1
                    throw new InvalidArgumentException('invalid option '.$option.' given');
144
            }
145
        }
146
147 53
        return $this;
148
    }
149
150
    /**
151
     * Get Queue size.
152
     *
153
     * @return int
154
     */
155 2
    public function getQueueSize(): int
156
    {
157 2
        return $this->queue_size;
158
    }
159
160
    /**
161
     * Get collection name.
162
     *
163
     * @return string
164
     */
165 38
    public function getCollection(): string
166
    {
167 38
        return $this->collection_name;
168
    }
169
170
    /**
171
     * Get job by ID.
172
     *
173
     * @param ObjectId
174
     *
175
     * @return array
176
     */
177 27
    public function getJob(ObjectId $id): array
178
    {
179 27
        $result = $this->db->{$this->collection_name}->findOne([
180 27
            '_id' => $id,
181
        ], [
182 27
            'typeMap' => self::TYPE_MAP,
183
        ]);
184
185 27
        if (null === $result) {
186 1
            throw new Exception\JobNotFound('job '.$id.' was not found');
187
        }
188
189 26
        return $result;
190
    }
191
192
    /**
193
     * Cancel job.
194
     *
195
     * @param ObjectId $id
196
     *
197
     * @return bool
198
     */
199 3
    public function cancelJob(ObjectId $id): bool
200
    {
201 3
        $result = $this->updateJob($id, JobInterface::STATUS_CANCELED);
202
203 3
        if (1 !== $result->getModifiedCount()) {
204 1
            throw new Exception\JobNotFound('job '.$id.' was not found');
205
        }
206
207 2
        return true;
208
    }
209
210
    /**
211
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
212
     *
213
     * @param array $filter
214
     *
215
     * @return Traversable
216
     */
217 4
    public function getJobs(array $filter = []): Traversable
218
    {
219 4
        if (0 === count($filter)) {
220
            $filter = [
221 3
                JobInterface::STATUS_WAITING,
222
                JobInterface::STATUS_PROCESSING,
223
                JobInterface::STATUS_POSTPONED,
224
            ];
225
        }
226
227 4
        $result = $this->db->{$this->collection_name}->find([
228
            'status' => [
229 4
                '$in' => $filter,
230
            ],
231
        ], [
232 4
            'typeMap' => self::TYPE_MAP,
233
        ]);
234
235 4
        return $result;
236
    }
237
238
    /**
239
     * Add job to queue.
240
     *
241
     * @param string $class
242
     * @param mixed  $data
243
     * @param array  $options
244
     *
245
     * @return ObjectId
246
     */
247 34
    public function addJob(string $class, $data, array $options = []): ObjectId
248
    {
249
        $defaults = [
250 34
            self::OPTION_AT => $this->default_at,
251 34
            self::OPTION_INTERVAL => $this->default_interval,
252 34
            self::OPTION_RETRY => $this->default_retry,
253 34
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
254 34
            self::OPTION_IGNORE_MAX_CHILDREN => false,
255
        ];
256
257 34
        $options = array_merge($defaults, $options);
258 34
        $this->validateOptions($options);
259 32
        $at = null;
260
261 32
        if ($options[self::OPTION_AT] > 0) {
262 7
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
263
        }
264
265 32
        $result = $this->db->{$this->collection_name}->insertOne([
266 32
            'class' => $class,
267
            'status' => JobInterface::STATUS_WAITING,
268 32
            'created' => new UTCDateTime(),
269 32
            'started' => new UTCDateTime(0),
270 32
            'ended' => new UTCDateTime(0),
271 32
            'at' => $at,
272 32
            'retry' => $options[self::OPTION_RETRY],
273 32
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
274 32
            'interval' => $options[self::OPTION_INTERVAL],
275 32
            'ignore_max_children' => $options[self::OPTION_IGNORE_MAX_CHILDREN],
276 32
            'data' => $data,
277 32
        ], ['$isolated' => true]);
278
279 32
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
280 32
            'category' => get_class($this),
281 32
            'params' => $options,
282 32
            'data' => $data,
283
        ]);
284
285 32
        return $result->getInsertedId();
286
    }
287
288
    /**
289
     * Only add job if not in queue yet.
290
     *
291
     * @param string $class
292
     * @param mixed  $data
293
     * @param array  $options
294
     *
295
     * @return ObjectId
296
     */
297 2
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
298
    {
299
        $filter = [
300 2
            'class' => $class,
301 2
            'data' => $data,
302
            '$or' => [
303
                ['status' => JobInterface::STATUS_WAITING],
304
                ['status' => JobInterface::STATUS_POSTPONED],
305
            ],
306
        ];
307
308 2
        $result = $this->db->{$this->collection_name}->findOne($filter, [
309 2
            'typeMap' => self::TYPE_MAP,
310
        ]);
311
312 2
        if (null !== $result && array_intersect_key($result, $options) !== $options) {
313 1
            $this->logger->debug('job ['.$result['_id'].'] options changed, reschedule new job', [
314 1
                'category' => get_class($this),
315 1
                'data' => $data,
316
            ]);
317
318 1
            $this->cancelJob($result['_id']);
319 1
            $result = null;
320
        }
321
322 2
        if (null === $result) {
323 2
            return $this->addJob($class, $data, $options);
324
        }
325
326 1
        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
327 1
                'category' => get_class($this),
328 1
                'data' => $data,
329
            ]);
330
331 1
        return $result['_id'];
332
    }
333
334
    /**
335
     * Validate given job options.
336
     *
337
     * @param array $options
338
     *
339
     * @return Scheduler
340
     */
341 34
    protected function validateOptions(array $options): self
342
    {
343 34
        foreach ($options as $option => $value) {
344
            switch ($option) {
345 34
                case self::OPTION_AT:
346 34
                case self::OPTION_INTERVAL:
347 34
                case self::OPTION_RETRY:
348 33
                case self::OPTION_RETRY_INTERVAL:
349 34
                    if (!is_int($value)) {
350 1
                        throw new InvalidArgumentException('option '.$option.' must be an integer');
351
                    }
352
353 34
                break;
354 33
                case self::OPTION_IGNORE_MAX_CHILDREN:
355 33
                    if (!is_bool($value)) {
356
                        throw new InvalidArgumentException('option '.$option.' must be a boolean');
357
                    }
358
359 33
                break;
360
                default:
361 34
                    throw new InvalidArgumentException('invalid option '.$option.' given');
362
            }
363
        }
364
365 32
        return $this;
366
    }
367
368
    /**
369
     * Update job status.
370
     *
371
     * @param ObjectId $id
372
     * @param int      $status
373
     *
374
     * @return UpdateResult
375
     */
376 3
    protected function updateJob(ObjectId $id, int $status): UpdateResult
377
    {
378 3
        $result = $this->db->{$this->collection_name}->updateMany([
379 3
            '_id' => $id,
380
            '$isolated' => true,
381
        ], [
382
            '$set' => [
383 3
                'status' => $status,
384
            ],
385
        ]);
386
387 3
        return $result;
388
    }
389
}
390