Completed
Pull Request — master (#3)
by Alexey
54:06 queued 15:35
created

MongoQueue   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 436
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 0
Metric Value
wmc 32
lcom 1
cbo 7
dl 0
loc 436
rs 9.6
c 0
b 0
f 0

24 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 15 1
A push() 0 4 1
A pop() 0 10 2
A exists() 0 7 1
A pushRaw() 0 4 1
A later() 0 4 1
A bulk() 0 12 1
A release() 0 4 1
A getJobById() 0 10 2
A deleteReserved() 0 15 2
A getExpire() 0 4 1
A setExpire() 0 4 1
A size() 0 8 2
A canRunJob() 0 7 2
A markJobAsReserved() 0 13 1
A pushToDatabase() 0 6 1
A getAvailableAt() 0 6 2
A getQueue() 0 4 2
A getNextAvailableJob() 0 15 2
A buildDatabaseRecord() 0 12 1
A isAvailable() 0 7 1
A isReservedButExpired() 0 6 1
A getCollection() 0 4 1
A buildJob() 0 12 1
1
<?php
2
3
namespace SfCod\QueueBundle\Queue;
4
5
use DateInterval;
6
use DateTime;
7
use MongoDB\Collection;
8
use SfCod\QueueBundle\Base\JobResolverInterface;
9
use SfCod\QueueBundle\Base\MongoDriverInterface;
10
use SfCod\QueueBundle\Entity\Job;
11
use SfCod\QueueBundle\Job\JobContract;
12
use SfCod\QueueBundle\Job\JobContractInterface;
13
14
/**
15
 * Class MongoQueue
16
 *
17
 * @author Alexey Orlov <[email protected]>
18
 *
19
 * @package SfCod\QueueBundle\Queue
20
 */
21
class MongoQueue extends Queue
22
{
23
    /**
24
     * Job resolver
25
     *
26
     * @var JobResolverInterface
27
     */
28
    protected $resolver;
29
30
    /**
31
     * The mongo connection instance.
32
     *
33
     * @var MongoDriverInterface
34
     */
35
    protected $mongo;
36
37
    /**
38
     * The mongo collection that holds the jobs.
39
     *
40
     * @var string
41
     */
42
    protected $collection;
43
44
    /**
45
     * The name of the default queue.
46
     *
47
     * @var string
48
     */
49
    protected $queue = 'default';
50
51
    /**
52
     * The expiration time of a job.
53
     *
54
     * @var int|null
55
     */
56
    protected $expire = 60;
57
58
    /**
59
     * Threshold for the number of reserved jobs in a queue, as checked by canRunJob().
     *
     * @var int
60
     */
61
    protected $limit = 15;
62
63
    /**
     * Create a new mongo queue instance.
     *
     * @param JobResolverInterface $resolver resolver handed to every JobContract built by this queue
     * @param MongoDriverInterface $mongo driver used to reach the database
     * @param string $collection name of the collection that stores the jobs
     * @param string $queue name of the default queue
     * @param int $expire expiration time of a reserved job, in seconds
     * @param int $limit threshold compared against the reserved job count in canRunJob()
     */
    public function __construct(
        JobResolverInterface $resolver,
        MongoDriverInterface $mongo,
        string $collection,
        string $queue = 'default',
        int $expire = 60,
        int $limit = 15
    ) {
        // Collaborators first.
        $this->resolver = $resolver;
        $this->mongo = $mongo;

        // Then the plain configuration values.
        $this->collection = $collection;
        $this->queue = $queue;
        $this->expire = $expire;
        $this->limit = $limit;
    }
87
88
    /**
     * Queue a job without any delay.
     *
     * @param string $job job identifier forwarded to createPayload()
     * @param array $data job data forwarded to createPayload()
     * @param string|null $queue target queue; a falsy value selects the default queue
     *
     * @return mixed result of the underlying insertOne() call
     */
    public function push(string $job, array $data = [], ?string $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        // Zero delay: the record becomes available at the current time.
        return $this->pushToDatabase(0, $queue, $payload);
    }
101
102
    /**
     * Fetch the next job that is ready to run.
     *
     * @param string|null $queue queue to read from; a falsy value selects the default queue
     *
     * @return null|JobContractInterface the next job, or null when nothing is ready
     */
    public function pop(?string $queue = null): ?JobContractInterface
    {
        $next = $this->getNextAvailableJob($this->getQueue($queue));

        return $next ?: null;
    }
119
120
    /**
     * Tell whether a record with the same queue and payload is already stored.
     *
     * The payload is rebuilt with createPayload() and matched by equality
     * against the stored "payload" field.
     *
     * @param string $job job identifier forwarded to createPayload()
     * @param array $data job data forwarded to createPayload()
     * @param string|null $queue queue to look in; a falsy value selects the default queue
     *
     * @return bool true when findOne() returns a document
     */
    public function exists(string $job, array $data = [], ?string $queue = null): bool
    {
        $collection = $this->getCollection();

        $criteria = [
            'queue' => $this->getQueue($queue),
            'payload' => $this->createPayload($job, $data),
        ];

        $match = $collection->findOne($criteria);

        return null !== $match;
    }
136
137
    /**
     * Queue an already built payload without any delay.
     *
     * @param string $payload payload stored as-is
     * @param string|null $queue target queue; a falsy value selects the default queue
     * @param array $options accepted for signature compatibility; not read by this method
     *
     * @return mixed result of the underlying insertOne() call
     */
    public function pushRaw(string $payload, ?string $queue = null, array $options = [])
    {
        $delay = 0;

        return $this->pushToDatabase($delay, $queue, $payload);
    }
150
151
    /**
     * Queue a job that only becomes available after a delay.
     *
     * @param DateInterval|int $delay interval, or a number added to the current time
     * @param string $job job identifier forwarded to createPayload()
     * @param array $data job data forwarded to createPayload()
     * @param string|null $queue target queue; a falsy value selects the default queue
     *
     * @return mixed result of the underlying insertOne() call
     */
    public function later($delay, string $job, array $data = [], ?string $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        return $this->pushToDatabase($delay, $queue, $payload);
    }
165
166
    /**
     * Queue several jobs with a single insertMany() call.
     *
     * Every job shares the same data, queue and "available at" value.
     *
     * @param array $jobs job identifiers, each forwarded to createPayload()
     * @param array $data job data applied to every job
     * @param string|null $queue target queue; a falsy value selects the default queue
     *
     * @return mixed result of the insertMany() call
     */
    public function bulk(array $jobs, array $data = [], ?string $queue = null)
    {
        $queue = $this->getQueue($queue);
        $availableAt = $this->getAvailableAt(0);

        $records = [];
        foreach ($jobs as $key => $job) {
            // Keys of $jobs are carried over unchanged.
            $records[$key] = $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
        }

        return $this->getCollection()->insertMany($records);
    }
187
188
    /**
     * Put a job back onto its queue as a new record.
     *
     * The job's payload is JSON-encoded again and its attempt count is kept.
     * This method only inserts; it does not touch the record the job came from.
     *
     * @param Job $job job to re-queue
     * @param DateInterval|int $delay interval, or a number added to the current time
     *
     * @return mixed result of the underlying insertOne() call
     */
    public function release(Job $job, $delay)
    {
        $queue = $job->getQueue();
        $payload = json_encode($job->getPayload());
        $attempts = $job->getAttempts();

        return $this->pushToDatabase($delay, $queue, $payload, $attempts);
    }
200
201
    /**
     * Look up a single job by its identifier.
     *
     * @param mixed $id value passed to the \MongoDB\BSON\ObjectID constructor
     *
     * @return null|JobContractInterface the matching job, or null when no document has this id
     */
    public function getJobById($id): ?JobContractInterface
    {
        $document = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        if (null === $document) {
            return null;
        }

        return new JobContract($this->resolver, $this, $this->buildJob($document));
    }
218
219
    /**
     * Delete a reserved job from the queue.
     *
     * @param string $queue queue the job belongs to
     * @param string $id job identifier, converted to an ObjectID for the lookup
     *
     * @return bool true when a document was deleted, false when none matched;
     *              also true when deleteOne() returns something other than
     *              a \MongoDB\DeleteResult
     */
    public function deleteReserved(string $queue, $id): bool
    {
        $query = [
            '_id' => new \MongoDB\BSON\ObjectID($id),
            'queue' => $queue,
        ];

        $result = $this->getCollection()->deleteOne($query);

        // The class name is fully qualified on purpose: a bare "DeleteResult"
        // resolves inside this file's namespace, where no such class exists,
        // so the check would silently never match.
        if ($result instanceof \MongoDB\DeleteResult) {
            return (bool)$result->getDeletedCount();
        }

        return true;
    }
242
243
    /**
     * Return the configured job expiration time.
     *
     * @return int|null expiration time in seconds
     */
    public function getExpire()
    {
        return $this->expire;
    }
252
253
    /**
     * Replace the configured job expiration time.
     *
     * @param int $seconds new expiration time in seconds
     */
    public function setExpire(int $seconds)
    {
        $this->expire = $seconds;
    }
262
263
    /**
     * Count the stored jobs.
     *
     * @param string|null $queue queue to count; a falsy value counts the whole collection
     *
     * @return int number of matching documents
     */
    public function size(?string $queue = null): int
    {
        $collection = $this->getCollection();

        // Truthiness test: null and '' both mean "no filter", the same way
        // getQueue() treats a falsy name as "not given".
        return $queue
            ? $collection->count(['queue' => $queue])
            : $collection->count();
    }
278
279
    /**
     * Decide whether a job may run given the reserved-job limit.
     *
     * A job may run when its queue holds fewer reserved jobs than $this->limit,
     * or when the job itself reports being reserved.
     *
     * @param JobContractInterface $job job to check
     *
     * @return bool
     */
    public function canRunJob(JobContractInterface $job): bool
    {
        $reservedCount = $this->getCollection()->count([
            'reserved' => 1,
            'queue' => $job->getQueue(),
        ]);

        if ($reservedCount < $this->limit) {
            return true;
        }

        // Limit reached: only a job that is already reserved may continue.
        return (bool) $job->reserved();
    }
293
294
    /**
     * Flag a job's record as reserved.
     *
     * Stores the job's attempt count plus one, sets "reserved" to 1 and stamps
     * "reserved_at" with the current time. The $job object itself is not modified.
     *
     * @param JobContractInterface $job job whose record is updated
     */
    public function markJobAsReserved(JobContractInterface $job)
    {
        $changes = [
            '$set' => [
                'attempts' => $job->attempts() + 1,
                'reserved' => 1,
                'reserved_at' => $this->currentTime(),
            ],
        ];

        $this->getCollection()->updateOne(
            ['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())],
            $changes
        );
    }
312
313
    /**
     * Insert one job record for the given payload.
     *
     * @param DateInterval|int $delay interval, or a number added to the current time
     * @param string|null $queue target queue; a falsy value selects the default queue
     * @param string $payload payload stored in the record
     * @param int $attempts attempt count stored in the record
     *
     * @return mixed result of the insertOne() call
     */
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
    {
        $queueName = $this->getQueue($queue);
        // getAvailableAt() handles both supported $delay types.
        $availableAt = $this->getAvailableAt($delay);

        $record = $this->buildDatabaseRecord($queueName, $payload, $availableAt, $attempts);

        return $this->getCollection()->insertOne($record);
    }
329
330
    /**
     * Compute the "available at" UNIX timestamp for a delay.
     *
     * A DateInterval is added to a fresh DateTime; any other value is added
     * to the result of currentTime().
     *
     * @param DateInterval|int $delay
     *
     * @return int
     */
    protected function getAvailableAt($delay = 0)
    {
        if ($delay instanceof DateInterval) {
            return (new DateTime())->add($delay)->getTimestamp();
        }

        return $this->currentTime() + $delay;
    }
343
344
    /**
     * Resolve a queue name, falling back to the default queue.
     *
     * @param string|null $queue requested queue; any falsy value counts as "not given"
     *
     * @return string
     */
    protected function getQueue($queue)
    {
        if ($queue) {
            return $queue;
        }

        return $this->queue;
    }
355
356
    /**
     * Find the first job in a queue that can be handed to a worker.
     *
     * A record qualifies when it matches isAvailable() or isReservedButExpired().
     * Candidates are sorted by ascending "_id" and the first one is returned.
     * This method only reads; it does not mark the record as reserved.
     *
     * @param string|null $queue queue to search; a falsy value selects the default queue
     *
     * @return null|JobContractInterface
     */
    protected function getNextAvailableJob($queue)
    {
        $collection = $this->getCollection();

        $criteria = [
            'queue' => $this->getQueue($queue),
            '$or' => [
                $this->isAvailable(),
                $this->isReservedButExpired(),
            ],
        ];

        $document = $collection->findOne($criteria, ['sort' => ['_id' => 1]]);

        if (!$document) {
            return null;
        }

        return new JobContract($this->resolver, $this, $this->buildJob($document));
    }
378
379
    /**
     * Assemble the record that is inserted for a job.
     *
     * New records always start unreserved ("reserved" 0, "reserved_at" null)
     * and carry the current time as "created_at".
     *
     * @param string|null $queue queue name stored in the record
     * @param string $payload payload stored in the record
     * @param int $availableAt timestamp from which the job may run
     * @param int $attempts attempt count stored in the record
     *
     * @return array
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        $createdAt = $this->currentTime();

        $record = [
            'queue' => $queue,
            'payload' => $payload,
            'attempts' => $attempts,
            'reserved' => 0,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $createdAt,
        ];

        return $record;
    }
401
402
    /**
     * Build the criteria matching jobs that are free and due.
     *
     * Matches records whose "reserved_at" is null and whose "available_at"
     * is not later than the current time.
     *
     * @return array
     */
    protected function isAvailable()
    {
        $now = $this->currentTime();

        return [
            'reserved_at' => null,
            'available_at' => ['$lte' => $now],
        ];
    }
414
415
    /**
     * Build the criteria matching jobs whose reservation has expired.
     *
     * Matches records whose "reserved_at" is not later than the current time
     * minus $this->expire.
     *
     * @return array
     */
    protected function isReservedButExpired()
    {
        $deadline = $this->currentTime() - $this->expire;

        return [
            'reserved_at' => ['$lte' => $deadline],
        ];
    }
426
427
    /**
     * Select the jobs collection on the driver's database.
     *
     * @return Collection Mongo collection instance
     */
    protected function getCollection(): Collection
    {
        $database = $this->mongo->getDatabase();

        return $database->selectCollection($this->collection);
    }
436
437
    /**
     * Turn a stored record into a Job entity.
     *
     * @param mixed $data record exposing _id, attempts, queue, reserved, reserved_at
     *                    and payload as properties (presumably a document returned
     *                    by findOne() - confirm against the configured type map)
     *
     * @return Job
     */
    protected function buildJob($data): Job
    {
        $entity = new Job();

        $entity->setId($data->_id);
        $entity->setAttempts($data->attempts);
        $entity->setQueue($data->queue);
        $entity->setReserved($data->reserved);
        $entity->setReservedAt($data->reserved_at);

        // The payload is stored as JSON text; decode it into an associative array.
        $payload = json_decode($data->payload, true);
        $entity->setPayload($payload);

        return $entity;
    }
456
}
457