Completed
Push — master ( 959f68...c6517b )
by Alexey
29s queued 12s
created

RedisQueue::markJobAsReserved()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 9
rs 9.9666
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace SfCod\QueueBundle\Queue;
4
5
use DateInterval;
6
use DateTime;
7
use Predis\Client;
8
use Predis\Collection\Iterator\HashKey;
9
use SfCod\QueueBundle\Base\JobResolverInterface;
10
use SfCod\QueueBundle\Base\RandomizeTrait;
11
use SfCod\QueueBundle\Entity\Job;
12
use SfCod\QueueBundle\Job\JobContract;
13
use SfCod\QueueBundle\Job\JobContractInterface;
14
use SfCod\QueueBundle\Service\RedisDriver;
15
16
/**
 * Class RedisQueue
 *
 * Queue implementation that keeps its jobs in Redis. For every queue four keys
 * are used, all derived from the collection name and the queue name:
 *
 *  - "<collection>:<queue>"            sorted set, job id => "available at" timestamp
 *  - "<collection>:<queue>:payload"    hash,       job id => raw payload string
 *  - "<collection>:<queue>:reserved"   sorted set, job id => reservation timestamp
 *  - "<collection>:<queue>:attempted"  sorted set, job id => number of attempts
 *
 * @author Virchenko Maksim <[email protected]>
 *
 * @package SfCod\QueueBundle\Queue
 */
class RedisQueue extends Queue
{
    use RandomizeTrait;

    /**
     * Job resolver handed over to every JobContract created by this queue.
     *
     * @var JobResolverInterface
     */
    private $resolver;

    /**
     * Driver the Predis client is obtained from.
     *
     * @var RedisDriver
     */
    private $redis;

    /**
     * The collection that holds the jobs (used as the Redis key prefix).
     *
     * @var string
     */
    private $collection;

    /**
     * The name of the default queue, used whenever a caller passes no queue name.
     *
     * @var string
     */
    private $queue = 'default';

    /**
     * The expiration time of a job reservation, in the same unit as currentTime().
     *
     * @var int|null
     */
    private $expire = 60;

    /**
     * Maximum number of simultaneously reserved jobs per queue (see canRunJob()).
     *
     * @var int
     */
    private $limit = 15;

    /**
     * Create a new redis queue instance.
     *
     * @param JobResolverInterface $resolver
     * @param RedisDriver $redis
     * @param string $collection
     * @param string $queue
     * @param int $expire
     * @param int $limit
     */
    public function __construct(
        JobResolverInterface $resolver,
        RedisDriver $redis,
        string $collection = 'queue_jobs',
        string $queue = 'default',
        int $expire = 60,
        int $limit = 15
    ) {
        $this->resolver = $resolver;
        $this->redis = $redis;
        $this->collection = $collection;
        $this->expire = $expire;
        $this->queue = $queue;
        $this->limit = $limit;
    }

    /**
     * Get the size of the queue.
     *
     * Counts every id in the queue's sorted set, regardless of its
     * "available at" score.
     *
     * @param string|null $queue Queue name, null for the default queue
     *
     * @return int
     */
    public function size(?string $queue = null): int
    {
        return (int)$this->getClient()->zcount($this->buildKey($queue), '-inf', '+inf');
    }

