Completed
Pull Request — master (#3)
by
unknown
39:33
created

MongoQueue::bulk()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 12
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 3
1
<?php
2
3
namespace SfCod\QueueBundle\Queue;
4
5
use DateInterval;
6
use DateTime;
7
use MongoDB\Collection;
8
use SfCod\QueueBundle\Base\JobResolverInterface;
9
use SfCod\QueueBundle\Base\MongoDriverInterface;
10
use SfCod\QueueBundle\Entity\Job;
11
use SfCod\QueueBundle\Job\JobContract;
12
use SfCod\QueueBundle\Job\JobContractInterface;
13
14
/**
15
 * Class MongoQueue
16
 *
17
 * @author Alexey Orlov <[email protected]>
18
 *
19
 * @package yiiSfCod\jobqueue\queues
20
 */
21
class MongoQueue extends Queue
22
{
23
    /**
24
     * Job resolver
25
     *
26
     * @var JobResolverInterface
27
     */
28
    protected $resolver;
29
30
    /**
31
     * The mongo connection instance.
32
     *
33
     * @var MongoDriverInterface
34
     */
35
    protected $mongo;
36
37
    /**
38
     * The mongo collection that holds the jobs.
39
     *
40
     * @var string
41
     */
42
    protected $collection;
43
44
    /**
45
     * The name of the default queue.
46
     *
47
     * @var string
48
     */
49
    protected $queue = 'default';
50
51
    /**
52
     * The expiration time of a job.
53
     *
54
     * @var int|null
55
     */
56
    protected $expire = 60;
57
58
    /**
59
     * @var int
60
     */
61
    protected $limit = 15;
62
63
    /**
64
     * Create a new mongo queue instance.
65
     *
66
     * @param MongoDriverInterface $mongo
67
     * @param string $collection
68
     * @param string $queue
69
     * @param int $expire
70
     * @param int $limit
71
     */
72
    public function __construct(
73
        JobResolverInterface $resolver,
74
        MongoDriverInterface $mongo,
75
        string $collection,
76
        string $queue = 'default',
77
        int $expire = 60,
78
        int $limit = 15
79
    ) {
80
        $this->resolver = $resolver;
81
        $this->mongo = $mongo;
82
        $this->collection = $collection;
83
        $this->expire = $expire;
84
        $this->queue = $queue;
85
        $this->limit = $limit;
86
    }
87
88
    /**
89
     * Push a new job onto the queue.
90
     *
91
     * @param string $job
92
     * @param mixed $data
93
     * @param string $queue
94
     *
95
     * @return mixed
96
     */
97
    public function push(string $job, array $data = [], ?string $queue = null)
98
    {
99
        return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
100
    }
101
102
    /**
103
     * Pop the next job off of the queue.
104
     *
105
     * @param string $queue
106
     *
107
     * @return null|JobContractInterface
108
     */
109
    public function pop(?string $queue = null): ?JobContractInterface
110
    {
111
        $queue = $this->getQueue($queue);
112
113
        if ($job = $this->getNextAvailableJob($queue)) {
114
            return $job;
115
        }
116
117
        return null;
118
    }
119
120
    /**
121
     * Push a new job onto the queue.
122
     *
123
     * @param string $job
124
     * @param array $data
125
     * @param string|null $queue
126
     *
127
     * @return bool
128
     */
129
    public function exists(string $job, array $data = [], ?string $queue = null): bool
130
    {
131
        return null !== $this->getCollection()->findOne([
132
                'queue' => $queue,
133
                'payload' => $this->createPayload($job, $data),
134
            ]);
135
    }
136
137
    /**
138
     * Push a raw payload onto the queue.
139
     *
140
     * @param string $payload
141
     * @param string|null $queue
142
     * @param array $options
143
     *
144
     * @return mixed
145
     */
146
    public function pushRaw(string $payload, ?string $queue = null, array $options = [])
147
    {
148
        return $this->pushToDatabase(0, $queue, $payload);
149
    }
150
151
    /**
152
     * Push a new job onto the queue after a delay.
153
     *
154
     * @param DateInterval|int $delay
155
     * @param string $job
156
     * @param array $data
157
     * @param string $queue
158
     *
159
     * @return mixed
160
     */
161
    public function later($delay, string $job, array $data = [], ?string $queue = null)
162
    {
163
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
164
    }
165
166
    /**
167
     * Push an array of jobs onto the queue.
168
     *
169
     * @param array $jobs
170
     * @param mixed $data
171
     * @param string $queue
172
     *
173
     * @return mixed
174
     */
175
    public function bulk(array $jobs, array $data = [], ?string $queue = null)
176
    {
177
        $queue = $this->getQueue($queue);
178
179
        $availableAt = $this->getAvailableAt(0);
180
181
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
182
            return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
183
        }, (array)$jobs);
184
185
        return $this->getCollection()->insertOne($records);
186
    }
