Passed
Push — master ( ecf44b...de2af5 )
by Raffael
02:46
created

Scheduler   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 325
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 25
dl 0
loc 325
ccs 87
cts 87
cp 1
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
C setOptions() 0 22 8
A __construct() 0 7 2
A getJob() 0 17 2
A getCollection() 0 3 1
B addJob() 0 37 2
A addJobOnce() 0 22 2
A cancelJob() 0 3 1
A updateJob() 0 12 1
A validateOptions() 0 11 3
A getJobs() 0 23 2
A getQueueSize() 0 3 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use InvalidArgumentException;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use Psr\Log\LoggerInterface;
20
use Traversable;
21
22
class Scheduler
{
    /**
     * Keys of the per-job options understood by addJob() and addJobOnce().
     */
    public const OPTION_AT = 'at';
    public const OPTION_INTERVAL = 'interval';
    public const OPTION_RETRY = 'retry';
    public const OPTION_RETRY_INTERVAL = 'retry_interval';

    /**
     * Type map passed to find operations so that documents come back as plain PHP arrays.
     */
    private const ARRAY_TYPE_MAP = [
        'document' => 'array',
        'root' => 'array',
        'array' => 'array',
    ];

    /**
     * Database holding the job collection.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger receiving debug messages about added and skipped jobs.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Name of the collection all jobs are read from and written to.
     *
     * @var string
     */
    protected $collection_name = 'queue';

    /**
     * Default for the 'at' job option.
     *
     * addJob() turns a positive value into a UTCDateTime using `$at * 1000`;
     * any other value is stored as null.
     *
     * @var int
     */
    protected $default_at = 0;

    /**
     * Default for the 'interval' job option (seconds).
     *
     * NOTE(review): the meaning of -1 is decided by the queue consumer, which is not part of this class.
     *
     * @var int
     */
    protected $default_interval = -1;

    /**
     * Default for the 'retry' job option.
     *
     * NOTE(review): the meaning of -1 is decided by the queue consumer, which is not part of this class.
     *
     * @var int
     */
    protected $default_retry = -1;

    /**
     * Default for the 'retry_interval' job option (seconds).
     *
     * @var int
     */
    protected $default_retry_interval = 300;

    /**
     * Queue size. This class only stores the value and exposes it through getQueueSize().
     *
     * @var int
     */
    protected $queue_size = 100000;

    /**
     * Init queue.
     *
     * @param Database        $db     database holding the job collection
     * @param LoggerInterface $logger logger used for debug output
     * @param iterable|null   $config optional options, forwarded to setOptions() when not null
     *
     * @throws InvalidArgumentException if $config contains an unknown option
     */
    public function __construct(Database $db, LoggerInterface $logger, ?iterable $config = null)
    {
        $this->db = $db;
        $this->logger = $logger;

        if (null !== $config) {
            $this->setOptions($config);
        }
    }

