Completed
Push — master ( 71cc85...b00a6a )
by Alexey
37:06
created

MongoQueue   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 425
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 0
Metric Value
wmc 34
lcom 1
cbo 4
dl 0
loc 425
rs 9.2
c 0
b 0
f 0

25 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 1
A push() 0 4 1
A pop() 0 10 2
A exists() 0 7 1
A pushRaw() 0 4 1
A later() 0 4 1
A bulk() 0 12 1
A release() 0 4 1
A getJobById() 0 12 2
A deleteReserved() 0 9 1
A getExpire() 0 4 1
A setExpire() 0 4 1
A size() 0 8 2
A canRunJob() 0 11 4
A markJobAsReserved() 0 13 1
A pushToDatabase() 0 6 1
A getAvailableAt() 0 6 2
A getQueue() 0 4 2
A getNextAvailableJob() 0 15 2
A buildDatabaseRecord() 0 12 1
A isAvailable() 0 7 1
A isReservedButExpired() 0 6 1
A getCollection() 0 4 1
A putContainer() 0 4 1
A setContainer() 0 4 1
1
<?php
2
3
namespace SfCod\QueueBundle\Queue;
4
5
use Carbon\Carbon;
6
use DateTime;
7
use Exception;
8
use Illuminate\Container\Container;
9
use Illuminate\Queue\Queue;
10
use MongoDB\Collection;
11
use SfCod\QueueBundle\Base\Job;
12
use SfCod\QueueBundle\Job\MongoJob;
13
use SfCod\QueueBundle\Service\MongoDriverInterface;
14
use Symfony\Component\DependencyInjection\ContainerInterface;
15
16
/**
17
 * Class MongoQueue
18
 *
19
 * @author Alexey Orlov <[email protected]>
20
 *
21
 * @package yiiSfCod\jobqueue\queues
22
 */
23
class MongoQueue extends Queue
24
{
25
    /**
26
     * The mongo connection instance.
27
     *
28
     * @var \Illuminate\Database\Connection
29
     */
30
    protected $mongo;
31
32
    /**
33
     * The mongo collection that holds the jobs.
34
     *
35
     * @var string
36
     */
37
    protected $collection;
38
39
    /**
40
     * The name of the default queue.
41
     *
42
     * @var string
43
     */
44
    protected $queue;
45
46
    /**
47
     * The expiration time of a job.
48
     *
49
     * @var int|null
50
     */
51
    protected $expire = 60;
52
53
    /**
54
     * @var int
55
     */
56
    protected $limit = 15;
57
58
    /**
59
     * Create a new mongo queue instance.
60
     *
61
     * @param MongoDriverInterface $mongo
62
     * @param string $collection
63
     * @param string $queue
64
     * @param int $expire
65
     * @param int $limit
66
     */
67
    public function __construct(MongoDriverInterface $mongo,
68
                                string $collection,
69
                                string $queue = 'default',
70
                                int $expire = 60,
71
                                int $limit = 15
72
    ) {
73
        $this->collection = $collection;
74
        $this->expire = $expire;
75
        $this->queue = $queue;
76
        $this->mongo = $mongo;
0 ignored issues
show
Documentation Bug introduced by
It seems like $mongo of type object<SfCod\QueueBundle...e\MongoDriverInterface> is incompatible with the declared type object<Illuminate\Database\Connection> of property $mongo.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
77
        $this->limit = $limit;
78
    }
79
80
    /**
81
     * Push a new job onto the queue.
82
     *
83
     * @param string $job
84
     * @param mixed $data
85
     * @param string $queue
86
     *
87
     * @return mixed
88
     */
89
    public function push($job, $data = '', $queue = null)
90
    {
91
        return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
92
    }
93
94
    /**
95
     * Pop the next job off of the queue.
96
     *
97
     * @param string $queue
98
     *
99
     * @return null|Job
100
     */
101
    public function pop($queue = null)
102
    {
103
        $queue = $this->getQueue($queue);
104
105
        if ($job = $this->getNextAvailableJob($queue)) {
106
            return $job;
107
        }
108
109
        return null;
110
    }
111
112
    /**
113
     * Push a new job onto the queue.
114
     *
115
     * @param string $job
116
     * @param mixed $data
117
     * @param string $queue
118
     *
119
     * @return mixed
120
     */
121
    public function exists($job, $data = '', $queue = null)
122
    {
123
        return null !== $this->getCollection()->findOne([
124
                'queue' => $queue,
125
                'payload' => $this->createPayload($job, $data),
126
            ]);
127
    }
128
129
    /**
130
     * Push a raw payload onto the queue.
131
     *
132
     * @param string $payload
133
     * @param string $queue
134
     * @param array $options
135
     *
136
     * @return mixed
137
     */
138
    public function pushRaw($payload, $queue = null, array $options = [])
0 ignored issues
show
Unused Code introduced by
The parameter $options is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
139
    {
140
        return $this->pushToDatabase(0, $queue, $payload);
141
    }
142
143
    /**
144
     * Push a new job onto the queue after a delay.
145
     *
146
     * @param DateTime|int $delay
147
     * @param string $job
148
     * @param mixed $data
149
     * @param string $queue
150
     *
151
     * @return mixed
152
     */
153
    public function later($delay, $job, $data = '', $queue = null)
154
    {
155
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
156
    }
157
158
    /**
159
     * Push an array of jobs onto the queue.
160
     *
161
     * @param array $jobs
162
     * @param mixed $data
163
     * @param string $queue
164
     *
165
     * @return mixed
166
     */
167
    public function bulk($jobs, $data = '', $queue = null)
168
    {
169
        $queue = $this->getQueue($queue);
170
171
        $availableAt = $this->getAvailableAt(0);
172
173
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
174
            return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
175
        }, (array)$jobs);
