MongoQueue::pushToDatabase()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 4
1
<?php
2
3
namespace SfCod\QueueBundle\Queue;
4
5
use DateInterval;
6
use DateTime;
7
use MongoDB\Collection;
8
use MongoDB\DeleteResult;
9
use SfCod\QueueBundle\Base\JobResolverInterface;
10
use SfCod\QueueBundle\Entity\Job;
11
use SfCod\QueueBundle\Job\JobContract;
12
use SfCod\QueueBundle\Job\JobContractInterface;
13
use SfCod\QueueBundle\Service\MongoDriver;
14
15
/**
16
 * Class MongoQueue
17
 *
18
 * @author Alexey Orlov <[email protected]>
19
 *
20
 * @package yiiSfCod\jobqueue\queues
21
 */
22
class MongoQueue extends Queue
23
{
24
    /**
25
     * Job resolver
26
     *
27
     * @var JobResolverInterface
28
     */
29
    protected $resolver;
30
31
    /**
32
     * The mongo connection instance.
33
     *
34
     * @var MongoDriver
35
     */
36
    protected $mongo;
37
38
    /**
39
     * The mongo collection that holds the jobs.
40
     *
41
     * @var string
42
     */
43
    protected $collection;
44
45
    /**
46
     * The name of the default queue.
47
     *
48
     * @var string
49
     */
50
    protected $queue = 'default';
51
52
    /**
53
     * The expiration time of a job.
54
     *
55
     * @var int|null
56
     */
57
    protected $expire = 60;
58
59
    /**
60
     * @var int
61
     */
62
    protected $limit = 15;
63
64
    /**
65
     * Create a new mongo queue instance.
66
     *
67
     * @param JobResolverInterface $resolver
68
     * @param MongoDriver $mongo
69
     * @param string $collection
70
     * @param string $queue
71
     * @param int $expire
72
     * @param int $limit
73
     */
74
    public function __construct(
75
        JobResolverInterface $resolver,
76
        MongoDriver $mongo,
77
        string $collection,
78
        string $queue = 'default',
79
        int $expire = 60,
80
        int $limit = 15
81
    ) {
82
        $this->resolver = $resolver;
83
        $this->mongo = $mongo;
84
        $this->collection = $collection;
85
        $this->expire = $expire;
86
        $this->queue = $queue;
87
        $this->limit = $limit;
88
    }
89
90
    /**
91
     * Push a new job onto the queue.
92
     *
93
     * @param string $job
94
     * @param mixed $data
95
     * @param string|null $queue
96
     *
97
     * @return mixed
98
     */
99
    public function push(string $job, array $data = [], ?string $queue = null)
100
    {
101
        return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
102
    }
103
104
    /**
105
     * Pop the next job off of the queue.
106
     *
107
     * @param string|null $queue
108
     *
109
     * @return JobContractInterface|null
110
     */
111
    public function pop(?string $queue = null): ?JobContractInterface
112
    {
113
        $queue = $this->getQueue($queue);
114
115
        if ($job = $this->getNextAvailableJob($queue)) {
116
            return $job;
117
        }
118
119
        return null;
120
    }
121
122
    /**
123
     * Check if job exists in the queue.
124
     *
125
     * @param string $job
126
     * @param array $data
127
     * @param string|null $queue
128
     *
129
     * @return bool
130
     */
131
    public function exists(string $job, array $data = [], ?string $queue = null): bool
132
    {
133
        return null !== $this->getCollection()->findOne([
134
                'queue' => $this->getQueue($queue),
135
                'payload' => $this->createPayload($job, $data),
136
            ]);
137
    }
138
139
    /**
140
     * Push a raw payload onto the queue.
141
     *
142
     * @param string $payload
143
     * @param string|null $queue
144
     * @param array $options
145
     *
146
     * @return mixed
147
     */
148
    public function pushRaw(string $payload, ?string $queue = null, array $options = [])
149
    {
150
        return $this->pushToDatabase(0, $queue, $payload);
151
    }
152
153
    /**
154
     * Push a new job onto the queue after a delay.
155
     *
156
     * @param DateInterval|int $delay
157
     * @param string $job
158
     * @param array $data
159
     * @param string|null $queue
160
     *
161
     * @return mixed
162
     */
163
    public function later($delay, string $job, array $data = [], ?string $queue = null)
164
    {
165
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
166
    }
167
168
    /**
169
     * Push an array of jobs onto the queue.
170
     *
171
     * @param array $jobs
172
     * @param mixed $data
173
     * @param string|null $queue
174
     *
175
     * @return mixed
176
     */
177
    public function bulk(array $jobs, array $data = [], ?string $queue = null)
178
    {
179
        $queue = $this->getQueue($queue);
180
181
        $availableAt = $this->getAvailableAt(0);
182
183
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
184
            return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
185
        }, $jobs);
