Completed
Push — master ( 03e2bb...7e3c9f )
by frey
02:57
created

CMQQueue   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 199
Duplicated Lines 0 %

Test Coverage

Coverage 61.82%

Importance

Changes 0
Metric Value
wmc 24
eloc 51
dl 0
loc 199
ccs 34
cts 55
cp 0.6182
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A isPlain() 0 3 1
A push() 0 5 2
A size() 0 5 1
A __construct() 0 9 1
A pushRaw() 0 21 4
A getPlainJob() 0 3 1
A later() 0 9 3
A getTopic() 0 3 2
A pop() 0 14 3
A getQueue() 0 3 2
A parseQueue() 0 11 4
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue;
4
5
use Freyo\LaravelQueueCMQ\Queue\Driver\Account;
6
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException;
7
use Freyo\LaravelQueueCMQ\Queue\Driver\Message;
8
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic;
9
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob;
10
use Illuminate\Contracts\Queue\Queue as QueueContract;
11
use Illuminate\Queue\Queue;
12
use Illuminate\Support\Arr;
13
14
class CMQQueue extends Queue implements QueueContract
15
{
16
    // Error code of a CMQServerException that pop() treats as "no message" and ignores.
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;
17
18
    // Values compared against $topicOptions['filter'] in pushRaw(): 'msgtag' makes the
    // queue name a comma-separated tag list, 'routing' makes it the routing key.
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';
19
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';
20
21
    /**
22
     * @var array Settings copied from $config['options']['queue']; this class reads the 'name' and 'polling_wait_seconds' keys.
23
     */
24
    protected $queueOptions;
25
    // Settings copied from $config['options']['topic']; this class reads the
    // 'enable', 'name' and 'filter' keys.
    protected $topicOptions;
26
27
    /**
28
     * @var Account Account that getQueue() resolves queues from (via get_queue()).
29
     */
30
    private $queueAccount;
31
    // Account that getTopic() resolves topics from (via get_topic()).
    private $topicAccount;
32
33
    /**
34
     * @var array Optional 'plain' section of the config (empty array when absent); the 'enable' and 'job' keys are read.
35
     */
36
    protected $plainOptions;
37
38
    /**
     * Store the accounts and the option sections this connection works with.
     *
     * @param Account $queueAccount account used to resolve queues
     * @param Account $topicAccount account used to resolve topics
     * @param array   $config       connection config; must contain ['options']['queue']
     *                              and ['options']['topic'], may contain 'plain'
     */
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
    {
        // The "plain" section is optional and falls back to an empty array.
        $this->plainOptions = Arr::get($config, 'plain', []);

        $this->topicOptions = $config['options']['topic'];
        $this->queueOptions = $config['options']['queue'];

        $this->topicAccount = $topicAccount;
        $this->queueAccount = $queueAccount;
    }
48
49
    /**
     * Tell whether plain mode is enabled through the 'enable' key of the plain options.
     *
     * When enabled, push() and later() take the payload from the job's own
     * getPayload() instead of building it with createPayload().
     *
     * @return bool false when the key is not configured
     */
    public function isPlain()
    {
        // Cast so the documented bool contract holds even when the config
        // stores a truthy/falsy non-boolean such as 1 or "1".
        return (bool) Arr::get($this->plainOptions, 'enable', false);
    }
56
57
    /**
     * Read the 'job' entry of the plain options.
     *
     * @return string|null the configured value, or null when there is no 'job' entry
     */
    public function getPlainJob()
    {
        $plainJob = Arr::get($this->plainOptions, 'job');

        return $plainJob;
    }
64
65
    /**
     * Get the size of the queue.
     *
     * The value is the activeMsgNum attribute returned by the queue's
     * get_attributes() call, cast to an integer.
     *
     * @param string $queue queue name; the configured default name is used when empty
     *
     * @return int
     */
    public function size($queue = null)
    {
        return (int) $this->getQueue($queue)->get_attributes()->activeMsgNum;
    }
78
79
    /**
     * Push a new job onto the queue.
     *
     * In plain mode the payload is whatever the job's getPayload() returns;
     * otherwise it is built by createPayload(). The payload is then handed
     * to pushRaw().
     *
     * @param string|object $job
     * @param mixed         $data
     * @param string        $queue
     *
     * @throws \InvalidArgumentException when plain mode is enabled and $job is not an object
     *
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        if ($this->isPlain()) {
            // A string job has no getPayload(); report that clearly instead of
            // dying with a fatal "member function on string" error.
            if (!is_object($job)) {
                throw new \InvalidArgumentException(
                    'Plain mode requires a job object that provides getPayload().'
                );
            }

            $payload = $job->getPayload();
        } else {
            // Static analysis reports that createPayload() only declares a string $job
            // while objects are passed here too; the annotation marks that as intended.
            $payload = $this->createPayload(/** @scrutinizer ignore-type */ $job, $data);
        }

        return $this->pushRaw($payload, $queue);
    }