176
177
        return $this->getCollection()->insertOne($records);
178
    }
179
180
    /**
181
     * Release a reserved job back onto the queue.
182
     *
183
     * @param string $queue
184
     * @param \StdClass $job
185
     * @param int $delay
186
     *
187
     * @return mixed
188
     */
189
    public function release($queue, $job, $delay)
190
    {
191
        return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
192
    }
193
194
    /**
195
     * Get the next available job for the queue.
196
     *
197
     * @param $id
198
     *
199
     * @return null|Job
200
     */
201
    public function getJobById($id)
202
    {
203
        $job = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);
204
205
        if (is_null($job)) {
206
            return null;
207
        } else {
208
            $job = (object)$job;
209
210
            return new MongoJob($this->container, $this, $job, $job->queue);
0 ignored issues
show
Documentation introduced by
$this->container is of type object<Illuminate\Container\Container>, but the function expects a object<Symfony\Component...ion\ContainerInterface>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
211
        }
212
    }
213
214
    /**
215
     * Delete a reserved job from the queue.
216
     *
217
     * @param string $queue
218
     * @param string $id
219
     *
220
     * @return int
221
     */
222
    public function deleteReserved($queue, $id): int
223
    {
224
        $query = [
225
            '_id' => new \MongoDB\BSON\ObjectID($id),
226
            'queue' => $queue,
227
        ];
228
229
        return $this->getCollection()->deleteOne($query)->getDeletedCount();
230
    }
231
232
    /**
233
     * Get the expiration time in seconds.
234
     *
235
     * @return int|null
236
     */
237
    public function getExpire()
238
    {
239
        return $this->expire;
240
    }
241
242
    /**
243
     * Set the expiration time in seconds.
244
     *
245
     * @param int|null $seconds
246
     */
247
    public function setExpire($seconds)
248
    {
249
        $this->expire = $seconds;
250
    }
251
252
    /**
253
     * Get the size of the queue.
254
     *
255
     * @param string $queue
256
     *
257
     * @return int
258
     */
259
    public function size($queue = null)
260
    {
261
        if ($queue) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $queue of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
262
            return $this->getCollection()->count(['queue' => $queue]);
263
        }
264
265
        return $this->getCollection()->count();
266
    }
267
268
    /**
269
     * Check if can run process depend on limits
270
     *
271
     * @param Job $job
272
     *
273
     * @return bool
274
     */
275
    public function canRunJob(Job $job)
276
    {
277
        if ($job->getQueue()) {
278
            return $this->getCollection()->count([
279
                    'reserved' => 1,
280
                    'queue' => $job->getQueue(),
281
                ]) < $this->limit || $job->reserved();
282
        }
283
284
        return $this->getCollection()->count(['reserved' => 1]) < $this->limit || $job->reserved();
285
    }
286
287
    /**
288
     * Mark the given job ID as reserved.
289
     *
290
     * @param Job $job
291
     */
292
    public function markJobAsReserved($job)
