Completed
Push — master ( 71cc85...b00a6a )
by Alexey
37:06
created

Queue/MongoQueue.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace SfCod\QueueBundle\Queue;
4
5
use Carbon\Carbon;
6
use DateTime;
7
use Exception;
8
use Illuminate\Container\Container;
9
use Illuminate\Queue\Queue;
10
use MongoDB\Collection;
11
use SfCod\QueueBundle\Base\Job;
12
use SfCod\QueueBundle\Job\MongoJob;
13
use SfCod\QueueBundle\Service\MongoDriverInterface;
14
use Symfony\Component\DependencyInjection\ContainerInterface;
15
16
/**
17
 * Class MongoQueue
18
 *
19
 * @author Alexey Orlov <[email protected]>
20
 *
21
 * @package SfCod\QueueBundle\Queue
22
 */
23
class MongoQueue extends Queue
24
{
25
    /**
26
     * The mongo connection instance.
27
     *
28
     * @var MongoDriverInterface
29
     */
30
    protected $mongo;
31
32
    /**
33
     * The mongo collection that holds the jobs.
34
     *
35
     * @var string
36
     */
37
    protected $collection;
38
39
    /**
40
     * The name of the default queue.
41
     *
42
     * @var string
43
     */
44
    protected $queue;
45
46
    /**
47
     * The expiration time of a job.
48
     *
49
     * @var int|null
50
     */
51
    protected $expire = 60;
52
53
    /**
54
     * @var int
55
     */
56
    protected $limit = 15;
57
58
    /**
     * Build a queue backed by a mongo collection.
     *
     * @param MongoDriverInterface $mongo Driver used to reach the database
     * @param string $collection Name of the collection that stores the jobs
     * @param string $queue Name of the default queue
     * @param int $expire Expiration time of a job
     * @param int $limit Limit compared against the number of reserved jobs
     */
    public function __construct(
        MongoDriverInterface $mongo,
        string $collection,
        string $queue = 'default',
        int $expire = 60,
        int $limit = 15
    ) {
        // The stored connection is the bundle's driver wrapper (MongoDriverInterface).
        $this->mongo = $mongo;
        $this->collection = $collection;
        $this->queue = $queue;
        $this->expire = $expire;
        $this->limit = $limit;
    }
79
80
    /**
     * Store a job on the queue so that it is available immediately.
     *
     * @param string $job
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed Result of the underlying insert
     */
    public function push($job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        // A delay of zero makes the job available right away.
        return $this->pushToDatabase(0, $queue, $payload);
    }
93
94
    /**
     * Fetch the next job that can be processed from the queue.
     *
     * @param string $queue Queue name; the default queue is used when empty
     *
     * @return null|Job The job, or null when nothing is available
     */
    public function pop($queue = null)
    {
        $next = $this->getNextAvailableJob($this->getQueue($queue));

        return $next ? $next : null;
    }
111
112
    /**
     * Check whether an identical job is already stored on the queue.
     *
     * A job matches when the collection holds a document with the same queue
     * name and the same payload as createPayload() produces for the arguments.
     *
     * @param string $job
     * @param mixed $data
     * @param string|null $queue Queue name; the default queue is used when empty
     *
     * @return bool True when a matching document exists
     */
    public function exists($job, $data = '', $queue = null)
    {
        // Resolve the queue name the same way pushToDatabase() does; otherwise a
        // null queue would never match records stored under the default name.
        $document = $this->getCollection()->findOne([
            'queue' => $this->getQueue($queue),
            'payload' => $this->createPayload($job, $data),
        ]);

        return null !== $document;
    }
128
129
    /**
     * Store an already-built payload on the queue without any delay.
     *
     * @param string $payload
     * @param string $queue
     * @param array $options Accepted for interface compatibility; not used here
     *
     * @return mixed Result of the underlying insert
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $noDelay = 0;

        return $this->pushToDatabase($noDelay, $queue, $payload);
    }
142
143
    /**
     * Store a job on the queue that only becomes available after a delay.
     *
     * @param DateTime|int $delay
     * @param string $job
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed Result of the underlying insert
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        return $this->pushToDatabase($delay, $queue, $payload);
    }
157
158
    /**
     * Push an array of jobs onto the queue.
     *
     * Every job gets its own record, all sharing the same data, queue and
     * "available at" timestamp, and the records are written in one batch.
     *
     * @param array $jobs
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed Result of the batch insert, or null when there is nothing to insert
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);
        $availableAt = $this->getAvailableAt(0);

        // Build a plain, zero-indexed list: insertMany() expects a list of documents.
        $records = [];
        foreach ((array)$jobs as $job) {
            $records[] = $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
        }

        // insertMany() rejects an empty document list, so bail out early.
        if (empty($records)) {
            return null;
        }

        // insertOne() would store the whole list as a single malformed document.
        return $this->getCollection()->insertMany($records);
    }
179
180
    /**
     * Put a reserved job back on the queue, keeping its attempt counter.
     *
     * @param string $queue
     * @param \StdClass $job Record exposing "payload" and "attempts"
     * @param int $delay
     *
     * @return mixed Result of the underlying insert
     */
    public function release($queue, $job, $delay)
    {
        $payload = $job->payload;
        $attempts = $job->attempts;

        return $this->pushToDatabase($delay, $queue, $payload, $attempts);
    }
193
194
    /**
     * Look up a single job by its document id.
     *
     * @param $id Value passed to the ObjectID constructor
     *
     * @return null|Job The wrapped job, or null when no document has that id
     */
    public function getJobById($id)
    {
        $document = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        if (is_null($document)) {
            return null;
        }

        $record = (object)$document;

        return new MongoJob($this->container, $this, $record, $record->queue);
    }
213
214
    /**
     * Remove a job document that matches both the id and the queue name.
     *
     * @param string $queue
     * @param string $id
     *
     * @return int Number of deleted documents
     */
    public function deleteReserved($queue, $id): int
    {
        $criteria = [
            '_id' => new \MongoDB\BSON\ObjectID($id),
            'queue' => $queue,
        ];

        $result = $this->getCollection()->deleteOne($criteria);

        return $result->getDeletedCount();
    }
231
232
    /**
     * Read the configured job expiration time (seconds).
     *
     * @return int|null
     */
    public function getExpire()
    {
        $seconds = $this->expire;

        return $seconds;
    }
241
242
    /**
     * Replace the job expiration time (seconds).
     *
     * @param int|null $seconds New expiration time
     */
    public function setExpire($seconds)
    {
        // Stored as given; no validation is applied.
        $this->expire = $seconds;
    }
251
252
    /**
     * Count the stored job documents.
     *
     * @param string $queue When given, only documents of this queue are counted;
     *                      otherwise every document in the collection is counted
     *
     * @return int
     */
    public function size($queue = null)
    {
        $collection = $this->getCollection();

        return $queue
            ? $collection->count(['queue' => $queue])
            : $collection->count();
    }
267
268
    /**
     * Decide whether a job may run given the reserved-jobs limit.
     *
     * The job may run when fewer than $limit documents are flagged as reserved
     * (restricted to the job's queue when it has one), or when the job itself
     * already reports being reserved.
     *
     * @param Job $job
     *
     * @return bool
     */
    public function canRunJob(Job $job)
    {
        $criteria = ['reserved' => 1];

        if ($job->getQueue()) {
            $criteria['queue'] = $job->getQueue();
        }

        if ($this->getCollection()->count($criteria) < $this->limit) {
            return true;
        }

        // Limit reached: only a job that is already reserved may proceed.
        return $job->reserved();
    }
286
287
    /**
     * Flag a job document as reserved and bump its attempt counter.
     *
     * @param Job $job
     */
    public function markJobAsReserved($job)
    {
        $changes = [
            'attempts' => $job->attempts() + 1,
            'reserved' => 1,
            'reserved_at' => $this->currentTime(),
        ];

        $criteria = ['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())];

        $this->getCollection()->updateOne($criteria, ['$set' => $changes]);
    }
305
306
    /**
     * Insert a raw payload into the collection, available after the given delay.
     *
     * @param DateTime|int $delay
     * @param string|null $queue Queue name; the default queue is used when empty
     * @param string $payload
     * @param int $attempts
     *
     * @return mixed Result of the insert
     */
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
    {
        $queueName = $this->getQueue($queue);
        $availableAt = $this->getAvailableAt($delay);
        $record = $this->buildDatabaseRecord($queueName, $payload, $availableAt, $attempts);

        return $this->getCollection()->insertOne($record);
    }
322
323
    /**
     * Get the "available at" UNIX timestamp.
     *
     * @param \DateTimeInterface|int $delay Absolute point in time, or a number of
     *                                      seconds counted from now
     *
     * @return int
     */
    protected function getAvailableAt($delay)
    {
        // Accept any date object (mutable or immutable), not only DateTime;
        // an immutable date would otherwise be passed to addSeconds() as a number.
        if ($delay instanceof \DateTimeInterface) {
            return $delay->getTimestamp();
        }

        return Carbon::now()->addSeconds($delay)->getTimestamp();
    }
336
337
    /**
     * Resolve a queue name, falling back to the default queue.
     *
     * @param string|null $queue
     *
     * @return string The given name when it is truthy, the default otherwise
     */
    protected function getQueue($queue)
    {
        if ($queue) {
            return $queue;
        }

        return $this->queue;
    }
348
349
    /**
     * Find the oldest job of a queue that can be processed now.
     *
     * A document qualifies when it matches either the "available" criteria or
     * the "reserved but expired" criteria; candidates are ordered by ascending _id.
     *
     * @param string|null $queue Queue name; the default queue is used when empty
     *
     * @return null|Job
     */
    protected function getNextAvailableJob($queue)
    {
        $criteria = [
            'queue' => $this->getQueue($queue),
            '$or' => [
                $this->isAvailable(),
                $this->isReservedButExpired(),
            ],
        ];
        $options = ['sort' => ['_id' => 1]];

        $document = $this->getCollection()->findOne($criteria, $options);

        if (!$document) {
            return null;
        }

        return new MongoJob($this->container, $this, (object)$document, ((object)$document)->queue);
    }
371
372
    /**
     * Assemble the document that represents a job in the collection.
     *
     * New records always start unreserved (reserved = 0, reserved_at = null).
     *
     * @param string|null $queue
     * @param string $payload
     * @param int $availableAt
     * @param int $attempts
     *
     * @return array
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        $record = [
            'queue' => $queue,
            'payload' => $payload,
            'attempts' => $attempts,
            'reserved' => 0,
            'reserved_at' => null,
            'available_at' => $availableAt,
        ];
        $record['created_at'] = $this->currentTime();

        return $record;
    }
394
395
    /**
     * Criteria for jobs that are not reserved and whose availability time has come.
     *
     * @return array
     */
    protected function isAvailable()
    {
        $now = $this->currentTime();

        return [
            'reserved_at' => null,
            'available_at' => ['$lte' => $now],
        ];
    }
407
408
    /**
     * Criteria for jobs whose reservation is older than the expiration time.
     *
     * @return array
     */
    protected function isReservedButExpired()
    {
        // Reservations made at or before this moment are considered expired.
        $threshold = Carbon::now()->subSeconds($this->expire)->getTimestamp();

        return [
            'reserved_at' => ['$lte' => $threshold],
        ];
    }
419
420
    /**
     * Select the jobs collection from the driver's database.
     *
     * @return Collection Mongo collection instance
     */
    protected function getCollection(): Collection
    {
        $database = $this->mongo->getDatabase();

        return $database->selectCollection($this->collection);
    }
429
430
    /**
     * Store the Symfony container on the queue.
     *
     * The stored container is the one handed to every MongoJob this queue creates.
     *
     * @param ContainerInterface $container
     */
    public function putContainer(ContainerInterface $container)
    {
        $this->container = $container;
    }
437
438
    /**
     * Accept an Illuminate container without storing it.
     *
     * NOTE(review): presumably a deliberate no-op so that only putContainer()
     * can assign the container — confirm against the callers.
     *
     * @param Container $container
     */
    public function setContainer(Container $container)
    {
        // Nothing to do: the argument is ignored.
    }
447
}
448