CMQQueue   A
last analyzed

Complexity

Total Complexity 27

Size/Duplication

Total Lines 249
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 17
Bugs 4 Features 2
Metric Value
eloc 67
c 17
b 4
f 2
dl 0
loc 249
ccs 0
cts 98
cp 0
rs 10
wmc 27

11 Methods

Rating   Name   Duplication   Size   Complexity  
A getTopic() 0 3 2
A isPlain() 0 3 1
A pop() 0 16 4
A push() 0 11 3
A size() 0 5 1
A __construct() 0 11 1
A later() 0 15 4
A getQueue() 0 3 2
A pushRaw() 0 27 4
A getPlainJob() 0 3 1
A parseQueue() 0 7 4
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue;
4
5
use Freyo\LaravelQueueCMQ\Queue\Driver\Account;
6
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException;
7
use Freyo\LaravelQueueCMQ\Queue\Driver\Message;
8
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic;
9
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob;
10
use Illuminate\Container\Container;
11
use Illuminate\Contracts\Queue\Queue as QueueContract;
12
use Illuminate\Queue\Queue;
13
use Illuminate\Support\Arr;
14
15
class CMQQueue extends Queue implements QueueContract
16
{
17
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;
18
19
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';
20
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';
21
22
    /**
23
     * @var array
24
     */
25
    protected $queueOptions;
26
27
    /**
28
     * @var array
29
     */
30
    protected $topicOptions;
31
32
    /**
33
     * @var \Freyo\LaravelQueueCMQ\Queue\Driver\Account
34
     */
35
    private $queueAccount;
36
37
    /**
38
     * @var \Freyo\LaravelQueueCMQ\Queue\Driver\Account
39
     */
40
    private $topicAccount;
41
42
    /**
43
     * @var array
44
     */
45
    protected $plainOptions;
46
47
    /**
48
     * @var \ReflectionMethod
49
     */
50
    private $createPayload;
51
52
    /**
53
     * CMQQueue constructor.
54
     *
55
     * @param \Freyo\LaravelQueueCMQ\Queue\Driver\Account $queueAccount
56
     * @param \Freyo\LaravelQueueCMQ\Queue\Driver\Account $topicAccount
57
     * @param array                                       $config
58
     *
59
     * @throws \ReflectionException
60
     */
61
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
62
    {
63
        $this->queueAccount = $queueAccount;
64
        $this->topicAccount = $topicAccount;
65
66
        $this->queueOptions = $config['options']['queue'];
67
        $this->topicOptions = $config['options']['topic'];
68
69
        $this->plainOptions = Arr::get($config, 'plain', []);
70
71
        $this->createPayload = new \ReflectionMethod($this, 'createPayload');
72
    }
73
74
    /**
75
     * @return bool
76
     */
77
    public function isPlain()
78
    {
79
        return (bool) Arr::get($this->plainOptions, 'enable');
80
    }
81
82
    /**
83
     * @return string
84
     */
85
    public function getPlainJob()
86
    {
87
        return Arr::get($this->plainOptions, 'job');
88
    }
89
90
    /**
91
     * Get the size of the queue.
92
     *
93
     * @param string $queue
94
     *
95
     * @return int
96
     */
97
    public function size($queue = null)
98
    {
99
        $attributes = $this->getQueue($queue)->get_attributes();
100
101
        return (int) $attributes->activeMsgNum;
102
    }
103
104
    /**
105
     * Push a new job onto the queue.
106
     *
107
     * @param string|object $job
108
     * @param mixed         $data
109
     * @param string        $queue
110
     *
111
     * @throws \Exception
112
     *
113
     * @return mixed
114
     */
115
    public function push($job, $data = '', $queue = null)
116
    {
117
        if ($this->isPlain()) {
118
            return $this->pushRaw($job->getPayload(), $queue);
119
        }
120
121
        $payload = $this->createPayload->getNumberOfParameters() === 3
122
            ? $this->createPayload($job, $queue, $data) // version >= 5.7
123
            : $this->createPayload($job, $data);
124
125
        return $this->pushRaw($payload, $queue);
126
    }
127
128
    /**
129
     * Push a raw payload onto the queue.
130
     *
131
     * @param string $payload
132
     * @param string $queue
133
     * @param array  $options
134
     *
135
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerNetworkException
136
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException
137
     * @throws \Exception
138
     *
139
     * @return \Freyo\LaravelQueueCMQ\Queue\Driver\Message|array
140
     */
141
    public function pushRaw($payload, $queue = null, array $options = [])
142
    {
143
        $message = new Message($payload);
144
145
        $driver = $this->parseQueue($queue);
146
147
        if ($driver instanceof Topic) {
148
            switch ($this->topicOptions['filter']) {
149
                case self::CMQ_TOPIC_TAG_FILTER_NAME:
150
                    return retry(Arr::get($this->topicOptions, 'retries', 3),
151
                        function () use ($driver, $message, $queue) {
152
                            return $driver->publish_message($message->msgBody, explode(',', $queue), null);
0 ignored issues
show
Bug introduced by
It seems like $queue can also be of type null; however, parameter $string of explode() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

152
                            return $driver->publish_message($message->msgBody, explode(',', /** @scrutinizer ignore-type */ $queue), null);
Loading history...
153
                        });
154
                case self::CMQ_TOPIC_ROUTING_FILTER_NAME:
155
                    return retry(Arr::get($this->topicOptions, 'retries', 3),
156
                        function () use ($driver, $message, $queue) {
157
                            $driver->publish_message($message->msgBody, [], $queue);
158
                        });
159
                default:
160
                    throw new \InvalidArgumentException(
161
                        'Invalid CMQ topic filter: '.$this->topicOptions['filter']
162
                    );
163
            }
164
        }
165
166
        return retry(Arr::get($this->queueOptions, 'retries', 3), function () use ($driver, $message, $options) {
167
            return $driver->send_message($message, Arr::get($options, 'delay', 0));
168
        });
169
    }
170
171
    /**
172
     * Push a new job onto the queue after a delay.
173
     *
174
     * @param \DateTimeInterface|\DateInterval|int $delay
175
     * @param string|object                        $job
176
     * @param mixed                                $data
177
     * @param string                               $queue
178
     *
179
     * @throws \Exception
180
     *
181
     * @return mixed
182
     */
183
    public function later($delay, $job, $data = '', $queue = null)
184
    {
185
        $delay = method_exists($this, 'getSeconds')
186
            ? $this->getSeconds($delay)
187
            : $this->secondsUntil($delay);
188
189
        if ($this->isPlain()) {
190
            return $this->pushRaw($job->getPayload(), $queue, ['delay' => $delay]);
191
        }
192
193
        $payload = $this->createPayload->getNumberOfParameters() === 3
194
            ? $this->createPayload($job, $queue, $data) // version >= 5.7
195
            : $this->createPayload($job, $data);
196
197
        return $this->pushRaw($payload, $queue, ['delay' => $delay]);
198
    }
199
200
    /**
201
     * Pop the next job off of the queue.
202
     *
203
     * @param string $queue
204
     *
205
     * @return \Illuminate\Contracts\Queue\Job|null
206
     */
207
    public function pop($queue = null)
208
    {
209
        try {
210
            $queue = $this->getQueue($queue);
211
            $message = $queue->receive_message($this->queueOptions['polling_wait_seconds']);
212
        } catch (CMQServerException $e) {
213
            if (self::CMQ_QUEUE_NO_MESSAGE_CODE === (int) $e->getCode()) { // ignore no message
214
                return;
215
            }
216
217
            throw $e;
218
        }
219
220
        return new CMQJob(
221
            $this->container ?: Container::getInstance(),
222
            $this, $message, $queue, $this->connectionName
223
        );
224
    }
225
226
    /**
227
     * Get the queue.
228
     *
229
     * @param string $queue
230
     *
231
     * @return Driver\Queue
232
     */
233
    public function getQueue($queue = null)
234
    {
235
        return $this->queueAccount->get_queue($queue ?: $this->queueOptions['name']);
236
    }
237
238
    /**
239
     * Get the topic.
240
     *
241
     * @param string $topic
242
     *
243
     * @return Driver\Topic
244
     */
245
    public function getTopic($topic = null)
246
    {
247
        return $this->topicAccount->get_topic($topic ?: $this->topicOptions['name']);
248
    }
249
250
    /**
251
     * Parse name to topic or queue.
252
     *
253
     * @param string $queue
254
     *
255
     * @return Driver\Queue|Driver\Topic
256
     */
257
    public function parseQueue($queue = null)
258
    {
259
        if ($this->topicOptions['enable']) {
260
            return $this->getTopic($this->topicOptions['name'] ?: $queue);
261
        }
262
263
        return $this->getQueue($queue ?: $this->queueOptions['name']);
264
    }
265
}
266