Completed
Push — master ( 9eae13...e56856 )
by Antonio
04:26
created

RedisQueueStoreAdapter::ack()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 5

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 15
ccs 11
cts 11
cp 1
rs 8.8571
cc 5
eloc 8
nc 4
nop 1
crap 5
1
<?php
2
namespace Da\Mailer\Queue\Backend\Redis;
3
4
use Da\Mailer\Exception\InvalidCallException;
5
use Da\Mailer\Queue\Backend\MailJobInterface;
6
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
7
use phpseclib\Crypt\Random;
8
9
class RedisQueueStoreAdapter implements QueueStoreAdapterInterface
10
{
11
    /**
12
     * @var int number of seconds added to time() when a dequeued job is scored in the reserved set
     *          and when an unfinished job is postponed on ack()
13
     */
14
    private $expireTime;
15
    /**
16
     * @var string key of the Redis list holding ready jobs; ":delayed" and ":reserved" are appended
     *             to it to form the keys of the two sorted sets
17
     */
18
    private $queueName;
19
    /**
20
     * @var RedisQueueStoreConnection connection wrapper whose getInstance() result receives every Redis command
21
     */
22
    protected $connection;
23
24
    /**
     * Stores the connection and queue settings, then calls init() straight away.
     *
     * @param RedisQueueStoreConnection $connection connection wrapper used for every Redis command
     * @param string $queueName key of the ready list; ":delayed" and ":reserved" are appended to it
     *                          for the two sorted sets
     * @param int $expireTime seconds added to time() when scoring a reserved job or postponing
     *                        an unfinished one
     */
    public function __construct(RedisQueueStoreConnection $connection, $queueName = 'mail_queue', $expireTime = 60)
    {
        $this->connection = $connection;
        $this->queueName = $queueName;
        $this->expireTime = $expireTime;

        // init() reads $this->connection, so it must run after the assignments above.
        $this->init();
    }
38
39
    /**
     * Calls connect() on the injected connection.
     *
     * @return RedisQueueStoreAdapter this adapter, so calls can be chained
     */
    public function init()
    {
        $connection = $this->getConnection();
        $connection->connect();

        return $this;
    }
49
50
    /**
     * Gives access to the connection wrapper handed to the constructor.
     *
     * @return RedisQueueStoreConnection
     */
    public function getConnection()
    {
        return $this->connection;
    }
57
58
    /**
     * Adds a mail job to the queue.
     *
     * A job whose time-to-send lies in the future is stored in the "<queue>:delayed" sorted set,
     * scored by that timestamp; any other job is appended to the ready list.
     *
     * getTimeToSend() is not declared by MailJobInterface, so it is only called when the given job
     * actually provides it. A job without that method is treated as "send now" instead of causing
     * a fatal error.
     *
     * @param RedisMailJob|MailJobInterface $mailJob
     *
     * @return int the reply of the ZADD or RPUSH command
     */
    public function enqueue(MailJobInterface $mailJob)
    {
        $timestamp = method_exists($mailJob, 'getTimeToSend') ? $mailJob->getTimeToSend() : null;
        $payload = $this->createPayload($mailJob);
        $redis = $this->getConnection()->getInstance();

        if ($timestamp !== null && $timestamp > time()) {
            return $redis->zadd($this->queueName . ':delayed', $timestamp, $payload);
        }

        return $redis->rpush($this->queueName, $payload);
    }
72
73
    /**
     * Takes the next payload off the ready list and turns it into a job.
     *
     * Expired entries of the delayed and reserved sets are moved back to the ready list first.
     * The popped payload is then recorded in the "<queue>:reserved" sorted set with a score of
     * time() + expireTime before it is decoded.
     *
     * @return RedisMailJob|null null when LPOP yields null
     */
    public function dequeue()
    {
        $this->migrateExpiredJobs();

        $payload = $this->getConnection()->getInstance()->lpop($this->queueName);

        if ($payload === null) {
            return null;
        }

        // Keep the raw payload in the reserved set; migrateExpiredJobs() pushes it back onto the
        // ready list once its score is no longer in the future.
        $this->getConnection()
            ->getInstance()
            ->zadd($this->queueName . ':reserved', time() + $this->expireTime, $payload);

        $decoded = json_decode($payload, true);

        return new RedisMailJob(
            [
                'id' => $decoded['id'],
                'attempt' => $decoded['attempt'],
                'message' => $decoded['message'],
            ]
        );
    }
100
101
    /**
     * Acknowledges a job that was previously dequeued.
     *
     * The job is removed from the reserved set. If it is not completed it is queued again; when its
     * time-to-send is missing or already in the past, it is first postponed to time() + expireTime.
     *
     * getTimeToSend() and setTimeToSend() are not declared by MailJobInterface, so the postponing
     * step only runs when the given job provides both methods.
     *
     * @param RedisMailJob|MailJobInterface $mailJob
     *
     * @throws InvalidCallException when the job reports itself as a new record
     */
    public function ack(MailJobInterface $mailJob)
    {
        if ($mailJob->isNewRecord()) {
            throw new InvalidCallException('RedisMailJob cannot be a new object to be acknowledged');
        }

        $this->removeReserved($mailJob);

        if ($mailJob->isCompleted()) {
            return;
        }

        if (method_exists($mailJob, 'getTimeToSend') && method_exists($mailJob, 'setTimeToSend')) {
            // Read the clock and the schedule once so the comparison and the new value agree.
            $now = time();
            $timeToSend = $mailJob->getTimeToSend();

            if ($timeToSend === null || $timeToSend < $now) {
                $mailJob->setTimeToSend($now + $this->expireTime);
            }
        }

        $this->enqueue($mailJob);
    }
119
120
    /**
     * Deletes the job's payload from the "<queue>:reserved" sorted set.
     *
     * NOTE(review): the member passed to ZREM is rebuilt from the job's current id, attempt and
     * message. It presumably only matches the entry stored by dequeue() while those values are
     * unchanged - confirm against the callers that modify the job before acknowledging it.
     *
     * @param MailJobInterface $mailJob
     */
    public function removeReserved(MailJobInterface $mailJob)
    {
        $member = $this->createPayload($mailJob);
        $reservedKey = $this->queueName . ':reserved';

        $this->getConnection()->getInstance()->zrem($reservedKey, $member);
    }
128
129
    /**
     * Tells whether the ready list currently has a length of zero.
     *
     * Only the ready list is measured; the delayed and reserved sets are not consulted.
     *
     * @return bool
     */
    public function isEmpty()
    {
        $length = $this->getConnection()->getInstance()->llen($this->queueName);

        return 0 === $length;
    }
136
137
    /**
     * Serialises a job into the JSON string that is stored in Redis.
     *
     * The payload carries the keys "id", "attempt" and "message". A job that reports itself as a
     * new record receives a freshly generated id (SHA-1 of a 32 character random string); any other
     * job keeps the id it already has.
     *
     * @param RedisMailJob|MailJobInterface $mailJob
     *
     * @return string
     */
    protected function createPayload(MailJobInterface $mailJob)
    {
        if ($mailJob->isNewRecord()) {
            $id = sha1(Random::string(32));
        } else {
            $id = $mailJob->getId();
        }

        return json_encode(
            [
                'id' => $id,
                'attempt' => $mailJob->getAttempt(),
                'message' => $mailJob->getMessage(),
            ]
        );
    }
152
153
    /**
     * Moves every expired job from the delayed set, then from the reserved set, onto the ready list.
     */
    protected function migrateExpiredJobs()
    {
        foreach ([':delayed', ':reserved'] as $suffix) {
            $this->migrateJobs($this->queueName . $suffix, $this->queueName);
        }
    }
161
162
    /**
     * Moves expired jobs from a sorted set onto a list inside one client transaction.
     *
     * The transaction is started with the options cas = true, watch = $from and retry = 10.
     * Inside it, the members of $from scored at or below the current time are read; if there are
     * any, they are removed from $from and pushed onto $to.
     *
     * @param string $from the name of the source queue
     * @param string $to the name of the target queue
     */
    protected function migrateJobs($from, $to)
    {
        $redis = $this->getConnection()->getInstance();

        $redis->transaction(
            ['cas' => true, 'watch' => $from, 'retry' => 10],
            function ($transaction) use ($from, $to) {
                // One timestamp is used for both the read and the removal so that they cover
                // the same score range.
                $now = time();
                $expired = $this->getExpiredJobs($transaction, $from, $now);

                if (count($expired) === 0) {
                    return;
                }

                $this->removeExpiredJobs($transaction, $from, $now);
                $this->pushExpiredJobsOntoNewQueue($transaction, $to, $expired);
            }
        );
    }
190
191
    /**
     * Reads the members of a sorted set whose score lies between -inf and the given time.
     *
     * @param  \Predis\Transaction\MultiExec $transaction
     * @param  string $from key of the sorted set to read
     * @param  int $time upper bound of the score range
     *
     * @return array
     */
    protected function getExpiredJobs($transaction, $from, $time)
    {
        $expired = $transaction->zrangebyscore($from, '-inf', $time);

        return $expired;
    }
204
205
    /**
     * Calls multi() on the transaction and then removes the members of a sorted set whose score
     * lies between -inf and the given time.
     *
     * @param  \Predis\Transaction\MultiExec $transaction
     * @param  string $from key of the sorted set to trim
     * @param  int $time upper bound of the score range
     */
    protected function removeExpiredJobs($transaction, $from, $time)
    {
        // multi() has to come first: the removal below is issued after it on purpose.
        $transaction->multi();

        $transaction->zremrangebyscore($from, '-inf', $time);
    }
218
219
    /**
     * Appends all given payloads to a list with a single RPUSH call.
     *
     * @param  \Predis\Transaction\MultiExec $transaction
     * @param  string $to key of the list to push onto
     * @param  array $jobs payloads to append, in the order given
     */
    protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
    {
        // Build the argument list "key, job, job, ..." expected by rpush.
        $arguments = $jobs;
        array_unshift($arguments, $to);

        call_user_func_array([$transaction, 'rpush'], $arguments);
    }
231
}
232