187
188
    /**
189
     * Release a reserved job back onto the queue.
190
     *
191
     * @param Job $job
192
     * @param DateInterval|int $delay
193
     *
194
     * @return mixed
195
     */
196
    public function release(Job $job, $delay)
197
    {
198
        return $this->pushToDatabase($delay, $job->getQueue(), json_encode($job->getPayload()), $job->getAttempts());
199
    }
200
201
    /**
202
     * Get the next available job for the queue.
203
     *
204
     * @param $id
205
     *
206
     * @return null|JobContractInterface
207
     */
208
    public function getJobById($id): ?JobContractInterface
209
    {
210
        $job = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);
211
212
        if (is_null($job)) {
213
            return null;
214
        } else {
215
            return new JobContract($this->resolver, $this, $this->buildJob($job));
216
        }
217
    }
218
219
    /**
220
     * Delete a reserved job from the queue.
221
     *
222
     * @param string $queue
223
     * @param string $id
224
     *
225
     * @return int
226
     */
227
    public function deleteReserved(string $queue, $id): int
228
    {
229
        $query = [
230
            '_id' => new \MongoDB\BSON\ObjectID($id),
231
            'queue' => $queue,
232
        ];
233
234
        return $this->getCollection()->deleteOne($query)->getDeletedCount();
235
    }
236
237
    /**
238
     * Get the expiration time in seconds.
239
     *
240
     * @return int|null
241
     */
242
    public function getExpire()
243
    {
244
        return $this->expire;
245
    }
246
247
    /**
248
     * Set the expiration time in seconds.
249
     *
250
     * @param int $seconds
251
     */
252
    public function setExpire(int $seconds)
253
    {
254
        $this->expire = $seconds;
255
    }
256
257
    /**
258
     * Get the size of the queue.
259
     *
260
     * @param string $queue
261
     *
262
     * @return int
263
     */
264
    public function size(?string $queue = null): int
265
    {
266
        if ($queue) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $queue of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
267
            return $this->getCollection()->count(['queue' => $queue]);
268
        }
269
270
        return $this->getCollection()->count();
271
    }
272
273
    /**
274
     * Check if can run process depend on limits
275
     *
276
     * @param JobContractInterface $job
277
     *
278
     * @return bool
279
     */
280
    public function canRunJob(JobContractInterface $job): bool
281
    {
282
        return $this->getCollection()->count([
283
                'reserved' => 1,
284
                'queue' => $job->getQueue(),
285
            ]) < $this->limit || $job->reserved();
286
    }
287
288
    /**
289
     * Mark the given job ID as reserved.
290
     *
291
     * @param JobContractInterface $job
292
     */
293
    public function markJobAsReserved(JobContractInterface $job)
294
    {
295
        $attempts = $job->attempts() + 1;
296
        $reserved_at = $this->currentTime();
297
298
        $this->getCollection()->updateOne(['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())], [
299
            '$set' => [
300
                'attempts' => $attempts,
301
                'reserved' => 1,
302
                'reserved_at' => $reserved_at,
303
            ],
304
        ]);
305
    }
