Completed
Push — master ( f62712...6c6c04 )
by frey
04:17
created

CMQQueue   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 229
Duplicated Lines 0 %

Test Coverage

Coverage 58.33%

Importance

Changes 0
Metric Value
eloc 67
dl 0
loc 229
ccs 42
cts 72
cp 0.5833
rs 10
c 0
b 0
f 0
wmc 26

11 Methods

Rating   Name   Duplication   Size   Complexity  
A getTopic() 0 3 2
A isPlain() 0 3 1
A pop() 0 13 3
A push() 0 13 3
A size() 0 5 1
A __construct() 0 11 1
A later() 0 17 4
A getQueue() 0 3 2
A pushRaw() 0 27 4
A getPlainJob() 0 3 1
A parseQueue() 0 7 4
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue;
4
5
use Freyo\LaravelQueueCMQ\Queue\Driver\Account;
6
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException;
7
use Freyo\LaravelQueueCMQ\Queue\Driver\Message;
8
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic;
9
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob;
10
use Illuminate\Contracts\Queue\Queue as QueueContract;
11
use Illuminate\Queue\Queue;
12
use Illuminate\Support\Arr;
13
14
class CMQQueue extends Queue implements QueueContract
15
{
16
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;
17
18
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';
19
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';
20
21
    /**
22
     * @var array
23
     */
24
    protected $queueOptions;
25
    protected $topicOptions;
26
27
    /**
28
     * @var Account
29
     */
30
    private $queueAccount;
31
    private $topicAccount;
32
33
    /**
34
     * @var array
35
     */
36
    protected $plainOptions;
37
38
    /**
39
     * @var \ReflectionMethod
40
     */
41
    private static $createPayload;
42
43
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
44
    {
45
        $this->queueAccount = $queueAccount;
46
        $this->topicAccount = $topicAccount;
47
48
        $this->queueOptions = $config['options']['queue'];
49
        $this->topicOptions = $config['options']['topic'];
50
51
        $this->plainOptions = Arr::get($config, 'plain', []);
52
53
        self::$createPayload = new \ReflectionMethod($this, 'createPayload');
54
    }
55
56
    /**
57
     * @return bool
58
     */
59 3
    public function isPlain()
60
    {
61 3
        return Arr::get($this->plainOptions, 'enable', false);
62
    }
63
64
    /**
65
     * @return string
66
     */
67 1
    public function getPlainJob()
68 1
    {
69 1
        return Arr::get($this->plainOptions, 'job');
70
    }
71
72
    /**
73
     * Get the size of the queue.
74
     *
75
     * @param string $queue
76
     *
77
     * @return int
78
     */
79 1
    public function size($queue = null)
80
    {
81 1
        $attributes = $this->getQueue($queue)->get_attributes();
82
83 1
        return (int)$attributes->activeMsgNum;
84
    }
85
86
    /**
87
     * Push a new job onto the queue.
88
     *
89
     * @param string|object $job
90
     * @param mixed $data
91
     * @param string $queue
92
     *
93
     * @return mixed
94
     * @throws \Exception
95
     */
96 1
    public function push($job, $data = '', $queue = null)
97
    {
98 1
        if ($this->isPlain()) {
99
            return $this->pushRaw($job->getPayload(), $queue);
100
        }
101
102 1
        if (self::$createPayload->getNumberOfParameters() === 3) { // version >= 5.7
103 1
            $payload = $this->createPayload($job, $queue, $data);
104 1
        } else {
105
            $payload = $this->createPayload($job, $data);
106
        }
107
108 1
        return $this->pushRaw($payload, $queue);
109
    }
110
111
    /**
112
     * Push a raw payload onto the queue.
113
     *
114
     * @param string $payload
115
     * @param string $queue
116
     * @param array $options
117
     *
118
     * @return \Freyo\LaravelQueueCMQ\Queue\Driver\Message|array
119
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerNetworkException
120
     * @throws \Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException
121
     * @throws \Exception
122
     */
123 3
    public function pushRaw($payload, $queue = null, array $options = [])
124
    {
125 3
        $message = new Message($payload);
126
127 3
        $driver = $this->parseQueue($queue);
128
129 3
        if ($driver instanceof Topic) {
130
            switch ($this->topicOptions['filter']) {
131
                case self::CMQ_TOPIC_TAG_FILTER_NAME:
132
                    return retry(Arr::get($this->topicOptions, 'retries', 1),
133
                        function () use ($driver, $message, $queue) {
134
                            return $driver->publish_message($message->msgBody, explode(',', $queue), null);
135
                        });
136
                case self::CMQ_TOPIC_ROUTING_FILTER_NAME:
137
                    return retry(Arr::get($this->topicOptions, 'retries', 1),
138
                        function () use ($driver, $message, $queue) {
139
                            $driver->publish_message($message->msgBody, [], $queue);
140
                        });
141
                default:
142
                    throw new \InvalidArgumentException(
143
                        'Invalid CMQ topic filter: ' . $this->topicOptions['filter']
144
                    );
145
            }
146
        }
147
148 3
        return retry(Arr::get($this->queueOptions, 'retries', 1), function () use ($driver, $message, $options) {
149 3
            return $driver->send_message($message, Arr::get($options, 'delay', 0));
150 3
        });
151
    }
152
153
    /**
154
     * Push a new job onto the queue after a delay.
155
     *
156
     * @param \DateTimeInterface|\DateInterval|int $delay
157
     * @param string|object $job
158
     * @param mixed $data
159
     * @param string $queue
160
     *
161
     * @return mixed
162
     * @throws \Exception
163
     */
164 1
    public function later($delay, $job, $data = '', $queue = null)
165
    {
166 1
        $delay = method_exists($this, 'getSeconds')
167 1
            ? $this->getSeconds($delay)
168 1
            : $this->secondsUntil($delay);
169
170 1
        if ($this->isPlain()) {
171
            return $this->pushRaw($job->getPayload(), $queue, ['delay' => $delay]);
172
        }
173
174 1
        if (self::$createPayload->getNumberOfParameters() === 3) { // version >= 5.7
175 1
            $payload = $this->createPayload($job, $queue, $data);
176 1
        } else {
177
            $payload = $this->createPayload($job, $data);
178
        }
179
180 1
        return $this->pushRaw($payload, $queue, ['delay' => $delay]);
181
    }
182
183
    /**
184
     * Pop the next job off of the queue.
185
     *
186
     * @param string $queue
187
     *
188
     * @return \Illuminate\Contracts\Queue\Job|null
189
     */
190 1
    public function pop($queue = null)
191
    {
192
        try {
193 1
            $queue = $this->getQueue($queue);
194 1
            $message = $queue->receive_message($this->queueOptions['polling_wait_seconds']);
195 1
        } catch (CMQServerException $e) {
196
            if (self::CMQ_QUEUE_NO_MESSAGE_CODE === (int)$e->getCode()) { //ignore no message
197
                return null;
198
            }
199
            throw $e;
200
        }
201
202 1
        return new CMQJob($this->container, $this, $message, $queue);
203
    }
204
205
    /**
206
     * Get the queue.
207
     *
208
     * @param string $queue
209
     *
210
     * @return Driver\Queue
211
     */
212 7
    public function getQueue($queue = null)
213
    {
214 7
        return $this->queueAccount->get_queue($queue ?: $this->queueOptions['name']);
215
    }
216
217
    /**
218
     * Get the topic.
219
     *
220
     * @param string $topic
221
     *
222
     * @return Driver\Topic
223
     */
224 1
    public function getTopic($topic = null)
225
    {
226 1
        return $this->topicAccount->get_topic($topic ?: $this->topicOptions['name']);
227
    }
228
229
    /**
230
     * Parse name to topic or queue.
231
     *
232
     * @param string $queue
233
     *
234
     * @return Driver\Queue|Driver\Topic
235
     */
236 4
    public function parseQueue($queue = null)
237
    {
238 4
        if ($this->topicOptions['enable']) {
239
            return $this->getTopic($this->topicOptions['name'] ?: $queue);
240
        }
241
242 4
        return $this->getQueue($queue ?: $this->queueOptions['name']);
243
    }
244
}
245