RedisQueueStoreAdapter::removeExpiredJobs()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 2
c 1
b 0
f 1
nc 1
nop 3
dl 0
loc 4
ccs 0
cts 2
cp 0
crap 2
rs 10
1
<?php
2
3
namespace Da\Mailer\Queue\Backend\Redis;
4
5
use Da\Mailer\Exception\InvalidCallException;
6
use Da\Mailer\Queue\Backend\MailJobInterface;
7
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
8
use phpseclib3\Crypt\Random;
9
10
/**
 * Queue store adapter that keeps mail jobs in Redis.
 *
 * Three keys are used, all derived from the queue name:
 *  - "<queueName>"           list of payloads that are ready to be processed,
 *  - "<queueName>:delayed"   sorted set of payloads scored by the time they should be sent,
 *  - "<queueName>:reserved"  sorted set of payloads handed out by dequeue(), scored by
 *                            the time at which they are moved back to the ready list.
 *
 * Several methods are type-hinted against MailJobInterface but call methods
 * (getTimeToSend(), setTimeToSend(), getId(), getAttempt()) that static analysis
 * reports as missing from that interface; in practice a RedisMailJob is expected.
 */
class RedisQueueStoreAdapter implements QueueStoreAdapterInterface
{
    /**
     * @var int number of seconds added to the current time when scoring reserved jobs
     *          and when re-scheduling jobs that were acknowledged without being completed
     */
    private $expireTime;

    /**
     * @var string name of the ready list; also the prefix of the ":delayed" and ":reserved" sets
     */
    private $queueName;

    /**
     * @var RedisQueueStoreConnection
     */
    protected $connection;

    /**
     * RedisQueueStoreAdapter constructor.
     *
     * Stores the given settings and immediately connects through init().
     *
     * @param RedisQueueStoreConnection $connection
     * @param string $queueName
     * @param int $expireTime
     */
    public function __construct(RedisQueueStoreConnection $connection, $queueName = 'mail_queue', $expireTime = 60)
    {
        $this->expireTime = $expireTime;
        $this->connection = $connection;
        $this->queueName = $queueName;
        $this->init();
    }

    /**
     * Opens the underlying connection.
     *
     * @return RedisQueueStoreAdapter this instance, to allow chaining
     */
    public function init()
    {
        $this->getConnection()
            ->connect();

        return $this;
    }

    /**
     * @return RedisQueueStoreConnection
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Adds a job to the queue.
     *
     * A job whose time-to-send lies in the future is added to the ":delayed" sorted set
     * (scored by that time); any other job is appended to the ready list.
     *
     * @param RedisMailJob|MailJobInterface $mailJob must provide getTimeToSend(), which is not
     *                                               part of MailJobInterface
     *
     * @return int the reply of the underlying zadd/rpush call
     */
    public function enqueue(MailJobInterface $mailJob)
    {
        $timestamp = $mailJob->getTimeToSend();
        $payload = $this->createPayload($mailJob);

        return $timestamp !== null && $timestamp > time()
            ? $this->getConnection()->getInstance()->zadd($this->queueName . ':delayed', $timestamp, $payload)
            : $this->getConnection()->getInstance()->rpush($this->queueName, $payload);
    }

    /**
     * Pops the next ready job, if any, and marks it as reserved.
     *
     * Expired delayed/reserved jobs are first migrated back to the ready list. The popped
     * payload is then stored in the ":reserved" sorted set with a score of now + expireTime.
     *
     * @return RedisMailJob|null the job built from the popped payload, or null when nothing was popped
     */
    public function dequeue()
    {
        $this->migrateExpiredJobs();

        $job = $this->getConnection()->getInstance()->lpop($this->queueName);

        if ($job !== null) {
            // The raw payload string is what gets reserved; removeReserved() must later
            // produce the same string for the entry to be removed.
            $this->getConnection()
                ->getInstance()
                ->zadd($this->queueName . ':reserved', time() + $this->expireTime, $job);

            $data = json_decode($job, true);

            return new RedisMailJob([
                'id' => $data['id'],
                'attempt' => $data['attempt'],
                'message' => $data['message'],
            ]);
        }

        return null;
    }

    /**
     * Acknowledges a previously dequeued job.
     *
     * The job is removed from the ":reserved" set. If it is not completed it is enqueued
     * again; when its time-to-send is missing or already in the past, it is first
     * re-scheduled to now + expireTime.
     *
     * @param RedisMailJob|MailJobInterface $mailJob must provide getTimeToSend()/setTimeToSend(),
     *                                               which are not part of MailJobInterface
     *
     * @throws InvalidCallException when the job is a new record
     */
    public function ack(MailJobInterface $mailJob)
    {
        if ($mailJob->isNewRecord()) {
            throw new InvalidCallException('RedisMailJob cannot be a new object to be acknowledged');
        }

        $this->removeReserved($mailJob);

        if (!$mailJob->isCompleted()) {
            if ($mailJob->getTimeToSend() === null || $mailJob->getTimeToSend() < time()) {
                $mailJob->setTimeToSend(time() + $this->expireTime);
            }

            $this->enqueue($mailJob);
        }
    }

