Completed
Push — master ( 3a5f88...05006a )
by Thomas Mauro
10:18 queued 07:14
created

Queue   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 317
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 93.33%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 24
c 2
b 0
f 0
lcom 1
cbo 5
dl 0
loc 317
ccs 70
cts 75
cp 0.9333
rs 10

18 Methods

Rating   Name   Duplication   Size   Complexity  
A declareQueue() 0 14 1
A bind() 0 10 2
A unbind() 0 10 2
A ack() 0 6 1
A nack() 0 6 1
A reject() 0 6 1
A purge() 0 6 1
A cancel() 0 6 1
A delete() 0 6 1
A getOptions() 0 4 1
A setOptions() 0 5 1
A getMessageMapper() 0 7 2
A setMessageMapper() 0 5 1
B consume() 0 25 3
A setChannel() 0 5 1
A get() 0 10 2
A getChannel() 0 4 1
A getConnection() 0 4 1
1
<?php
2
3
namespace AMQPAL\Adapter\PhpAmqpLib;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
use AMQPAL\Adapter\ChannelInterface;
7
use AMQPAL\Adapter\ConnectionInterface;
8
use AMQPAL\Adapter\ConsumerInterface;
9
use AMQPAL\Adapter\Exception;
10
use AMQPAL\Adapter\Message;
11
use AMQPAL\Adapter\QueueInterface;
12
use AMQPAL\Options;
13
14
/**
15
 * Class Queue
16
 *
17
 * @package AMQPAL\Adapter\PhpAmqpLib
18
 */
