Queue   A
last analyzed

Complexity

Total Complexity 26

Size/Duplication

Total Lines 324
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 100%

Importance

Changes 3
Bugs 1 Features 0
Metric Value
wmc 26
c 3
b 1
f 0
lcom 1
cbo 5
dl 0
loc 324
ccs 79
cts 79
cp 1
rs 10

18 Methods

Rating   Name   Duplication   Size   Complexity  
A declareQueue() 0 14 1
A bind() 0 10 2
A unbind() 0 10 2
A ack() 0 6 1
A nack() 0 6 1
A reject() 0 6 1
A purge() 0 6 1
A cancel() 0 6 1
A delete() 0 6 1
A get() 0 10 2
A getOptions() 0 4 1
A setOptions() 0 8 2
A getMessageMapper() 0 7 2
A setMessageMapper() 0 5 1
B consume() 0 28 4
A getChannel() 0 4 1
A setChannel() 0 5 1
A getConnection() 0 4 1
1
<?php
2
3
namespace AMQPAL\Adapter\PhpAmqpLib;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
use AMQPAL\Adapter\ChannelInterface;
7
use AMQPAL\Adapter\ConnectionInterface;
8
use AMQPAL\Adapter\ConsumerInterface;
9
use AMQPAL\Adapter\Exception;
10
use AMQPAL\Adapter\Message;
11
use AMQPAL\Adapter\QueueInterface;
12
use AMQPAL\Options;
13
use AMQPAL\Exception as BaseException;
14
15
/**
16
 * Class Queue
17
 *
18
 * @package AMQPAL\Adapter\PhpAmqpLib
19
 */