    /**
     * Push a new job onto the queue.
     *
     * @param string $job
     * @param array $data
     * @param string|null $queue Queue name, null for the default queue
     *
     * @return mixed Id of the stored job
     *
     * @throws \Exception
     */
    public function push(string $job, array $data = [], ?string $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * @param string $payload
     * @param string|null $queue Queue name, null for the default queue
     * @param array $options Accepted for interface compatibility, not used here
     *
     * @return mixed Id of the stored job
     *
     * @throws \Exception
     */
    public function pushRaw(string $payload, ?string $queue = null, array $options = [])
    {
        return $this->pushToDatabase(0, $queue, $payload);
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param DateInterval|int $delay
     * @param string $job
     * @param array $data
     * @param string|null $queue Queue name, null for the default queue
     *
     * @return mixed Id of the stored job
     *
     * @throws \Exception
     */
    public function later($delay, string $job, array $data = [], ?string $queue = null)
    {
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
    }

    /**
     * Pop the next job off of the queue.
     *
     * Only the first id whose "available at" score is not in the future is
     * inspected. Null is returned when there is no such id, when its payload
     * is missing, or when the job is reserved and the reservation has not
     * expired yet.
     *
     * @param string|null $queue Queue name, null for the default queue
     *
     * @return JobContractInterface|null
     */
    public function pop(?string $queue = null): ?JobContractInterface
    {
        // getJobById() requires a real queue name, so resolve the default up front.
        $queue = $queue ?? $this->queue;

        $ids = $this->getClient()->zrangebyscore(
            $this->buildKey($queue),
            0,
            $this->currentTime(),
            ['LIMIT' => [0, 1]]
        );

        if (empty($ids)) {
            return null;
        }

        $id = is_array($ids) ? reset($ids) : $ids;

        $job = $this->getJobById($queue, (string)$id);

        // The id may be queued while its payload is gone; there is nothing to run then.
        if (null === $job) {
            return null;
        }

        $reservationStillValid = $job->reservedAt() > ($this->currentTime() - $this->expire);

        if ($job->reserved() && $reservationStillValid) {
            return null;
        }

        return $job;
    }

    /**
     * Check if job exists in the queue.
     *
     * Iterates over every stored payload of the queue and compares it with the
     * payload that would be created for the given job and data.
     *
     * @param string $job
     * @param array $data
     * @param string|null $queue Queue name, null for the default queue
     *
     * @return bool
     */
    public function exists(string $job, array $data = [], ?string $queue = null): bool
    {
        $cursor = new HashKey($this->getClient(), $this->buildKey($queue, 'payload'));
        $payload = $this->createPayload($job, $data);

        foreach ($cursor as $value) {
            if ($value === $payload) {
                return true;
            }
        }

        return false;
    }

    /**
     * Check if can run process depend on limits
     *
     * A job may run when fewer than $limit ids are present in the queue's
     * "reserved" set, or when the job itself is already reserved.
     *
     * @param JobContractInterface $job
     *
     * @return bool
     */
    public function canRunJob(JobContractInterface $job): bool
    {
        $reservedCount = $this->getClient()->zcount(
            $this->buildKey($job->getQueue(), 'reserved'),
            '-inf',
            '+inf'
        );

        return $reservedCount < $this->limit || $job->reserved();
    }

    /**
     * Get job by its id
     *
     * @param string $queue
     * @param string $id
     *
     * @return JobContractInterface|null Null when no payload is stored for the id
     */
    public function getJobById(string $queue, string $id): ?JobContractInterface
    {
        $payload = $this->getClient()->hget($this->buildKey($queue, 'payload'), $id);

        if (!$payload) {
            return null;
        }

        $reservedAt = $this->getClient()->zscore($this->buildKey($queue, 'reserved'), $id);
        $attempts = $this->getClient()->zscore($this->buildKey($queue, 'attempted'), $id);

        // Redis returns scores as strings (or null when the member is absent);
        // convert them explicitly instead of relying on implicit coercion.
        $entity = $this->buildJob(
            $id,
            $queue,
            (int)($attempts ?? 0),
            json_decode($payload, true),
            null === $reservedAt ? null : (int)$reservedAt
        );

        return new JobContract($this->resolver, $this, $entity);
    }

    /**
     * Mark the given job ID as reserved.
     *
     * In one atomic pipeline the job id is added to the "reserved" set with the
     * current time as score and its counter in the "attempted" set is
     * incremented by one.
     *
     * @param JobContractInterface $job
     *
     * @throws \Exception
     */
    public function markJobAsReserved(JobContractInterface $job)
    {
        $queue = $job->getQueue();
        $id = $job->getJobId();

        $this->getClient()->pipeline(['atomic' => true])
            ->zadd($this->buildKey($queue, 'reserved'), [$id => $this->currentTime()])
            ->zincrby($this->buildKey($queue, 'attempted'), 1, $id)
            ->execute();
    }

    /**
     * Delete a reserved job from the queue.
     *
     * Removes the id from the payload hash and from the reserved, attempted
     * and main sets in one atomic pipeline.
     *
     * @param string $queue
     * @param string $id
     *
     * @return bool Always true
     *
     * @throws \Exception
     */
    public function deleteReserved(string $queue, string $id): bool
    {
        $this->getClient()->pipeline(['atomic' => true])
            ->hdel($this->buildKey($queue, 'payload'), [$id])
            ->zrem($this->buildKey($queue, 'reserved'), $id)
            ->zrem($this->buildKey($queue, 'attempted'), $id)
            ->zrem($this->buildKey($queue), $id)
            ->execute();

        return true;
    }

    /**
     * Release a reserved job back onto the queue.
     *
     * The raw body is stored again under a freshly generated id, carrying over
     * the job's attempt counter. The previous entry is not removed here.
     *
     * @param JobContractInterface $job
     * @param DateInterval|int $delay
     *
     * @return mixed Id of the newly stored job
     *
     * @throws \Exception
     */
    public function release(JobContractInterface $job, $delay)
    {
        return $this->pushToDatabase($delay, $job->getQueue(), $job->getRawBody(), $job->attempts());
    }

    /**
     * Build collection:queue:postfix key
     *
     * @param string|null $queue Queue name, null for the configured default queue
     * @param string|null $postfix Optional key suffix ("payload", "reserved", "attempted")
     *
     * @return string
     */
    private function buildKey(?string $queue = null, ?string $postfix = null): string
    {
        // Fall back to the configured default so a null queue never yields "<collection>:".
        $queue = $queue ?? $this->queue;

        return "$this->collection:$queue" . ($postfix ? ":$postfix" : '');
    }

    /**
     * Get the "available at" UNIX timestamp.
     *
     * @param DateInterval|int $delay Interval, or a number added to currentTime()
     *
     * @return int
     */
    private function getAvailableAt($delay = 0)
    {
        if ($delay instanceof DateInterval) {
            return (new DateTime())->add($delay)->getTimestamp();
        }

        return $this->currentTime() + $delay;
    }

    /**
     * Push job to database
     *
     * Stores the payload and schedules the job id in one atomic pipeline; the
     * attempt counter is only written when it is greater than zero.
     *
     * @param DateInterval|int $delay
     * @param string|null $queue Queue name, null for the default queue
     * @param string $payload
     * @param int $attempts
     *
     * @return mixed Id generated for the stored job
     *
     * @throws \Exception
     */
    private function pushToDatabase($delay, ?string $queue, string $payload, int $attempts = 0)
    {
        $id = $this->getRandomId();

        $pipeline = $this->getClient()->pipeline(['atomic' => true])
            ->hset($this->buildKey($queue, 'payload'), $id, $payload)
            ->zadd($this->buildKey($queue), [
                $id => $this->getAvailableAt($delay),
            ]);

        if ($attempts > 0) {
            $pipeline->zadd($this->buildKey($queue, 'attempted'), [
                $id => $attempts,
            ]);
        }

        $pipeline->execute();

        // Hand the id back so push()/later()/release() return something usable.
        return $id;
    }

    /**
     * Build job from database record
     *
     * @param string $id
     * @param string $queue
     * @param int $attempts
     * @param array $payload
     * @param int|null $reservedAt Reservation timestamp, null when the job is not reserved
     *
     * @return Job
     */
    private function buildJob(string $id, string $queue, int $attempts, array $payload, ?int $reservedAt = null): Job
    {
        $job = new Job();
        $job->setId($id);
        $job->setAttempts($attempts);
        $job->setQueue($queue);
        $job->setReserved((bool)$reservedAt);
        $job->setReservedAt($reservedAt);
        $job->setPayload($payload);

        return $job;
    }

    /**
     * Get redis client
     *
     * @return Client
     */
    private function getClient(): Client
    {
        return $this->redis->getClient();
    }
}
393