306
307
    /**
308
     * Push a raw payload to the mongo with a given delay.
309
     *
310
     * @param DateInterval|int $delay
311
     * @param string|null $queue
312
     * @param string $payload
313
     * @param int $attempts
314
     *
315
     * @return mixed
316
     */
317
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
318
    {
319
        $attributes = $this->buildDatabaseRecord($this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts);
0 ignored issues
show
Bug introduced by
It seems like $delay defined by parameter $delay on line 317 can also be of type object<DateInterval>; however, SfCod\QueueBundle\Queue\...Queue::getAvailableAt() does only seem to accept integer, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
320
321
        return $this->getCollection()->insertOne($attributes);
322
    }
323
324
    /**
325
     * Get the "available at" UNIX timestamp.
326
     *
327
     * @param DateInterval|int $delay
328
     *
329
     * @return int
330
     */
331
    protected function getAvailableAt($delay = 0)
332
    {
333
        return $delay instanceof DateInterval
334
            ? (new DateTime())->add($delay)->getTimestamp()
335
            : $this->currentTime() + $delay;
336
    }
337
338
    /**
339
     * Get the queue or return the default.
340
     *
341
     * @param string|null $queue
342
     *
343
     * @return string
344
     */
345
    protected function getQueue($queue)
346
    {
347
        return $queue ?: $this->queue;
348
    }
349
350
    /**
351
     * Get the next available job for the queue.
352
     *
353
     * @param string|null $queue
354
     *
355
     * @return null|JobContractInterface
356
     */
357
    protected function getNextAvailableJob($queue)
358
    {
359
        $job = $this->getCollection()
360
            ->findOne([
361
                'queue' => $this->getQueue($queue),
362
                '$or' => [
363
                    $this->isAvailable(),
364
                    $this->isReservedButExpired(),
365
                ],
366
            ], [
367
                'sort' => ['_id' => 1],
368
            ]);
369
370
        return $job ? new JobContract($this->resolver, $this, $this->buildJob($job)) : null;
371
    }
372
373
    /**
374
     * Create an array to insert for the given job.
375
     *
376
     * @param string|null $queue
377
     * @param string $payload
378
     * @param int $availableAt
379
     * @param int $attempts
380
     *
381
     * @return array
382
     */
383
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
384
    {
385
        return [
386
            'queue' => $queue,
387
            'payload' => $payload,
388
            'attempts' => $attempts,
389
            'reserved' => 0,
390
            'reserved_at' => null,
391
            'available_at' => $availableAt,
392
            'created_at' => $this->currentTime(),
393
        ];
394
    }
395
396
    /**
397
     * Get available jobs
398
     *
399
     * @return array
400
     */
401
    protected function isAvailable()
402
    {
403
        return [
404
            'reserved_at' => null,
405
            'available_at' => ['$lte' => $this->currentTime()],
406
        ];
407
    }
408
409
    /**
410
     * Get reserved but expired by time jobs
411
     *
412
     * @return array
413
     */
414
    protected function isReservedButExpired()
415
    {
416
        return [
417
            'reserved_at' => ['$lte' => $this->currentTime() - $this->expire],
418
        ];
419
    }
420
421
    /**
422
     * Get queue collection
423
     *
424
     * @return Collection Mongo collection instance
425
     */
426
    protected function getCollection(): Collection
427
    {
428
        return $this->mongo->getDatabase()->selectCollection($this->collection);
429
    }
430
431
    /**
432
     * Build job from database record
433
     *
434
     * @param $data
435
     *
436
     * @return Job
437
     */
438
    protected function buildJob($data): Job
439
    {
440
        $job = new Job();
441
        $job->setId($data->_id);
442
        $job->setAttempts($data->attempts);
443
        $job->setQueue($data->queue);
444
        $job->setReserved($data->reserved);
445
        $job->setReservedAt($data->reserved_at);
446
        $job->setPayload(json_decode($data->payload, true));
447
448
        return $job;
449
    }
450
}
451