MongoThreadQueue::isAvailable()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 7
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 0
1
<?php
2
3
namespace yiicod\jobqueue\queues;
4
5
use Carbon\Carbon;
6
use DateTime;
7
use Illuminate\Contracts\Queue\Queue as QueueContract;
8
use Illuminate\Queue\Queue;
9
use yii\mongodb\Connection;
10
use yiicod\jobqueue\jobs\MongoJob;
11
12
/**
13
 * Class MongoThreadQueue
14
 *
15
 * @package yiicod\jobqueue\queues
16
 */
17
class MongoThreadQueue extends Queue implements QueueContract
18
{
19
    /**
     * Mongo connection used to reach the jobs collection.
     *
     * @var Connection
     */
    protected $database;

    /**
     * Name of the collection that stores the jobs.
     *
     * @var string
     */
    protected $table;

    /**
     * Queue name used when a caller does not supply one.
     *
     * @var string
     */
    protected $default;

    /**
     * Number of seconds after which a reserved job is treated as expired.
     *
     * @var int|null
     */
    protected $expire;

    /**
     * Reservation limit that canRunJob() compares the reserved-job count against.
     *
     * @var int
     */
    protected $limit = 15;

    /**
     * Create a new Mongo-backed queue instance.
     *
     * Every property written here is declared on the class, so no dynamic
     * properties are created (those are deprecated as of PHP 8.2).
     *
     * @param Connection $database connection that owns the jobs collection
     * @param string $table name of the jobs collection
     * @param string $default queue name used when none is given
     * @param int $expire seconds before a reserved job counts as expired
     * @param int $limit reservation limit used by canRunJob()
     */
    public function __construct(Connection $database, $table, $default = 'default', $expire = 60, $limit = 15)
    {
        $this->database = $database;
        $this->table = $table;
        $this->default = $default;
        $this->expire = $expire;
        $this->limit = $limit;
    }
41
42
    /**
     * Decide whether the given job may run under the reservation limit.
     *
     * Counts the documents flagged `reserved => 1`, narrowed to the job's queue
     * when the job reports one, and allows the job while that count is below
     * $this->limit. A job that reports itself as reserved is allowed regardless
     * of the count.
     *
     * @param MongoJob $job
     *
     * @return bool
     */
    public function canRunJob(MongoJob $job)
    {
        $criteria = ['reserved' => 1];

        $queueName = $job->getQueue();
        if ($queueName) {
            $criteria['queue'] = $queueName;
        }

        if ($this->getCollection()->count($criteria) < $this->limit) {
            return true;
        }

        return (bool)$job->reserved();
    }
60
61
    /**
     * Look up a single job by its document id.
     *
     * @param string $id value passed to the \MongoDB\BSON\ObjectID constructor
     *
     * @return null|MongoJob null when no document has that `_id`
     */
    public function getJobById($id)
    {
        $document = $this->getCollection()->findOne(['_id' => new \MongoDB\BSON\ObjectID($id)]);

        if (null === $document) {
            return null;
        }

        // MongoJob is handed the record as an object rather than an array.
        $record = (object)$document;

        return new MongoJob($this->container, $this, $record, $record->queue);
    }
80
81
    /**
     * Store a job on the queue with no delay.
     *
     * @param  string $job
     * @param  mixed $data
     * @param  string $queue
     *
     * @return mixed result of pushToDatabase()
     */
    public function push($job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        return $this->pushToDatabase(0, $queue, $payload);
    }
94
95
    /**
     * Check whether an identical job is already stored on the queue.
     *
     * The lookup matches on the queue name and on the payload that
     * createPayload() produces for the given job and data.
     *
     * @param  string $job
     * @param  mixed $data
     * @param  string|null $queue queue name; the default queue is used when empty
     *
     * @return bool true when a matching record was found
     */
    public function exists($job, $data = '', $queue = null)
    {
        $match = $this->getCollection()->findOne([
            // Records are written under the resolved queue name (pushToDatabase()
            // calls getQueue()), so the lookup must resolve it the same way;
            // otherwise a null $queue can never match a stored job.
            'queue' => $this->getQueue($queue),
            'payload' => $this->createPayload($job, $data),
        ]);

        return null !== $match;
    }
111
112
    /**
     * Store an already-built payload on the queue with no delay.
     *
     * @param  string $payload
     * @param  string $queue
     * @param  array $options not read by this implementation
     *
     * @return mixed result of pushToDatabase()
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $noDelay = 0;

        return $this->pushToDatabase($noDelay, $queue, $payload);
    }
125
126
    /**
     * Store a job on the queue so that it becomes available after a delay.
     *
     * @param  DateTime|int $delay
     * @param  string $job
     * @param  mixed $data
     * @param  string $queue
     *
     * @return mixed result of pushToDatabase()
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        return $this->pushToDatabase($delay, $queue, $payload);
    }
140
141
    /**
     * Push an array of jobs onto the queue in one batch.
     *
     * Every job shares the same data, queue and "available at" timestamp.
     *
     * @param  array $jobs
     * @param  mixed $data
     * @param  string $queue
     *
     * @return mixed result of the collection's batchInsert(); an empty array
     *               when there is nothing to insert
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);
        $availableAt = $this->getAvailableAt(0);

        $records = [];
        foreach ((array)$jobs as $job) {
            $records[] = $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
        }

        // Nothing to write; skip the round trip.
        // NOTE(review): an empty batch is presumably rejected by the driver - confirm.
        if (empty($records)) {
            return [];
        }

        // insert() stores its argument as ONE document, so a list of records has
        // to go through batchInsert() to become one document per job.
        return $this->getCollection()->batchInsert($records);
    }
162
163
    /**
     * Put a reserved job back onto the queue.
     *
     * The job's payload and attempt count are written as a record through
     * pushToDatabase(), using the given delay.
     *
     * @param  string $queue
     * @param  \StdClass $job record exposing `payload` and `attempts`
     * @param  int $delay
     *
     * @return mixed result of pushToDatabase()
     */
    public function release($queue, $job, $delay)
    {
        $payload = $job->payload;
        $attempts = $job->attempts;

        return $this->pushToDatabase($delay, $queue, $payload, $attempts);
    }
176
177
    /**
     * Build a job record for the payload and insert it into the collection.
     *
     * @param  DateTime|int $delay converted to a timestamp by getAvailableAt()
     * @param  string|null $queue resolved through getQueue()
     * @param  string $payload
     * @param  int $attempts
     *
     * @return mixed result of the collection's insert()
     */
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
    {
        $queueName = $this->getQueue($queue);
        $availableAt = $this->getAvailableAt($delay);
        $record = $this->buildDatabaseRecord($queueName, $payload, $availableAt, $attempts);

        return $this->getCollection()->insert($record);
    }
193
194
    /**
     * Fetch the next job that is ready to run on the queue.
     *
     * This method does not mark the job as reserved; that step is left to the
     * caller (see markJobAsReserved()).
     *
     * @param  string $queue
     *
     * @return MongoJob|null null when no job is ready
     */
    public function pop($queue = null)
    {
        $next = $this->getNextAvailableJob($this->getQueue($queue));

        return $next ?: null;
    }
212
213
    /**
     * Find the oldest job on the queue that is ready to be processed.
     *
     * A job qualifies when it matches isAvailable() or, if an expiry is
     * configured, isReservedButExpired(). Candidates are sorted by ascending `_id`.
     *
     * @param  string|null $queue
     *
     * @return null|MongoJob null when nothing qualifies
     */
    protected function getNextAvailableJob($queue)
    {
        $candidates = [$this->isAvailable()];

        // setExpire() accepts null. Computing the stale-reservation cutoff from a
        // null interval would not give a meaningful window, so the clause is only
        // added when an expiry is set.
        // NOTE(review): presumably null means "reservations never expire" - confirm.
        if (null !== $this->expire) {
            $candidates[] = $this->isReservedButExpired();
        }

        $document = $this->getCollection()->findOne([
            'queue' => $this->getQueue($queue),
            '$or' => $candidates,
        ], [], [
            'sort' => ['_id' => 1],
        ]);

        if (!$document) {
            return null;
        }

        $record = (object)$document;

        return new MongoJob($this->container, $this, $record, $record->queue);
    }
235
236
    /**
     * Build the query condition for jobs that are not reserved and already due.
     *
     * @return array condition requiring a null `reserved_at` and an
     *               `available_at` no later than the current time
     */
    protected function isAvailable()
    {
        $now = $this->currentTime();

        return [
            'reserved_at' => null,
            'available_at' => ['$lte' => $now],
        ];
    }
248
249
    /**
     * Build the query condition for jobs whose reservation has timed out.
     *
     * @return array condition matching a `reserved_at` at least $this->expire
     *               seconds in the past
     */
    protected function isReservedButExpired()
    {
        $cutoff = Carbon::now()->subSeconds($this->expire)->getTimestamp();

        return ['reserved_at' => ['$lte' => $cutoff]];
    }
260
261
    /**
     * Flag the given job as reserved.
     *
     * Writes an attempt count one higher than the job reports, sets `reserved`
     * to 1 and stamps `reserved_at` with the current time.
     *
     * @param MongoJob $job
     */
    public function markJobAsReserved($job)
    {
        $changes = [
            'attempts' => $job->attempts() + 1,
            'reserved' => 1,
            'reserved_at' => $this->currentTime(),
        ];
        $filter = ['_id' => new \MongoDB\BSON\ObjectID($job->getJobId())];

        $this->getCollection()->update($filter, ['$set' => $changes]);
    }
279
280
    /**
     * Remove a job's document from the collection by id.
     *
     * @param  string $queue not read by this implementation
     * @param  string $id value passed to the \MongoDB\BSON\ObjectID constructor
     *
     * @return mixed result of the collection's remove()
     */
    public function deleteReserved($queue, $id)
    {
        $filter = ['_id' => new \MongoDB\BSON\ObjectID($id)];

        return $this->getCollection()->remove($filter);
    }
290
291
    /**
     * Get the "available at" UNIX timestamp.
     *
     * Any date object (mutable or immutable) is used as the absolute moment;
     * anything else is treated as a number of seconds from now.
     *
     * @param  \DateTimeInterface|int $delay
     *
     * @return int
     */
    protected function getAvailableAt($delay)
    {
        // DateTimeInterface rather than DateTime, so a DateTimeImmutable is not
        // mistaken for a number of seconds.
        if ($delay instanceof \DateTimeInterface) {
            return $delay->getTimestamp();
        }

        return Carbon::now()->addSeconds($delay)->getTimestamp();
    }
304
305
    /**
     * Assemble the document that represents a job in the collection.
     *
     * @param  string|null $queue
     * @param  string $payload
     * @param  int $availableAt
     * @param  int $attempts
     *
     * @return array record with the job data, an unreserved state and timestamps
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        $record = [
            'queue' => $queue,
            'payload' => $payload,
            'attempts' => $attempts,
        ];

        // Every record starts out unreserved.
        $record['reserved'] = 0;
        $record['reserved_at'] = null;

        $record['available_at'] = $availableAt;
        $record['created_at'] = $this->currentTime();

        return $record;
    }
327
328
    /**
     * Resolve a queue name, falling back to the default queue.
     *
     * @param  string|null $queue
     *
     * @return string the given name, or $this->default when the name is empty
     */
    protected function getQueue($queue)
    {
        if ($queue) {
            return $queue;
        }

        return $this->default;
    }
339
340
    /**
     * Read the configured expiration time.
     *
     * @return int|null number of seconds, as stored in $this->expire
     */
    public function getExpire()
    {
        $seconds = $this->expire;

        return $seconds;
    }
349
350
    /**
     * Replace the configured expiration time.
     *
     * The value is read by isReservedButExpired() to compute its cutoff.
     *
     * @param  int|null $seconds
     */
    public function setExpire($seconds)
    {
        $expire = $seconds;

        $this->expire = $expire;
    }
359
360
    /**
     * Get the number of job records stored for a queue.
     *
     * @param  string|null $queue queue name; the default queue is used when empty
     *
     * @return int
     */
    public function size($queue = null)
    {
        // Return the count (it was previously computed and discarded) and scope
        // it to the requested queue instead of the whole collection.
        return $this->getCollection()->count(['queue' => $this->getQueue($queue)]);
    }
371
372
    /**
     * Resolve the Mongo collection that holds the jobs.
     *
     * @return \yii\mongodb\Collection collection named by $this->table
     */
    protected function getCollection()
    {
        $mongoDatabase = $this->database->getDatabase();

        return $mongoDatabase->getCollection($this->table);
    }
381
}
382