19
class Queue implements QueueInterface
20
{
21
    /**
22
     * @var Channel
23
     */
24
    protected $channel;
25
    /**
26
     * @var Options\QueueOptions
27
     */
28
    protected $options;
29
    /**
30
     * @var MessageMapper
31
     */
32
    protected $messageMapper;
33
34
    /**
     * Declare this queue on the broker from the configured queue options.
     *
     * The name, passive, durable, exclusive, auto-delete and no-wait flags
     * and the arguments are read from the options object and forwarded to
     * queue_declare() on the channel resource. Whatever queue_declare()
     * returns is discarded.
     *
     * @return $this
     */
    public function declareQueue()
    {
        $resource = $this->channel->getResource();
        $options = $this->options;

        $resource->queue_declare(
            $options->getName(),
            $options->isPassive(),
            $options->isDurable(),
            $options->isExclusive(),
            $options->isAutoDelete(),
            $options->isNoWait(),
            $options->getArguments()
        );

        return $this;
    }
53
54
    /**
     * Bind this queue (named by the queue options) to an exchange.
     *
     * @param string      $exchangeName Name of the exchange to bind to.
     * @param string|null $routingKey   Pattern or routing key to bind with;
     *                                  null is sent as an empty string.
     * @param bool        $noWait       Forwarded as the no-wait flag of queue_bind().
     * @param array       $arguments    Additional binding arguments.
     *
     * @return $this
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        // queue_bind() is always handed a concrete key: null collapses to ''.
        $key = (null === $routingKey) ? '' : $routingKey;

        $name = $this->options->getName();
        $resource = $this->channel->getResource();
        $resource->queue_bind($name, $exchangeName, $key, $noWait, $arguments);

        return $this;
    }
74
75
    /**
     * Remove a binding between this queue (named by the queue options) and
     * an exchange.
     *
     * @param string      $exchangeName Name of the exchange the queue is bound to.
     * @param string|null $routingKey   Routing key of the binding to remove;
     *                                  null is sent as an empty string.
     * @param array       $arguments    Additional binding arguments.
     *
     * @return $this
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        // queue_unbind() is always handed a concrete key: null collapses to ''.
        $key = (null === $routingKey) ? '' : $routingKey;

        $name = $this->options->getName();
        $resource = $this->channel->getResource();
        $resource->queue_unbind($name, $exchangeName, $key, $arguments);

        return $this;
    }
96
97
    /**
     * Acknowledge a message by forwarding to basic_ack() on the channel resource.
     *
     * @param string $deliveryTag Delivery tag of the message to acknowledge.
     * @param bool   $multiple    Forwarded as the "multiple" flag of basic_ack().
     *
     * @return $this
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $resource = $this->channel->getResource();
        $resource->basic_ack($deliveryTag, $multiple);

        return $this;
    }
113
114
    /**
     * Negatively acknowledge a message by forwarding to basic_nack() on the
     * channel resource.
     *
     * @param string $deliveryTag Delivery tag of the message to mark as not acknowledged.
     * @param bool   $requeue     Forwarded as the "requeue" flag of basic_nack().
     * @param bool   $multiple    Forwarded as the "multiple" flag of basic_nack().
     *
     * @return $this
     */
    public function nack($deliveryTag, $requeue = false, $multiple = false)
    {
        $resource = $this->channel->getResource();

        // basic_nack() is called with $multiple before $requeue, the reverse
        // of this method's own parameter order.
        $resource->basic_nack($deliveryTag, $multiple, $requeue);

        return $this;
    }
138
139
    /**
     * Reject a single message by forwarding to basic_reject() on the channel
     * resource.
     *
     * @param string $deliveryTag Delivery tag of the message to reject.
     * @param bool   $requeue     Forwarded as the "requeue" flag of basic_reject().
     *
     * @return $this
     */
    public function reject($deliveryTag, $requeue = false)
    {
        $resource = $this->channel->getResource();
        $resource->basic_reject($deliveryTag, $requeue);

        return $this;
    }
157
158
    /**
     * Purge the queue named by the queue options via queue_purge() on the
     * channel resource.
     *
     * @return $this
     */
    public function purge()
    {
        $resource = $this->channel->getResource();
        $resource->queue_purge($this->options->getName());

        return $this;
    }
169
170
    /**
     * Cancel a consumer by forwarding its tag to basic_cancel() on the
     * channel resource.
     *
     * @param string $consumerTag Tag of the consumer to cancel; an empty
     *                            string is passed when omitted.
     *
     * @return $this
     */
    public function cancel($consumerTag = '')
    {
        $resource = $this->channel->getResource();
        $resource->basic_cancel($consumerTag);

        return $this;
    }
185
186
    /**
     * Delete the queue named by the queue options via queue_delete() on the
     * channel resource.
     *
     * @param bool $ifUnused Forwarded as the "if unused" flag of queue_delete()
     *                       (presumably: delete only when the queue has no
     *                       consumers - confirm against the library).
     * @param bool $ifEmpty  Forwarded as the "if empty" flag of queue_delete()
     *                       (presumably: delete only when the queue holds no
     *                       messages - confirm against the library).
     * @param bool $noWait   Forwarded as the no-wait flag of queue_delete().
     *
     * @return $this
     * @throws Exception\InvalidArgumentException Declared by the original
     *         contract; nothing in this method raises it directly.
     */
    public function delete($ifUnused = false, $ifEmpty = false, $noWait = false)
    {
        $resource = $this->channel->getResource();
        $resource->queue_delete($this->options->getName(), $ifUnused, $ifEmpty, $noWait);

        return $this;
    }
209
210
    /**
     * Fetch the next message from the queue named by the queue options.
     *
     * Calls basic_get() on the channel resource and, when a message is
     * returned, converts it with the message mapper.
     *
     * @param bool $autoAck When true the message is fetched in no-ack mode,
     *                      i.e. the broker does not wait for an explicit ack().
     * @return null|Message Null when basic_get() yields no message.
     * @throws \OutOfBoundsException Declared by the original contract; nothing
     *         in this method raises it directly.
     */
    public function get($autoAck = false)
    {
        $queueName = $this->getOptions()->getName();

        // The second argument of basic_get() is "no_ack": true means the
        // broker expects no acknowledgement, which is exactly auto-ack. The
        // flag was previously negated, so autoAck=false consumed messages
        // without the possibility of a later ack()/nack().
        /** @var AMQPMessage|null $message */
        $message = $this->channel->getResource()->basic_get($queueName, $autoAck);
        if (!$message) {
            return null;
        }

        return $this->getMessageMapper()->toMessage($message);
    }
227
228
    /**
     * Return the queue options currently held by this queue.
     *
     * @return Options\QueueOptions
     */
    public function getOptions()
    {
        $options = $this->options;

        return $options;
    }
235
236
    /**
     * Replace the queue options used by every subsequent operation.
     *
     * @param Options\QueueOptions $options Options object to store.
     * @return $this
     */
    public function setOptions(Options\QueueOptions $options)
    {
        $this->options = $options;

        return $this;
    }
245
246
    /**
     * Return the message mapper, creating a default MessageMapper on first
     * use when none has been set.
     *
     * @return MessageMapper
     */
    public function getMessageMapper()
    {
        if ($this->messageMapper) {
            return $this->messageMapper;
        }

        // Nothing set yet: create the default mapper and remember it.
        $this->messageMapper = new MessageMapper();

        return $this->messageMapper;
    }
256
257
    /**
     * Set the mapper used to convert library messages for get() and consume().
     *
     * @param MessageMapper $messageMapper Mapper instance to store.
     * @return $this
     */
    public function setMessageMapper(MessageMapper $messageMapper)
    {
        $this->messageMapper = $messageMapper;

        return $this;
    }
266
267
    /**
     * Register a consumer on the queue named by the queue options.
     *
     * When a callback is given it is wrapped in a ConsumerCallback that
     * receives this queue and the current message mapper; the wrapper (or
     * null when no callback is given) is handed to basic_consume() on the
     * channel resource.
     *
     * @param string|null   $consumerTag Tag identifying this consumer, usable with
     *                                   cancel(); null is sent as an empty string.
     * @param bool          $noLocal     Forwarded as the no-local flag of basic_consume().
     * @param bool          $autoAck     When true the consumer runs in no-ack mode, i.e.
     *                                   the broker does not wait for an explicit ack().
     * @param bool          $exclusive   Forwarded as the exclusive flag of basic_consume().
     * @param bool          $nowait      Forwarded as the no-wait flag of basic_consume().
     * @param callable|null $callback    Callback to which consumed messages are passed
     *                                   through the ConsumerCallback wrapper.
     * @return $this
     */
    public function consume(
        $consumerTag = null,
        $noLocal = false,
        $autoAck = false,
        $exclusive = false,
        $nowait = false,
        callable $callback = null
    ) {
        if (null === $consumerTag) {
            $consumerTag = '';
        }

        $queue = $this->getOptions()->getName();

        $consumerCallback = null;
        if ($callback) {
            $consumerCallback = new ConsumerCallback($callback, $this);
            $consumerCallback->setMessageMapper($this->getMessageMapper());
        }

        // The fourth argument of basic_consume() is "no_ack": true means the
        // broker expects no acknowledgement, which is exactly auto-ack. The
        // flag was previously negated, so autoAck=false started a consumer
        // whose messages could never be ack()ed or nack()ed.
        $this->channel->getResource()
            ->basic_consume($queue, $consumerTag, $noLocal, $autoAck, $exclusive, $nowait, $consumerCallback);

        return $this;
    }
305
306
    /**
     * Return the channel this queue operates on.
     *
     * @return ChannelInterface
     */
    public function getChannel()
    {
        $channel = $this->channel;

        return $channel;
    }
315
316
    /**
     * Set the channel whose resource carries out every queue operation.
     *
     * @param Channel $channel Channel instance to store.
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }
325
326
    /**
     * Return the connection reported by the current channel.
     *
     * @return ConnectionInterface
     */
    public function getConnection()
    {
        $channel = $this->channel;

        return $channel->getConnection();
    }
335
}
336