Completed
Push — master ( 9dd986...0b1f80 )
by frey
03:47
created

CMQQueue::later()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4.016

Importance

Changes 0
Metric Value
cc 4
eloc 9
nc 6
nop 4
dl 0
loc 15
ccs 9
cts 10
cp 0.9
crap 4.016
rs 9.9666
c 0
b 0
f 0
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue;
4
5
use Freyo\LaravelQueueCMQ\Queue\Driver\Account;
6
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException;
7
use Freyo\LaravelQueueCMQ\Queue\Driver\Message;
8
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic;
9
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob;
10
use Illuminate\Contracts\Queue\Queue as QueueContract;
11
use Illuminate\Queue\Queue;
12
use Illuminate\Support\Arr;
13
14
class CMQQueue extends Queue implements QueueContract
{
    /**
     * Error code of a CMQServerException that pop() treats as
     * "no message available" instead of as a failure.
     */
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;

    /**
     * Value of the topic "filter" option that makes pushRaw() publish
     * using the comma-separated queue name as the tag list.
     */
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';

    /**
     * Value of the topic "filter" option that makes pushRaw() publish
     * using the queue name as the routing argument.
     */
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';

    /**
     * Queue options taken from $config['options']['queue']
     * (keys read in this class: name, retries, polling_wait_seconds).
     *
     * @var array
     */
    protected $queueOptions;

    /**
     * Topic options taken from $config['options']['topic']
     * (keys read in this class: enable, name, filter, retries).
     *
     * @var array
     */
    protected $topicOptions;

    /**
     * Account used to resolve queue drivers.
     *
     * @var Account
     */
    private $queueAccount;

    /**
     * Account used to resolve topic drivers.
     *
     * @var Account
     */
    private $topicAccount;

    /**
     * Plain-mode options taken from $config['plain'] (keys read: enable, job).
     *
     * @var array
     */
    protected $plainOptions;

    /**
     * Reflection of createPayload(), used to detect how many parameters
     * the inherited implementation expects.
     *
     * @var \ReflectionMethod
     */
    private static $createPayload;

    /**
     * CMQQueue constructor.
     *
     * @param Account $queueAccount Account used for queue operations.
     * @param Account $topicAccount Account used for topic operations.
     * @param array $config Must contain ['options']['queue'] and ['options']['topic'];
     *                      'plain' is optional and defaults to an empty array.
     *
     * @throws \ReflectionException
     */
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
    {
        $this->queueAccount = $queueAccount;
        $this->topicAccount = $topicAccount;

        $this->queueOptions = $config['options']['queue'];
        $this->topicOptions = $config['options']['topic'];

        $this->plainOptions = Arr::get($config, 'plain', []);

        self::$createPayload = new \ReflectionMethod($this, 'createPayload');
    }

    /**
     * Whether plain mode is enabled in the configuration.
     *
     * In plain mode push()/later() send $job->getPayload() as the message
     * body instead of a payload built by createPayload().
     *
     * @return bool
     */
    public function isPlain()
    {
        return (bool)Arr::get($this->plainOptions, 'enable');
    }

    /**
     * Get the configured plain-mode job (the 'job' entry of the plain options).
     *
     * @return string|null Null when no 'job' entry is configured.
     */
    public function getPlainJob()
    {
        return Arr::get($this->plainOptions, 'job');
    }

    /**
     * Get the size of the queue.
     *
     * Reads the activeMsgNum attribute reported by the queue driver.
     *
     * @param string $queue Queue name; the configured queue name is used when empty.
     *
     * @return int
     */
    public function size($queue = null)
    {
        $attributes = $this->getQueue($queue)->get_attributes();

        return (int)$attributes->activeMsgNum;
    }

