Passed
Push — master ( da51a9...a72f89 )
by frey
04:15
created

CMQQueue::later()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 18
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4.074

Importance

Changes 0
Metric Value
cc 4
eloc 11
nc 6
nop 4
dl 0
loc 18
ccs 10
cts 12
cp 0.8333
crap 4.074
rs 9.9
c 0
b 0
f 0
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue;
4
5
use Freyo\LaravelQueueCMQ\Queue\Driver\Account;
6
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException;
7
use Freyo\LaravelQueueCMQ\Queue\Driver\Message;
8
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic;
9
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob;
10
use Illuminate\Contracts\Queue\Queue as QueueContract;
11
use Illuminate\Queue\Queue;
12
use Illuminate\Support\Arr;
13
14
/**
 * Queue implementation that sends jobs to either a CMQ queue or a CMQ topic.
 *
 * Whether a push goes to a queue or a topic is decided by parseQueue(), based
 * on the "enable" flag of the topic options passed to the constructor.
 */
class CMQQueue extends Queue implements QueueContract
{
    /**
     * Exception code that pop() treats as "no message available" and maps to null.
     */
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;

    /**
     * Value of the topic "filter" option selecting tag-based publishing in pushRaw().
     */
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';

    /**
     * Value of the topic "filter" option selecting routing-based publishing in pushRaw().
     */
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';

    /**
     * Queue options taken from $config['options']['queue'].
     * The keys read by this class are "name" and "polling_wait_seconds".
     *
     * @var array
     */
    protected $queueOptions;

    /**
     * Topic options taken from $config['options']['topic'].
     * The keys read by this class are "enable", "name" and "filter".
     *
     * @var array
     */
    protected $topicOptions;

    /**
     * Account used to resolve queue drivers.
     *
     * @var Account
     */
    private $queueAccount;

    /**
     * Account used to resolve topic drivers.
     *
     * @var Account
     */
    private $topicAccount;

    /**
     * Plain-mode options taken from $config['plain'] (empty array when absent).
     * The keys read by this class are "enable" and "job".
     *
     * @var array
     */
    protected $plainOptions;

    /**
     * @param Account $queueAccount Account used by getQueue().
     * @param Account $topicAccount Account used by getTopic().
     * @param array   $config       Must contain ['options']['queue'] and
     *                              ['options']['topic']; 'plain' is optional.
     */
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
    {
        $this->queueAccount = $queueAccount;
        $this->topicAccount = $topicAccount;

        $this->queueOptions = $config['options']['queue'];
        $this->topicOptions = $config['options']['topic'];

        $this->plainOptions = Arr::get($config, 'plain', []);
    }

    /**
     * Whether plain mode is enabled (the "enable" plain option, false when unset).
     *
     * @return bool
     */
    public function isPlain()
    {
        return Arr::get($this->plainOptions, 'enable', false);
    }

    /**
     * Get the configured "job" plain option, or null when it is not set.
     *
     * @return string
     */
    public function getPlainJob()
    {
        return Arr::get($this->plainOptions, 'job');
    }

    /**
     * Get the size of the queue.
     *
     * Reads the "activeMsgNum" attribute reported by the queue driver.
     *
     * @param string $queue
     *
     * @return int
     */
    public function size($queue = null)
    {
        $attributes = $this->getQueue($queue)->get_attributes();

        return (int)$attributes->activeMsgNum;
    }

