Passed
Push — master ( b0131a...01bb58 )
by Raffael
03:28
created

Scheduler::addJob()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 35
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 35
c 0
b 0
f 0
ccs 24
cts 24
cp 1
rs 8.8571
cc 2
eloc 25
nc 2
nop 3
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use InvalidArgumentException;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use Psr\Log\LoggerInterface;
20
use Traversable;
21
22
class Scheduler
{
    /**
     * Job options (keys accepted in the $options array of addJob()/addJobOnce()).
     */
    public const OPTION_AT = 'at';
    public const OPTION_INTERVAL = 'interval';
    public const OPTION_RETRY = 'retry';
    public const OPTION_RETRY_INTERVAL = 'retry_interval';

    /**
     * Database.
     *
     * @var Database
     */
    protected $db;

    /**
     * LoggerInterface.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Collection name.
     *
     * @var string
     */
    protected $collection_name = 'queue';

    /**
     * Default at (Unix timestamp in seconds; for values <= 0 no execution time is stored with the job).
     *
     * @var int
     */
    protected $default_at = 0;

    /**
     * Default interval (seconds).
     *
     * @var int
     */
    protected $default_interval = -1;

    /**
     * Default retry.
     *
     * @var int
     */
    protected $default_retry = 0;

    /**
     * Default retry interval (seconds).
     *
     * @var int
     */
    protected $default_retry_interval = 300;

    /**
     * Queue size (passed as the `size` option of the capped collection, i.e. its maximum size in bytes).
     *
     * @var int
     */
    protected $queue_size = 100000;

    /**
     * Init queue.
     *
     * @param Database        $db
     * @param LoggerInterface $logger
     * @param iterable|null   $config optional options, see setOptions()
     *
     * @throws InvalidArgumentException if $config contains an unknown option
     */
    public function __construct(Database $db, LoggerInterface $logger, ?iterable $config = null)
    {
        $this->db = $db;
        $this->logger = $logger;

        if (null !== $config) {
            $this->setOptions($config);
        }
    }

    /**
     * Set options.
     *
     * Supported keys: collection_name (cast to string) as well as default_retry,
     * default_at, default_retry_interval, default_interval and queue_size (cast to int).
     *
     * @param iterable $config
     *
     * @throws InvalidArgumentException if an unknown option is given
     *
     * @return self
     */
    public function setOptions(iterable $config = []): self
    {
        foreach ($config as $option => $value) {
            switch ($option) {
                case 'collection_name':
                    $this->{$option} = (string) $value;

                    break;
                case 'default_retry':
                case 'default_at':
                case 'default_retry_interval':
                case 'default_interval':
                case 'queue_size':
                    $this->{$option} = (int) $value;

                    break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }

    /**
     * Get collection name.
     *
     * @return string
     */
    public function getCollection(): string
    {
        return $this->collection_name;
    }

    /**
     * Create queue collection.
     *
     * The collection is created as a capped collection limited to the configured queue size.
     *
     * @return self
     */
    public function createQueue(): self
    {
        $this->db->createCollection(
            $this->collection_name,
            [
                'capped' => true,
                'size' => $this->queue_size,
            ]
        );

        return $this;
    }

    /**
     * Get job by ID.
     *
     * @param ObjectId $id
     *
     * @throws Exception\JobNotFound if no job with the given id exists
     *
     * @return array
     */
    public function getJob(ObjectId $id): array
    {
        $result = $this->db->{$this->collection_name}->findOne([
            '_id' => $id,
        ], [
            // Return plain PHP arrays instead of BSON document objects.
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ]);

        if (null === $result) {
            throw new Exception\JobNotFound('job '.$id.' was not found');
        }

        return $result;
    }

    /**
     * Cancel job.
     *
     * @param ObjectId $id
     *
     * @return bool
     */
    public function cancelJob(ObjectId $id): bool
    {
        return $this->updateJob($id, Queue::STATUS_CANCELED);
    }

    /**
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
     *
     * @param array $filter list of job status values; an empty list selects waiting, processing and postponed jobs
     *
     * @return Traversable
     */
    public function getJobs(array $filter = []): Traversable
    {
        if ([] === $filter) {
            $filter = [
                Queue::STATUS_WAITING,
                Queue::STATUS_PROCESSING,
                Queue::STATUS_POSTPONED,
            ];
        }

        return $this->db->{$this->collection_name}->find([
            'status' => [
                '$in' => $filter,
            ],
        ], [
            // Return plain PHP arrays instead of BSON document objects.
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ]);
    }

    /**
     * Add job to queue.
     *
     * Missing options fall back to the configured defaults. The `at` option is
     * interpreted as a Unix timestamp in seconds; for values <= 0 the job is
     * stored with `at` set to null.
     *
     * @param string $class
     * @param mixed  $data
     * @param array  $options
     *
     * @throws InvalidArgumentException if an unknown or non integer option is given
     *
     * @return ObjectId
     */
    public function addJob(string $class, $data, array $options = []): ObjectId
    {
        $options = array_merge([
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ], $options);

        $this->validateOptions($options);

        // UTCDateTime expects milliseconds since the Unix epoch.
        $at = $options[self::OPTION_AT] > 0
            ? new UTCDateTime($options[self::OPTION_AT] * 1000)
            : null;

        // Note: no '$isolated' here, it is a (removed) query operator and never was an insert option.
        $result = $this->db->{$this->collection_name}->insertOne([
            'class' => $class,
            'status' => Queue::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'data' => $data,
        ]);

        $id = $result->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        return $id;
    }

    /**
     * Only add job if not in queue yet.
     *
     * A job counts as queued if a job with the same class and data exists in the
     * configured collection with status waiting or postponed. In that case the id
     * of the existing job is returned and nothing is inserted.
     *
     * @param string $class
     * @param mixed  $data
     * @param array  $options
     *
     * @throws InvalidArgumentException if a new job gets added and an invalid option is given
     *
     * @return ObjectId
     */
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => Queue::STATUS_WAITING],
                ['status' => Queue::STATUS_POSTPONED],
            ],
        ];

        // Query the configured collection (not a hard-coded "queue") so the lookup
        // hits the same collection addJob() writes to.
        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return $result['_id'];
    }

    /**
     * Validate given job options.
     *
     * Expects the options merged with the defaults: at most four entries, all of them integers.
     *
     * @param array $options
     *
     * @throws InvalidArgumentException if more than four options or not exactly four integer options are given
     *
     * @return self
     */
    protected function validateOptions(array $options): self
    {
        if (count($options) > 4) {
            throw new InvalidArgumentException('invalid option given');
        }

        if (4 !== count(array_filter($options, 'is_int'))) {
            throw new InvalidArgumentException('Only integers are allowed to passed');
        }

        return $this;
    }

    /**
     * Update job status.
     *
     * Sets the new status and refreshes the job timestamp.
     *
     * @param ObjectId $id
     * @param int      $status
     *
     * @return bool whether the write was acknowledged by the server
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        // A single document update by _id is atomic; the '$isolated' operator is
        // not needed and is rejected by MongoDB >= 4.0.
        $result = $this->db->{$this->collection_name}->updateOne([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        return $result->isAcknowledged();
    }
}
356