    /**
     * Push a new job onto the queue.
     *
     * @param string|object $job  In plain mode this must be an object exposing getPayload().
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed Whatever pushRaw() returns.
     * @throws \Exception
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->buildPayload($job, $data, $queue), $queue);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * When topics are enabled the payload is published to the topic, using
     * $queue either as a comma-separated tag list or as the routing argument
     * depending on the configured filter. Otherwise the payload is sent to
     * the queue, honouring $options['delay'] (default 0). Every driver call
     * is wrapped in retry() using the configured 'retries' (default 3).
     *
     * @param string $payload
     * @param string $queue
     * @param array $options Only the 'delay' key is read, and only for queues.
     *
     * @return \Freyo\LaravelQueueCMQ\Queue\Driver\Message|array Result of the driver call.
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerNetworkException
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException
     * @throws \InvalidArgumentException When the configured topic filter is not recognised.
     * @throws \Exception
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = new Message($payload);

        $driver = $this->parseQueue($queue);

        if ($driver instanceof Topic) {
            $retries = Arr::get($this->topicOptions, 'retries', 3);

            switch ($this->topicOptions['filter']) {
                case self::CMQ_TOPIC_TAG_FILTER_NAME:
                    // NOTE(review): with a null $queue, explode() produces [''] — confirm
                    // whether an empty tag is acceptable to the driver.
                    return retry($retries, function () use ($driver, $message, $queue) {
                        return $driver->publish_message($message->msgBody, explode(',', $queue), null);
                    });
                case self::CMQ_TOPIC_ROUTING_FILTER_NAME:
                    // The driver result must be returned from the closure; otherwise
                    // retry() (and therefore pushRaw()) yields null for this filter.
                    return retry($retries, function () use ($driver, $message, $queue) {
                        return $driver->publish_message($message->msgBody, [], $queue);
                    });
                default:
                    throw new \InvalidArgumentException(
                        'Invalid CMQ topic filter: ' . $this->topicOptions['filter']
                    );
            }
        }

        return retry(Arr::get($this->queueOptions, 'retries', 3), function () use ($driver, $message, $options) {
            return $driver->send_message($message, Arr::get($options, 'delay', 0));
        });
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param string|object $job  In plain mode this must be an object exposing getPayload().
     * @param mixed $data
     * @param string $queue
     *
     * @return mixed Whatever pushRaw() returns.
     * @throws \Exception
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        // Use getSeconds() when the base class provides it, otherwise secondsUntil().
        $delay = method_exists($this, 'getSeconds')
            ? $this->getSeconds($delay)
            : $this->secondsUntil($delay);

        return $this->pushRaw($this->buildPayload($job, $data, $queue), $queue, ['delay' => $delay]);
    }

    /**
     * Pop the next job off of the queue.
     *
     * Waits up to the configured 'polling_wait_seconds' for a message.
     * A CMQServerException carrying CMQ_QUEUE_NO_MESSAGE_CODE is treated as
     * an empty queue; any other CMQServerException is rethrown.
     *
     * @param string $queue
     *
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        try {
            // From here on $queue holds the driver object, which is what CMQJob receives.
            $queue = $this->getQueue($queue);
            $message = $queue->receive_message($this->queueOptions['polling_wait_seconds']);
        } catch (CMQServerException $e) {
            if (self::CMQ_QUEUE_NO_MESSAGE_CODE === (int)$e->getCode()) { // ignore no message
                return null;
            }

            throw $e;
        }

        return new CMQJob($this->container, $this, $message, $queue);
    }

    /**
     * Get the queue.
     *
     * @param string $queue Queue name; the configured queue name is used when empty.
     *
     * @return Driver\Queue
     */
    public function getQueue($queue = null)
    {
        return $this->queueAccount->get_queue($queue ?: $this->queueOptions['name']);
    }

    /**
     * Get the topic.
     *
     * @param string $topic Topic name; the configured topic name is used when empty.
     *
     * @return Driver\Topic
     */
    public function getTopic($topic = null)
    {
        return $this->topicAccount->get_topic($topic ?: $this->topicOptions['name']);
    }

    /**
     * Parse name to topic or queue.
     *
     * When topics are enabled the configured topic name takes precedence and
     * $queue is only used as a fallback; otherwise $queue (or the configured
     * queue name) selects the queue.
     *
     * @param string $queue
     *
     * @return Driver\Queue|Driver\Topic
     */
    public function parseQueue($queue = null)
    {
        if ($this->topicOptions['enable']) {
            return $this->getTopic($this->topicOptions['name'] ?: $queue);
        }

        return $this->getQueue($queue ?: $this->queueOptions['name']);
    }

    /**
     * Build the message body shared by push() and later().
     *
     * @param string|object $job
     * @param mixed $data
     * @param string|null $queue
     *
     * @return string
     */
    private function buildPayload($job, $data, $queue)
    {
        if ($this->isPlain()) {
            return $job->getPayload();
        }

        return self::$createPayload->getNumberOfParameters() === 3
            ? $this->createPayload($job, $queue, $data) // version >= 5.7
            : $this->createPayload($job, $data);
    }
}
258