186
187
        return $this->getCollection()->insertMany($records);
188
    }
189
190
    /**
191
     * Release a reserved job back onto the queue.
192
     *
193
     * @param JobContractInterface $job
194
     * @param DateInterval|int $delay
195
     *
196
     * @return mixed
197
     */
198
    public function release(JobContractInterface $job, $delay)
199
    {
200
        return $this->pushToDatabase($delay, $job->getQueue(), json_encode($job->payload()), $job->attempts());
201
    }
202
203
    /**
204
     * Get the next available job for the queue.
205
     *
206
     * @param string $queue
207
     * @param string $id
208
     *
209
     * @return JobContractInterface|null
210
     */
211
    public function getJobById(string $queue, string $id): ?JobContractInterface
212
    {
213
        $job = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);
214
215
        if (is_null($job)) {
216
            return null;
217
        } else {
218
            return new JobContract($this->resolver, $this, $this->buildJob($job));
219
        }
220
    }
221
222
    /**
223
     * Delete a reserved job from the queue.
224
     *
225
     * @param string $queue
226
     * @param string $id
227
     *
228
     * @return bool
229
     */
230
    public function deleteReserved(string $queue, string $id): bool
231
    {
232
        $query = [
233
            '_id' => new \MongoDB\BSON\ObjectID($id),
234
            'queue' => $queue,
235
        ];
236
237
        $result = $this->getCollection()->deleteOne($query);
238
239
        if ($result instanceof DeleteResult) {
240
            return (bool)$result->getDeletedCount();
241
        }
242
243
        return true;
244
    }
245
246
    /**
247
     * Get the expiration time in seconds.
248
     *
249
     * @return int|null
250
     */
251
    public function getExpire()
252
    {
253
        return $this->expire;
254
    }
255
256
    /**
257
     * Set the expiration time in seconds.
258
     *
259
     * @param int $seconds
260
     */
261
    public function setExpire(int $seconds)
262
    {
263
        $this->expire = $seconds;
264
    }
265
266
    /**
267
     * Get the size of the queue.
268
     *
269
     * @param string|null $queue
270
     *
271
     * @return int
272
     */
273
    public function size(?string $queue = null): int
274
    {
275
        if ($queue) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $queue of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
276
            return $this->getCollection()->count(['queue' => $queue]);
0 ignored issues
show
Deprecated Code introduced by
The method MongoDB\Collection::count() has been deprecated with message: 1.4

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
277
        }
278
279
        return $this->getCollection()->count();
0 ignored issues
show
Deprecated Code introduced by
The method MongoDB\Collection::count() has been deprecated with message: 1.4

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
280
    }
281
282
    /**
283
     * Check if can run process depend on limits
284
     *
285
     * @param JobContractInterface $job
286
     *
287
     * @return bool
288
     */
289
    public function canRunJob(JobContractInterface $job): bool
290
    {
291
        return $this->getCollection()->count([
0 ignored issues
show
Deprecated Code introduced by
The method MongoDB\Collection::count() has been deprecated with message: 1.4

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
292
                'reserved' => 1,
293
                'queue' => $job->getQueue(),
294
            ]) < $this->limit || !$job->reserved();
295
    }
296
297
    /**
298
     * Mark the given job ID as reserved.
299
     *
300
     * @param JobContractInterface $job
301
     */
302
    public function markJobAsReserved(JobContractInterface $job)