293
    {
294
        $attempts = $job->attempts() + 1;
295
        $reserved_at = $this->currentTime();
296
297
        $this->getCollection()->updateOne(['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())], [
298
            '$set' => [
299
                'attempts' => $attempts,
300
                'reserved' => 1,
301
                'reserved_at' => $reserved_at,
302
            ],
303
        ]);
304
    }
305
306
    /**
307
     * Push a raw payload to the mongo with a given delay.
308
     *
309
     * @param DateTime|int $delay
310
     * @param string|null $queue
311
     * @param string $payload
312
     * @param int $attempts
313
     *
314
     * @return mixed
315
     */
316
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
317
    {
318
        $attributes = $this->buildDatabaseRecord($this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts);
319
320
        return $this->getCollection()->insertOne($attributes);
321
    }
322
323
    /**
324
     * Get the "available at" UNIX timestamp.
325
     *
326
     * @param DateTime|int $delay
327
     *
328
     * @return int
329
     */
330
    protected function getAvailableAt($delay)
331
    {
332
        $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
333
334
        return $availableAt->getTimestamp();
335
    }
336
337
    /**
338
     * Get the queue or return the default.
339
     *
340
     * @param string|null $queue
341
     *
342
     * @return string
343
     */
344
    protected function getQueue($queue)
345
    {
346
        return $queue ?: $this->queue;
347
    }
348
349
    /**
350
     * Get the next available job for the queue.
351
     *
352
     * @param string|null $queue
353
     *
354
     * @return null|Job
355
     */
356
    protected function getNextAvailableJob($queue)
357
    {
358
        $job = $this->getCollection()
359
            ->findOne([
360
                'queue' => $this->getQueue($queue),
361
                '$or' => [
362
                    $this->isAvailable(),
363
                    $this->isReservedButExpired(),
364
                ],
365
            ], [
366
                'sort' => ['_id' => 1],
367
            ]);
368
369
        return $job ? new MongoJob($this->container, $this, (object)$job, ((object)$job)->queue) : null;
0 ignored issues
show
Documentation introduced by
$this->container is of type object<Illuminate\Container\Container>, but the function expects a object<Symfony\Component...ion\ContainerInterface>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
370
    }
371
372
    /**
373
     * Create an array to insert for the given job.
374
     *
375
     * @param string|null $queue
376
     * @param string $payload
377
     * @param int $availableAt
378
     * @param int $attempts
379
     *
380
     * @return array
381
     */
382
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
383
    {
384
        return [
385
            'queue' => $queue,
386
            'payload' => $payload,
387
            'attempts' => $attempts,
388
            'reserved' => 0,
389
            'reserved_at' => null,
390
            'available_at' => $availableAt,
391
            'created_at' => $this->currentTime(),
392
        ];
393
    }
394
395
    /**
396
     * Get available jobs
397
     *
398
     * @return array
399
     */
400
    protected function isAvailable()
401
    {
402
        return [
403
            'reserved_at' => null,
404
            'available_at' => ['$lte' => $this->currentTime()],
405
        ];
406
    }
407
408
    /**
409
     * Get reserved but expired by time jobs
410
     *
411
     * @return array
412
     */
413
    protected function isReservedButExpired()
414
    {
415
        return [
416
            'reserved_at' => ['$lte' => Carbon::now()->subSeconds($this->expire)->getTimestamp()],
417
        ];
418
    }
419
420
    /**
421
     * Get queue collection
422
     *
423
     * @return Collection Mongo collection instance
424
     */
425
    protected function getCollection(): Collection
426
    {
427
        return $this->mongo->getDatabase()->selectCollection($this->collection);
0 ignored issues
show
Bug introduced by
The method getDatabase() does not exist on Illuminate\Database\Connection. Did you maybe mean getDatabaseName()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
428
    }
429
430
    /**
431
     * @param ContainerInterface $container
432
     */
433
    public function putContainer(ContainerInterface $container)
434
    {
435
        $this->container = $container;
0 ignored issues
show
Documentation Bug introduced by
It seems like $container of type object<Symfony\Component...ion\ContainerInterface> is incompatible with the declared type object<Illuminate\Container\Container> of property $container.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
436
    }
437
438
    /**
439
     * @param Container $container
440
     *
441
     * @throws Exception
442
     */
443
    public function setContainer(Container $container)
444
    {
445
        // Nothing
446
    }
447
}
448