Completed
Push — master ( eadff5...a4f238 )
by frey
06:31
created

CMQQueue::push()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
cc 3
eloc 8
nc 3
nop 3
dl 0
loc 14
ccs 7
cts 9
cp 0.7778
crap 3.0987
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue;
4
5
use Freyo\LaravelQueueCMQ\Queue\Driver\Account;
6
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException;
7
use Freyo\LaravelQueueCMQ\Queue\Driver\Message;
8
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic;
9
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob;
10
use Illuminate\Contracts\Queue\Queue as QueueContract;
11
use Illuminate\Queue\Queue;
12
use Illuminate\Support\Arr;
13
14
class CMQQueue extends Queue implements QueueContract
15
{
16
    /**
     * Code of the CMQServerException that pop() treats as "no message"
     * and answers with null instead of rethrowing.
     */
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;

    /**
     * Values of the topic "filter" option recognised by pushRaw():
     * with 'msgtag' the queue name is split on commas into a tag list,
     * with 'routing' the queue name is passed on as the routing key.
     */
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';

    /**
     * Queue options, taken from $config['options']['queue'].
     *
     * @var array
     */
    protected $queueOptions;

    /**
     * Topic options, taken from $config['options']['topic'].
     *
     * @var array
     */
    protected $topicOptions;

    /**
     * Account used by getQueue() to resolve queues.
     *
     * @var Account
     */
    private $queueAccount;

    /**
     * Account used by getTopic() to resolve topics.
     *
     * @var Account
     */
    private $topicAccount;

    /**
     * Plain-mode options, taken from $config['plain'] (empty array when absent).
     *
     * @var array
     */
    protected $plainOptions;
37
38
    /**
     * Build the connection from its two accounts and its configuration.
     *
     * @param Account $queueAccount Account used to resolve queues.
     * @param Account $topicAccount Account used to resolve topics.
     * @param array   $config       Must contain "options.queue" and "options.topic";
     *                              the "plain" section is optional.
     */
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
    {
        // The plain section is the only optional one, hence the default.
        $this->plainOptions = Arr::get($config, 'plain', []);

        // Queue and topic sections are read directly: both are required.
        $this->queueOptions = $config['options']['queue'];
        $this->topicOptions = $config['options']['topic'];

        $this->queueAccount = $queueAccount;
        $this->topicAccount = $topicAccount;
    }
48
49
    /**
     * Tell whether plain mode is switched on.
     *
     * Reads the "enable" entry of the plain options; a missing entry
     * counts as false. The stored value is returned as is, without a cast.
     *
     * @return bool
     */
    public function isPlain()
    {
        $enabled = Arr::get($this->plainOptions, 'enable', false);

        return $enabled;
    }
56
57
    /**
     * Fetch the "job" entry of the plain options.
     *
     * @return string|null Null when no "job" entry is configured.
     */
    public function getPlainJob()
    {
        $job = Arr::get($this->plainOptions, 'job');

        return $job;
    }
64
65
    /**
     * Report how many active messages the queue holds.
     *
     * @param string $queue Queue name; the configured default is used when empty.
     *
     * @return int The "activeMsgNum" attribute of the queue, cast to int.
     */
    public function size($queue = null)
    {
        return (int) $this->getQueue($queue)->get_attributes()->activeMsgNum;
    }
78
79
    /**
     * Queue a job for immediate delivery.
     *
     * @param string|object $job   In plain mode this must expose getPayload().
     * @param mixed         $data
     * @param string        $queue
     *
     * @return mixed Whatever pushRaw() returns.
     */
    public function push($job, $data = '', $queue = null)
    {
        // Plain mode: the job supplies its own payload, nothing is built here.
        if ($this->isPlain()) {
            return $this->pushRaw($job->getPayload(), $queue);
        }

        // createPayload() takes ($job, $queue, $data) on version >= 5.7
        // and ($job, $data) before that, so look at its arity first.
        $parameterCount = (new \ReflectionMethod($this, 'createPayload'))->getNumberOfParameters();

        $payload = $parameterCount === 3
            ? $this->createPayload($job, $queue, $data)
            : $this->createPayload($job, $data);

        return $this->pushRaw($payload, $queue);
    }
103
104
    /**
     * Push a raw payload onto the queue, or publish it to the topic.
     *
     * When parseQueue() resolves to a Topic the payload is published; the
     * queue name then doubles as the tag list (filter "msgtag", split on
     * commas) or as the routing key (filter "routing"). Otherwise the
     * payload is sent to the resolved queue.
     *
     * @param string $payload
     * @param string $queue
     * @param array  $options Only "delay" is read (defaults to 0) and only
     *                        when sending to a queue.
     *
     * @return mixed Result of publish_message() or send_message().
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = new Message($payload);

        $driver = $this->parseQueue($queue);

        if ($driver instanceof Topic) {
            $vTagList = [];
            if ($this->topicOptions['filter'] === self::CMQ_TOPIC_TAG_FILTER_NAME) {
                $vTagList = explode(',', $queue);
            }

            $routingKey = null;
            if ($this->topicOptions['filter'] === self::CMQ_TOPIC_ROUTING_FILTER_NAME) {
                $routingKey = $queue;
            }

            return $driver->publish_message($message->msgBody, $vTagList, $routingKey);
        }

        // Arr::get() replaces the deprecated array_get() helper.
        return $driver->send_message($message, Arr::get($options, 'delay', 0));
    }
135
136
    /**
     * Push a new job onto the queue after a delay.
     *
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param string|object                        $job   In plain mode this must expose getPayload().
     * @param mixed                                $data
     * @param string                               $queue
     *
     * @return mixed Whatever pushRaw() returns.
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        if ($this->isPlain()) {
            $payload = $job->getPayload();
        } else {
            // Same signature check as push(): on version >= 5.7
            // createPayload() takes ($job, $queue, $data). Calling it with
            // ($job, $data) there would pass the data as the queue name
            // and drop the data itself.
            $reflection = new \ReflectionMethod($this, 'createPayload');
            $payload = $reflection->getNumberOfParameters() === 3
                ? $this->createPayload($job, $queue, $data)
                : $this->createPayload($job, $data);
        }

        // Use whichever delay helper this class provides.
        $delay = method_exists($this, 'getSeconds')
            ? $this->getSeconds($delay)
            : $this->secondsUntil($delay);

        return $this->pushRaw($payload, $queue, ['delay' => $delay]);
    }
156
157
    /**
     * Receive the next message and wrap it in a job.
     *
     * Waits up to the configured "polling_wait_seconds" for a message.
     *
     * @param string $queue Queue name; the configured default is used when empty.
     *
     * @return \Illuminate\Contracts\Queue\Job|null Null when the server reports "no message".
     *
     * @throws CMQServerException For every server error other than "no message".
     */
    public function pop($queue = null)
    {
        try {
            $driver = $this->getQueue($queue);
            $message = $driver->receive_message($this->queueOptions['polling_wait_seconds']);
        } catch (CMQServerException $exception) {
            // Anything but the "no message" code is a genuine failure.
            if ($exception->getCode() != self::CMQ_QUEUE_NO_MESSAGE_CODE) {
                throw $exception;
            }

            return null;
        }

        return new CMQJob($this->container, $this, $message, $driver);
    }
179
180
    /**
     * Resolve a queue through the queue account.
     *
     * @param string $queue Queue name; when empty the configured "name"
     *                      queue option is used instead.
     *
     * @return Driver\Queue
     */
    public function getQueue($queue = null)
    {
        if (!$queue) {
            $queue = $this->queueOptions['name'];
        }

        return $this->queueAccount->get_queue($queue);
    }
191
192
    /**
     * Resolve a topic through the topic account.
     *
     * @param string $topic Topic name; when empty the configured "name"
     *                      topic option is used instead.
     *
     * @return Driver\Topic
     */
    public function getTopic($topic = null)
    {
        if (!$topic) {
            $topic = $this->topicOptions['name'];
        }

        return $this->topicAccount->get_topic($topic);
    }
203
204
    /**
     * Turn a name into the driver that should receive the message.
     *
     * With the topic "enable" option on, the configured topic name wins and
     * the given name is only a fallback; otherwise the given name wins and
     * the configured queue name is the fallback.
     *
     * @param string $queue
     *
     * @return Driver\Queue|Driver\Topic
     */
    public function parseQueue($queue = null)
    {
        if (!$this->topicOptions['enable']) {
            return $this->getQueue($queue ?: $this->queueOptions['name']);
        }

        return $this->getTopic($this->topicOptions['name'] ?: $queue);
    }
223
}
224