    /**
     * Set options.
     *
     * Supported keys: collection_name (cast to string) and default_retry, default_at,
     * default_retry_interval, default_interval, queue_size (each cast to int).
     *
     * @param iterable $config option name => value pairs
     *
     * @throws InvalidArgumentException if an unknown option name is given
     *
     * @return $this
     */
    public function setOptions(iterable $config = []): self
    {
        foreach ($config as $option => $value) {
            switch ($option) {
                case 'collection_name':
                    $this->{$option} = (string) $value;

                    break;
                case 'default_retry':
                case 'default_at':
                case 'default_retry_interval':
                case 'default_interval':
                case 'queue_size':
                    $this->{$option} = (int) $value;

                    break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }

    /**
     * Get queue size.
     *
     * @return int
     */
    public function getQueueSize(): int
    {
        return $this->queue_size;
    }

    /**
     * Get collection name.
     *
     * @return string
     */
    public function getCollection(): string
    {
        return $this->collection_name;
    }

    /**
     * Get job by ID.
     *
     * @param ObjectId $id
     *
     * @throws Exception\JobNotFound if no document with the given id exists
     *
     * @return array the job document as a plain array
     */
    public function getJob(ObjectId $id): array
    {
        $result = $this->db->{$this->collection_name}->findOne(
            ['_id' => $id],
            ['typeMap' => self::ARRAY_TYPE_MAP]
        );

        if (null === $result) {
            throw new Exception\JobNotFound('job '.$id.' was not found');
        }

        return $result;
    }

    /**
     * Cancel job.
     *
     * @param ObjectId $id
     *
     * @return bool whether the status update was acknowledged (see updateJob())
     */
    public function cancelJob(ObjectId $id): bool
    {
        return $this->updateJob($id, Queue::STATUS_CANCELED);
    }

    /**
     * Get jobs.
     *
     * @param array $filter list of job status values to select; when empty, jobs that are
     *                      waiting, processing or postponed are returned
     *
     * @return Traversable job documents as plain arrays
     */
    public function getJobs(array $filter = []): Traversable
    {
        if (0 === count($filter)) {
            $filter = [
                Queue::STATUS_WAITING,
                Queue::STATUS_PROCESSING,
                Queue::STATUS_POSTPONED,
            ];
        }

        return $this->db->{$this->collection_name}->find(
            ['status' => ['$in' => $filter]],
            ['typeMap' => self::ARRAY_TYPE_MAP]
        );
    }

    /**
     * Add job to queue.
     *
     * @param string $class   job class name, stored in the 'class' field
     * @param mixed  $data    payload, stored in the 'data' field
     * @param array  $options any of the OPTION_* keys; missing keys fall back to the configured defaults
     *
     * @throws InvalidArgumentException if an unknown option or a non-integer option value is given
     *
     * @return ObjectId id of the inserted job document
     */
    public function addJob(string $class, $data, array $options = []): ObjectId
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ];

        $options = array_merge($defaults, $options);
        $this->validateOptions($options);

        // UTCDateTime expects milliseconds, the option is given in seconds.
        // Non-positive values mean "no start time" and are stored as null.
        $at = null;
        if ($options[self::OPTION_AT] > 0) {
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
        }

        // No '$isolated' option here: it is not an insertOne option, and a
        // single-document insert is atomic on its own.
        $result = $this->db->{$this->collection_name}->insertOne([
            'class' => $class,
            'status' => Queue::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(0),
            'ended' => new UTCDateTime(0),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'data' => $data,
        ]);

        $id = $result->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        return $id;
    }

    /**
     * Only add job if not in queue yet.
     *
     * A job counts as queued when a document with the same class and data exists
     * whose status is waiting or postponed.
     *
     * @param string $class
     * @param mixed  $data
     * @param array  $options only used when a new job is added, see addJob()
     *
     * @throws InvalidArgumentException if a new job is added and the options are invalid
     *
     * @return ObjectId id of the existing job, or of the newly added one
     */
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => Queue::STATUS_WAITING],
                ['status' => Queue::STATUS_POSTPONED],
            ],
        ];

        // Look in the configured collection; addJob() inserts there as well,
        // so a hard-coded name would miss jobs once collection_name is changed.
        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return $result['_id'];
    }

    /**
     * Validate given job options.
     *
     * Expects the options after they were merged with the four defaults:
     * more than four entries means an unknown key was passed, and all four
     * values have to be integers.
     *
     * @param array $options
     *
     * @throws InvalidArgumentException if there are more than four options or not exactly four integer values
     *
     * @return $this
     */
    protected function validateOptions(array $options): self
    {
        if (count($options) > 4) {
            throw new InvalidArgumentException('invalid option given');
        }

        if (4 !== count(array_filter($options, 'is_int'))) {
            throw new InvalidArgumentException('Only integers are allowed to passed');
        }

        return $this;
    }

    /**
     * Update job status.
     *
     * @param ObjectId $id
     * @param int      $status new value of the 'status' field
     *
     * @return bool whether the write was acknowledged; this does not tell whether a job matched
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        // The filter targets one document by its unique _id, so updateOne is enough.
        // '$isolated' is dropped from the filter because MongoDB 4.0 removed the operator.
        $result = $this->db->{$this->collection_name}->updateOne(
            ['_id' => $id],
            ['$set' => ['status' => $status]]
        );

        return $result->isAcknowledged();
    }
}
349