303
    {
304
        $attempts = $job->attempts() + 1;
305
        $reserved_at = $this->currentTime();
306
307
        $this->getCollection()->updateOne(['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())], [
308
            '$set' => [
309
                'attempts' => $attempts,
310
                'reserved' => 1,
311
                'reserved_at' => $reserved_at,
312
            ],
313
        ]);
314
    }
315
316
    /**
317
     * Push a raw payload to the mongo with a given delay.
318
     *
319
     * @param DateInterval|int $delay
320
     * @param string|null $queue
321
     * @param string $payload
322
     * @param int $attempts
323
     *
324
     * @return mixed
325
     */
326
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
327
    {
328
        $attributes = $this->buildDatabaseRecord($this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts);
0 ignored issues
show
Bug introduced by
It seems like $delay defined by parameter $delay on line 326 can also be of type object<DateInterval>; however, SfCod\QueueBundle\Queue\...Queue::getAvailableAt() does only seem to accept integer, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
329
330
        return $this->getCollection()->insertOne($attributes);
331
    }
332
333
    /**
334
     * Get the "available at" UNIX timestamp.
335
     *
336
     * @param DateInterval|int $delay
337
     *
338
     * @return int
339
     */
340
    protected function getAvailableAt($delay = 0)
341
    {
342
        return $delay instanceof DateInterval
343
            ? (new DateTime())->add($delay)->getTimestamp()
344
            : $this->currentTime() + $delay;
345
    }
346
347
    /**
348
     * Get the queue or return the default.
349
     *
350
     * @param string|null $queue
351
     *
352
     * @return string
353
     */
354
    protected function getQueue($queue)
355
    {
356
        return $queue ?: $this->queue;
357
    }
358
359
    /**
360
     * Get the next available job for the queue.
361
     *
362
     * @param string|null $queue
363
     *
364
     * @return JobContractInterface|null
365
     */
366
    protected function getNextAvailableJob($queue)
367
    {
368
        $job = $this->getCollection()
369
            ->findOne([
370
                'queue' => $this->getQueue($queue),
371
                '$or' => [
372
                    $this->isAvailable(),
373
                    $this->isReservedButExpired(),
374
                ],
375
            ], [
376
                'sort' => ['_id' => 1],
377
            ]);
378
379
        return $job ? new JobContract($this->resolver, $this, $this->buildJob($job)) : null;
380
    }
381
382
    /**
383
     * Create an array to insert for the given job.
384
     *
385
     * @param string|null $queue
386
     * @param string $payload
387
     * @param int $availableAt
388
     * @param int $attempts
389
     *
390
     * @return array
391
     */
392
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
393
    {
394
        return [
395
            'queue' => $queue,
396
            'payload' => $payload,
397
            'attempts' => $attempts,
398
            'reserved' => 0,
399
            'reserved_at' => null,
400
            'available_at' => $availableAt,
401
            'created_at' => $this->currentTime(),
402
        ];
403
    }
404
405
    /**
406
     * Get available jobs
407
     *
408
     * @return array
409
     */
410
    protected function isAvailable()
411
    {
412
        return [
413
            'reserved_at' => null,
414
            'available_at' => ['$lte' => $this->currentTime()],
415
        ];
416
    }
417
418
    /**
419
     * Get reserved but expired by time jobs
420
     *
421
     * @return array
422
     */
423
    protected function isReservedButExpired()
424
    {
425
        return [
426
            'reserved_at' => ['$lte' => $this->currentTime() - $this->expire],
427
        ];
428
    }
429
430
    /**
431
     * Get queue collection
432
     *
433
     * @return Collection Mongo collection instance
434
     */
435
    protected function getCollection(): Collection
436
    {
437
        return $this->mongo->getDatabase()->selectCollection($this->collection);
438
    }
439
440
    /**
441
     * Build job from database record
442
     *
443
     * @param $data
444
     *
445
     * @return Job
446
     */
447
    protected function buildJob($data): Job
448
    {
449
        $job = new Job();
450
        $job->setId($data->_id);
451
        $job->setAttempts($data->attempts);
452
        $job->setQueue($data->queue);
453
        $job->setReserved($data->reserved);
454
        $job->setReservedAt($data->reserved_at);
455
        $job->setPayload(json_decode($data->payload, true));
456
457
        return $job;
458
    }
459
}
460