    /**
     * Push a new job onto the queue.
     *
     * In plain mode the job's own getPayload() result is sent as-is, so $job
     * must be an object exposing that method.
     *
     * @param string|object $job
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed
     * @throws \ReflectionException
     * @throws \Exception
     */
    public function push($job, $data = '', $queue = null)
    {
        if ($this->isPlain()) {
            return $this->pushRaw($job->getPayload(), $queue);
        }

        // The argument order of createPayload() depends on its arity, so
        // inspect the inherited method before calling it.
        $reflection = new \ReflectionMethod($this, 'createPayload');
        if ($reflection->getNumberOfParameters() === 3) { // version >= 5.7
            $payload = $this->createPayload($job, $queue, $data);
        } else {
            $payload = $this->createPayload($job, $data);
        }

        return $this->pushRaw($payload, $queue);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * When parseQueue() resolves to a topic, the payload is published using
     * the configured "filter" option: with "msgtag" the comma-separated
     * $queue string is split and passed as the second publish argument; with
     * "routing" $queue is passed as the third publish argument. The "delay"
     * option is only applied when sending to a queue. Every send is attempted
     * up to 3 times through retry().
     *
     * @param string $payload
     * @param string $queue
     * @param array $options Supports "delay" (defaults to 0).
     *
     * @return \Freyo\LaravelQueueCMQ\Queue\Driver\Message|array
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerNetworkException
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException
     * @throws \InvalidArgumentException When the topic filter option is not recognised.
     * @throws \Exception
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = new Message($payload);

        $driver = $this->parseQueue($queue);

        if ($driver instanceof Topic) {
            switch ($this->topicOptions['filter']) {
                case self::CMQ_TOPIC_TAG_FILTER_NAME:
                    return retry(3, function () use ($driver, $message, $queue) {
                        return $driver->publish_message($message->msgBody, explode(',', $queue), null);
                    });
                case self::CMQ_TOPIC_ROUTING_FILTER_NAME:
                    return retry(3, function () use ($driver, $message, $queue) {
                        // Return the publish result so callers receive it, as
                        // the tag-filter and queue branches already do.
                        return $driver->publish_message($message->msgBody, [], $queue);
                    });
                default:
                    throw new \InvalidArgumentException(
                        'Invalid CMQ topic filter: ' . $this->topicOptions['filter']
                    );
            }
        }

        return retry(3, function () use ($driver, $message, $options) {
            return $driver->send_message($message, Arr::get($options, 'delay', 0));
        });
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * The delay is normalised to seconds and forwarded to pushRaw() as the
     * "delay" option.
     *
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param string|object $job
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed
     * @throws \ReflectionException
     * @throws \Exception
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        // Use getSeconds() when the parent class provides it, otherwise
        // fall back to secondsUntil().
        $delay = method_exists($this, 'getSeconds')
            ? $this->getSeconds($delay)
            : $this->secondsUntil($delay);

        if ($this->isPlain()) {
            return $this->pushRaw($job->getPayload(), $queue, ['delay' => $delay]);
        }

        // The argument order of createPayload() depends on its arity, so
        // inspect the inherited method before calling it.
        $reflection = new \ReflectionMethod($this, 'createPayload');
        if ($reflection->getNumberOfParameters() === 3) { // version >= 5.7
            $payload = $this->createPayload($job, $queue, $data);
        } else {
            $payload = $this->createPayload($job, $data);
        }

        return $this->pushRaw($payload, $queue, ['delay' => $delay]);
    }

    /**
     * Pop the next job off of the queue.
     *
     * Returns null when the server reports the "no message" code; any other
     * CMQServerException is rethrown.
     *
     * @param string $queue
     *
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        try {
            // From here on $queue holds the resolved queue driver, not the name.
            $queue = $this->getQueue($queue);
            $message = $queue->receive_message($this->queueOptions['polling_wait_seconds']);
        } catch (CMQServerException $e) {
            if (self::CMQ_QUEUE_NO_MESSAGE_CODE === (int)$e->getCode()) { //ignore no message
                return null;
            }
            throw $e;
        }

        return new CMQJob($this->container, $this, $message, $queue);
    }

    /**
     * Get the queue.
     *
     * Falls back to the configured queue name when $queue is empty.
     *
     * @param string $queue
     *
     * @return Driver\Queue
     */
    public function getQueue($queue = null)
    {
        return $this->queueAccount->get_queue($queue ?: $this->queueOptions['name']);
    }

    /**
     * Get the topic.
     *
     * Falls back to the configured topic name when $topic is empty.
     *
     * @param string $topic
     *
     * @return Driver\Topic
     */
    public function getTopic($topic = null)
    {
        return $this->topicAccount->get_topic($topic ?: $this->topicOptions['name']);
    }

    /**
     * Parse name to topic or queue.
     *
     * When topics are enabled the configured topic name takes precedence and
     * $queue is only used as the topic name if none is configured; otherwise
     * $queue (or the configured queue name) is resolved as a queue.
     *
     * @param string $queue
     *
     * @return Driver\Queue|Driver\Topic
     */
    public function parseQueue($queue = null)
    {
        if ($this->topicOptions['enable']) {
            return $this->getTopic($this->topicOptions['name'] ?: $queue);
        }

        return $this->getQueue($queue ?: $this->queueOptions['name']);
    }
}
240