94
95
    /**
     * Push a raw payload onto the queue.
     *
     * The target is resolved by parseQueue(). For a topic the message body is
     * published, with $queue used as a comma-separated tag list or as the
     * routing key depending on the configured topic filter. For a queue the
     * message is sent with the optional delay.
     *
     * @param string $payload
     * @param string $queue
     * @param array  $options only the 'delay' key is read (default 0); unused for topics
     *
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = new Message($payload);

        $driver = $this->parseQueue($queue);

        if ($driver instanceof Topic) {
            $filter = $this->topicOptions['filter'];

            $vTagList = [];
            if ($filter === self::CMQ_TOPIC_TAG_FILTER_NAME) {
                // Cast first: $queue defaults to null and explode() no longer accepts
                // null silently on PHP 8.1+. The resulting list is unchanged.
                $vTagList = explode(',', (string) $queue);
            }

            $routingKey = null;
            if ($filter === self::CMQ_TOPIC_ROUTING_FILTER_NAME) {
                $routingKey = $queue;
            }

            return $driver->publish_message($message->msgBody, $vTagList, $routingKey);
        }

        // Arr::get() instead of the global array_get() helper: the helper is not part of
        // newer framework releases, and the rest of this class already uses Arr.
        return $driver->send_message($message, Arr::get($options, 'delay', 0));
    }
126
127
    /**
     * Push a new job onto the queue after a delay.
     *
     * The payload is produced the same way as in push(); the delay is converted
     * to seconds and passed to pushRaw() as the 'delay' option.
     *
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param string|object                        $job
     * @param mixed                                $data
     * @param string                               $queue
     *
     * @throws \InvalidArgumentException when plain mode is enabled and $job is not an object
     *
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        if ($this->isPlain()) {
            // A string job has no getPayload(); report that clearly instead of
            // dying with a fatal "member function on string" error.
            if (!is_object($job)) {
                throw new \InvalidArgumentException(
                    'Plain mode requires a job object that provides getPayload().'
                );
            }

            $payload = $job->getPayload();
        } else {
            // Static analysis reports that createPayload() only declares a string $job
            // while objects are passed here too; the annotation marks that as intended.
            $payload = $this->createPayload(/** @scrutinizer ignore-type */ $job, $data);
        }

        // Use getSeconds() when this class hierarchy provides it, secondsUntil() otherwise.
        $delay = method_exists($this, 'getSeconds')
            ? $this->getSeconds($delay)
            : $this->secondsUntil($delay);

        return $this->pushRaw($payload, $queue, ['delay' => $delay]);
    }
147
148
    /**
     * Pop the next job off of the queue.
     *
     * Waits for a message using the configured 'polling_wait_seconds'. A server
     * exception carrying the "no message" code means the queue is empty and
     * yields null; any other server exception is rethrown.
     *
     * @param string $queue
     *
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        try {
            $source = $this->getQueue($queue);
            $message = $source->receive_message($this->queueOptions['polling_wait_seconds']);
        } catch (CMQServerException $exception) {
            if ($exception->getCode() != self::CMQ_QUEUE_NO_MESSAGE_CODE) {
                throw $exception;
            }

            // Empty queue: nothing to hand out, but not a failure either.
            return null;
        }

        return new CMQJob($this->container, $this, $message, $source);
    }
170
171
    /**
     * Get the queue.
     *
     * @param string $queue queue name; an empty value selects the configured 'name'
     *
     * @return Driver\Queue
     */
    public function getQueue($queue = null)
    {
        $name = $queue ?: $this->queueOptions['name'];

        return $this->queueAccount->get_queue($name);
    }
182
183
    /**
     * Get the topic.
     *
     * @param string $topic topic name; an empty value selects the configured 'name'
     *
     * @return Driver\Topic
     */
    public function getTopic($topic = null)
    {
        if (!$topic) {
            $topic = $this->topicOptions['name'];
        }

        return $this->topicAccount->get_topic($topic);
    }
194
195
    /**
     * Parse name to topic or queue.
     *
     * With topics enabled, the configured topic name wins and $queue is only
     * the fallback. Otherwise $queue wins and the configured queue name is the
     * fallback.
     *
     * @param string $queue
     *
     * @return Driver\Queue|Driver\Topic
     */
    public function parseQueue($queue = null)
    {
        if (!$this->topicOptions['enable']) {
            return $this->getQueue($queue ?: $this->queueOptions['name']);
        }

        return $this->getTopic($this->topicOptions['name'] ?: $queue);
    }
214
}
215