20
class Queue implements QueueInterface
21
{
22
    /**
23
     * @var Channel
24
     */
25
    protected $channel;
26
    /**
27
     * @var Options\QueueOptions
28
     */
29
    protected $options;
30
    /**
31
     * @var MessageMapper
32
     */
33
    protected $messageMapper;
34
35
    /**
     * Declare this queue on the broker using the configured queue options.
     *
     * The queue name, the passive / durable / exclusive / auto-delete flags
     * and the extra arguments are all read from the options object. The sixth
     * argument handed to queue_declare() is always false.
     *
     * @return $this
     */
    public function declareQueue()
    {
        $resource = $this->channel->getResource();
        $queueOptions = $this->options;

        $resource->queue_declare(
            $queueOptions->getName(),
            $queueOptions->isPassive(),
            $queueOptions->isDurable(),
            $queueOptions->isExclusive(),
            $queueOptions->isAutoDelete(),
            false,
            $queueOptions->getArguments()
        );

        return $this;
    }
54
55
    /**
     * Bind this queue to an exchange.
     *
     * A null routing key is sent to the broker as an empty string.
     *
     * @param string      $exchangeName Name of the exchange to bind to.
     * @param string|null $routingKey   Pattern or routing key to bind with.
     * @param bool        $noWait       Do not wait for a reply from the broker.
     * @param array       $arguments    Additional binding arguments.
     *
     * @return $this
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        $bindingKey = null === $routingKey ? '' : $routingKey;
        $queue = $this->options->getName();

        $this->channel->getResource()->queue_bind(
            $queue,
            $exchangeName,
            $bindingKey,
            $noWait,
            $arguments
        );

        return $this;
    }
75
76
    /**
     * Remove a binding between this queue and an exchange.
     *
     * A null routing key is sent to the broker as an empty string.
     *
     * @param string      $exchangeName Name of the exchange the queue is bound to.
     * @param string|null $routingKey   Routing key the binding was created with.
     * @param array       $arguments    Additional binding arguments.
     *
     * @return $this
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        $bindingKey = null === $routingKey ? '' : $routingKey;
        $queue = $this->options->getName();

        $this->channel->getResource()->queue_unbind(
            $queue,
            $exchangeName,
            $bindingKey,
            $arguments
        );

        return $this;
    }
97
98
    /**
     * Acknowledge the receipt of a message.
     *
     * @param string $deliveryTag Delivery tag of the message to acknowledge.
     * @param bool   $multiple    Also acknowledge all previous unacked messages.
     *
     * @return $this
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $resource = $this->channel->getResource();
        $resource->basic_ack($deliveryTag, $multiple);

        return $this;
    }
114
115
    /**
     * Mark a message as explicitly not acknowledged.
     *
     * This may only be called for messages that have not been acknowledged
     * yet. Negative acknowledgement is a RabbitMQ protocol extension; the
     * behaviour against other brokers is undefined.
     *
     * @param string $deliveryTag Delivery tag of the last message to reject.
     * @param bool   $requeue     Requeue the message(s).
     * @param bool   $multiple    Also mark all previous unacked messages as
     *                            not acknowledged.
     *
     * @return $this
     */
    public function nack($deliveryTag, $requeue = false, $multiple = false)
    {
        $resource = $this->channel->getResource();

        // The underlying call takes ($deliveryTag, $multiple, $requeue), so the
        // last two arguments are deliberately swapped relative to this method.
        $resource->basic_nack($deliveryTag, $multiple, $requeue);

        return $this;
    }
139
140
    /**
     * Reject a single message.
     *
     * This may only be called for a message that has not been acknowledged
     * yet.
     *
     * @param string $deliveryTag Delivery tag of the message to reject.
     * @param bool   $requeue     Requeue the message.
     *
     * @return $this
     */
    public function reject($deliveryTag, $requeue = false)
    {
        $resource = $this->channel->getResource();
        $resource->basic_reject($deliveryTag, $requeue);

        return $this;
    }
158
159
    /**
     * Purge the contents of the queue named in the options.
     *
     * @return $this
     */
    public function purge()
    {
        $resource = $this->channel->getResource();
        $resource->queue_purge($this->options->getName());

        return $this;
    }
170
171
    /**
     * Cancel a consumer on the channel.
     *
     * @param string $consumerTag Tag identifying the consumer to cancel; it is
     *                            passed unchanged to basic_cancel().
     *
     * @return $this
     */
    public function cancel($consumerTag = '')
    {
        $resource = $this->channel->getResource();
        $resource->basic_cancel($consumerTag);

        return $this;
    }
186
187
    /**
     * Delete the queue named in the options from the broker.
     *
     * This includes its entire contents of unread or unacknowledged messages.
     *
     * @param bool $ifUnused Only delete the queue when no clients are
     *                       connected to it.
     * @param bool $ifEmpty  Only delete the queue when it is empty.
     * @param bool $noWait   Do not wait for a reply from the broker.
     *
     * @return $this
     */
    public function delete($ifUnused = false, $ifEmpty = false, $noWait = false)
    {
        $resource = $this->channel->getResource();
        $resource->queue_delete(
            $this->options->getName(),
            $ifUnused,
            $ifEmpty,
            $noWait
        );

        return $this;
    }
210
211
    /**
     * Retrieve the next message from the queue, if there is one.
     *
     * The raw library message is converted through the message mapper before
     * it is returned.
     *
     * @param bool $autoAck Forwarded as the second argument of basic_get().
     *
     * @return null|Message Null when basic_get() yields no message.
     */
    public function get($autoAck = false)
    {
        $resource = $this->channel->getResource();

        /** @var AMQPMessage|null $rawMessage */
        $rawMessage = $resource->basic_get($this->getOptions()->getName(), $autoAck);

        return $rawMessage
            ? $this->getMessageMapper()->toMessage($rawMessage)
            : null;
    }
228
229
    /**
     * Get the queue options currently assigned to this queue.
     *
     * @return Options\QueueOptions
     */
    public function getOptions()
    {
        $current = $this->options;

        return $current;
    }
236
237
    /**
     * Assign the queue options.
     *
     * Anything that is not already a QueueOptions instance is handed to the
     * QueueOptions constructor.
     *
     * @param Options\QueueOptions|\Traversable|array $options
     *
     * @return $this
     * @throws BaseException\BadMethodCallException   presumably raised by the
     *                                                QueueOptions constructor — confirm
     * @throws BaseException\InvalidArgumentException presumably raised by the
     *                                                QueueOptions constructor — confirm
     */
    public function setOptions($options)
    {
        $this->options = $options instanceof Options\QueueOptions
            ? $options
            : new Options\QueueOptions($options);

        return $this;
    }
251
252
    /**
     * Get the message mapper, creating a default one on first use.
     *
     * @return MessageMapper
     */
    public function getMessageMapper()
    {
        if ($this->messageMapper) {
            return $this->messageMapper;
        }

        // No mapper assigned yet: create one and keep it for later calls.
        $this->messageMapper = new MessageMapper();

        return $this->messageMapper;
    }
262
263
    /**
     * Replace the message mapper used to convert library messages.
     *
     * @param MessageMapper $messageMapper Mapper to use from now on.
     *
     * @return $this
     */
    public function setMessageMapper(MessageMapper $messageMapper)
    {
        $this->messageMapper = $messageMapper;

        return $this;
    }
272
273
    /**
     * Consume messages from the queue (blocking).
     *
     * Registers a consumer on the channel and then keeps waiting on the
     * channel for as long as it reports registered callbacks.
     *
     * @param callable|ConsumerInterface|null $callback    Callback to which each consumed
     *                                                     message is passed. When given it is
     *                                                     wrapped in a ConsumerCallback that
     *                                                     shares this queue's message mapper.
     * @param bool                            $noLocal
     * @param bool                            $autoAck
     * @param bool                            $exclusive
     * @param string|null                     $consumerTag A string describing this consumer, used
     *                                                     for canceling subscriptions with cancel().
     *                                                     Null is sent as an empty string.
     *
     * @return $this
     */
    public function consume(
        callable $callback = null,
        $noLocal = false,
        $autoAck = false,
        $exclusive = false,
        $consumerTag = null
    ) {
        $tag = null === $consumerTag ? '' : $consumerTag;
        $queueName = $this->getOptions()->getName();

        $handler = null;
        if ($callback) {
            $handler = new ConsumerCallback($callback, $this);
            $handler->setMessageMapper($this->getMessageMapper());
        }

        $this->channel->getResource()->basic_consume(
            $queueName,
            $tag,
            $noLocal,
            $autoAck,
            $exclusive,
            false,
            $handler
        );

        // Block here until the channel has no callbacks left. The resource is
        // fetched again on every pass rather than cached.
        while (count($this->channel->getResource()->callbacks)) {
            $this->channel->getResource()->wait();
        }

        return $this;
    }
313
314
    /**
     * Get the channel this queue operates on.
     *
     * @return ChannelInterface
     */
    public function getChannel()
    {
        $current = $this->channel;

        return $current;
    }
323
324
    /**
     * Assign the channel this queue operates on.
     *
     * @param Channel $channel Channel used for all broker calls of this queue.
     *
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }
333
334
    /**
     * Get the connection of the channel this queue operates on.
     *
     * @return ConnectionInterface
     */
    public function getConnection()
    {
        $channel = $this->channel;

        return $channel->getConnection();
    }
343
}
344