    /**
     * Removes the job's payload from the ":reserved" sorted set.
     *
     * NOTE(review): the member removed is the payload re-created from the job's current
     * state, whereas dequeue() reserved the raw string popped from the list. The removal
     * only matches if both are byte-identical (e.g. the attempt count has not changed in
     * between) - confirm against the callers of ack().
     *
     * @param MailJobInterface $mailJob
     */
    public function removeReserved(MailJobInterface $mailJob)
    {
        $payload = $this->createPayload($mailJob);
        $this->getConnection()->getInstance()->zrem($this->queueName . ':reserved', $payload);
    }

    /**
     * {@inheritdoc}
     *
     * Only the ready list is inspected; the ":delayed" and ":reserved" sets are not counted.
     */
    public function isEmpty()
    {
        return $this->getConnection()->getInstance()->llen($this->queueName) === 0;
    }

    /**
     * Serializes a job to the JSON payload stored in Redis.
     *
     * New records get a freshly generated id (SHA-1 of a 32 character random string);
     * existing records keep their own id.
     *
     * @param RedisMailJob|MailJobInterface $mailJob must provide getId() and getAttempt(), which
     *                                               are not part of MailJobInterface
     *
     * @return string JSON object with the keys "id", "attempt" and "message"
     */
    protected function createPayload(MailJobInterface $mailJob)
    {
        return json_encode([
            'id' => $mailJob->isNewRecord() ? sha1(Random::string(32)) : $mailJob->getId(),
            'attempt' => $mailJob->getAttempt(),
            'message' => $mailJob->getMessage(),
        ]);
    }

    /**
     * Migrates all expired jobs from delayed and reserved queues to the main queue to be processed.
     */
    protected function migrateExpiredJobs()
    {
        $this->migrateJobs($this->queueName . ':delayed', $this->queueName);
        $this->migrateJobs($this->queueName . ':reserved', $this->queueName);
    }

    /**
     * Migrates expired jobs from one queue to another.
     *
     * The work runs inside a client transaction that watches the source key, uses
     * check-and-set mode and is retried up to 10 times.
     *
     * @param string $from the name of the source queue
     * @param string $to the name of the target queue
     */
    protected function migrateJobs($from, $to)
    {
        $options = ['cas' => true, 'watch' => $from, 'retry' => 10];

        $this->getConnection()->getInstance()->transaction($options, function ($transaction) use ($from, $to) {
            $time = time();

            // First we need to get all of the jobs that have expired based on the current
            // time so that we can push them onto the main queue. After we get them we simply
            // remove them from this "delay" queue. All of this happens within a transaction.
            $jobs = $this->getExpiredJobs($transaction, $from, $time);

            // If we actually found any jobs, we remove them from the old queue and insert
            // them onto the new (ready) queue. This means they will stand ready to be
            // processed by the queue worker whenever their turn comes up.
            if (count($jobs) > 0) {
                $this->removeExpiredJobs($transaction, $from, $time);
                $this->pushExpiredJobsOntoNewQueue($transaction, $to, $jobs);
            }
        });
    }

    /**
     * Get the expired jobs from a given queue.
     *
     * Returns every member of the sorted set whose score is lower than or equal to $time.
     *
     * @param  \Predis\Transaction\MultiExec $transaction
     * @param  string $from
     * @param  int $time
     *
     * @return array the expired payloads. Static analysis notes that the call may also be typed
     *               as returning the MultiExec instance; migrateJobs() relies on an array here.
     */
    protected function getExpiredJobs($transaction, $from, $time)
    {
        return $transaction->zrangebyscore($from, '-inf', $time);
    }

    /**
     * Remove the expired jobs from a given queue.
     *
     * Calls multi() on the transaction before issuing the removal, so the removal (and the
     * push that follows in migrateJobs()) is issued after that call.
     *
     * @param  \Predis\Transaction\MultiExec $transaction
     * @param  string $from
     * @param  int $time
     *
     * @return void
     */
    protected function removeExpiredJobs($transaction, $from, $time)
    {
        $transaction->multi();
        $transaction->zremrangebyscore($from, '-inf', $time);
    }

    /**
     * Push all of the given jobs onto another queue.
     *
     * All payloads are sent in a single rpush call, in their original order.
     *
     * @param  \Predis\Transaction\MultiExec $transaction
     * @param  string $to
     * @param  array $jobs
     *
     * @return void
     */
    protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
    {
        // array_values() guarantees a plain list, so keys can never be interpreted as
        // named arguments when the payloads are unpacked.
        $transaction->rpush($to, ...array_values($jobs));
    }
}
218