Completed
Push — master ( f55883...4285c1 )
by Thomas Mauro
13:12 queued 10:06
created

Queue::get()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 9
ccs 5
cts 5
cp 1
rs 9.6666
cc 3
eloc 5
nc 2
nop 1
crap 3
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPQueue;
6
use AMQPAL\Adapter\ConsumerInterface;
7
use AMQPAL\Adapter\Message;
8
use AMQPAL\Adapter\QueueInterface;
9
use AMQPAL\Options;
10
11
/**
12
 * Class Queue
13
 *
14
 * @package AMQPAL\Adapter\AMQP
15
 */
16
class Queue implements QueueInterface
17
{
18
    /**
19
     * @var Channel
20
     */
21
    protected $channel;
22
23
    /**
24
     * @var AMQPQueue
25
     */
26
    protected $resource;
27
28
    /**
29
     * @var Options\QueueOptions
30
     */
31
    protected $options;
32
33
    /**
34
     * @var MessageMapper
35
     */
36
    protected $messageMapper;
37
38
    /**
     * Return the options object currently held by this queue.
     *
     * @return Options\QueueOptions
     */
    public function getOptions()
    {
        return $this->options;
    }
45
46
    /**
     * Store a new options object and immediately apply it to the resource.
     *
     * @param Options\QueueOptions $options Options to keep and to configure the queue with.
     * @return $this
     */
    public function setOptions(Options\QueueOptions $options)
    {
        $this->options = $options;

        // Re-apply name, flags and arguments so the resource matches the new options.
        $this->configureQueue();

        return $this;
    }
56
57
    /**
     * Copy the current options onto the underlying queue resource.
     *
     * Builds the flag bitmask from the boolean option getters, then sets the
     * name, the flags and the arguments on the resource.
     *
     * @return $this
     */
    protected function configureQueue()
    {
        $options = $this->getOptions();
        $queue = $this->getResource();

        // Each entry pairs an AMQP flag with the option that switches it on.
        $flagStates = [
            [AMQP_DURABLE, $options->isDurable()],
            [AMQP_PASSIVE, $options->isPassive()],
            [AMQP_AUTODELETE, $options->isAutoDelete()],
            [AMQP_EXCLUSIVE, $options->isExclusive()],
            [AMQP_NOWAIT, $options->isNoWait()],
        ];

        $flags = AMQP_NOPARAM;
        foreach ($flagStates as $state) {
            if ($state[1]) {
                $flags |= $state[0];
            }
        }

        $queue->setName($options->getName());
        $queue->setFlags($flags);
        $queue->setArguments($options->getArguments());

        return $this;
    }
88
89
    /**
     * Return the underlying AMQPQueue object this adapter delegates to.
     *
     * @return AMQPQueue
     */
    public function getResource()
    {
        return $this->resource;
    }
96
97
    /**
     * Set the underlying AMQPQueue object this adapter delegates to.
     *
     * @param AMQPQueue $resource Queue resource to store.
     * @return $this
     */
    public function setResource(AMQPQueue $resource)
    {
        $this->resource = $resource;

        return $this;
    }
106
107
    /**
     * Declare the queue on the broker by delegating to the resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function declareQueue()
    {
        $queue = $this->getResource();
        $queue->declareQueue();

        return $this;
    }
120
121
    /**
     * Bind this queue to a routing key on an exchange.
     *
     * @param string $exchangeName Name of the exchange to bind to.
     * @param string $routingKey   Pattern or routing key to bind with.
     * @param bool   $noWait       No wait for a reply. Currently not forwarded
     *                             to the underlying resource.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        $queue = $this->getResource();
        $queue->bind($exchangeName, $routingKey, $arguments);

        return $this;
    }
139
140
    /**
     * Remove a routing key binding on an exchange from this queue.
     *
     * @param string $exchangeName Name of the exchange the queue is bound to.
     * @param string $routingKey   Routing key the binding was created with.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        $queue = $this->getResource();
        $queue->unbind($exchangeName, $routingKey, $arguments);

        return $this;
    }
159
160
    /**
     * Acknowledge the receipt of a message.
     *
     * @param string $deliveryTag Delivery tag of the message to acknowledge.
     * @param bool   $multiple    When truthy, AMQP_MULTIPLE is added to the
     *                            flags so previous unacked messages are
     *                            acknowledged as well.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $flags = $multiple ? (AMQP_NOPARAM | AMQP_MULTIPLE) : AMQP_NOPARAM;

        $this->getResource()->ack($deliveryTag, $flags);

        return $this;
    }
182
183
    /**
     * Mark a message as explicitly not acknowledged.
     *
     * Marks the message identified by the delivery tag as explicitly not
     * acknowledged. This can only be called on messages that have not yet
     * been acknowledged. When called, the broker will immediately put the
     * message back onto the queue instead of waiting until the connection is
     * closed. This method is only supported by the RabbitMQ broker; the
     * behavior against any other broker is undefined.
     *
     * @param string $deliveryTag Delivery tag of the last message to reject.
     * @param bool   $requeue     When truthy, AMQP_REQUEUE is added to the flags.
     * @param bool   $multiple    When truthy, AMQP_MULTIPLE is added to the
     *                            flags so previous unacked messages are
     *                            covered as well.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function nack($deliveryTag, $requeue = false, $multiple = false)
    {
        // OR-ing AMQP_NOPARAM for a disabled switch leaves the mask unchanged.
        $flags = AMQP_NOPARAM
            | ($requeue ? AMQP_REQUEUE : AMQP_NOPARAM)
            | ($multiple ? AMQP_MULTIPLE : AMQP_NOPARAM);

        $this->getResource()->nack($deliveryTag, $flags);

        return $this;
    }
216
217
    /**
     * Mark one message as explicitly not acknowledged.
     *
     * Marks the message identified by the delivery tag as explicitly not
     * acknowledged. This can only be called on messages that have not yet
     * been acknowledged.
     *
     * @param string $deliveryTag Delivery tag of the message to reject.
     * @param bool   $requeue     When truthy, AMQP_REQUEUE is added to the flags.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function reject($deliveryTag, $requeue = false)
    {
        $flags = $requeue ? (AMQP_NOPARAM | AMQP_REQUEUE) : AMQP_NOPARAM;

        $this->getResource()->reject($deliveryTag, $flags);

        return $this;
    }
241
242
    /**
     * Purge the contents of the queue by delegating to the resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function purge()
    {
        $queue = $this->getResource();
        $queue->purge();

        return $this;
    }
255
256
    /**
     * Cancel a consumer on this queue.
     *
     * @param string $consumerTag Consumer tag passed on to the resource's
     *                            cancel() call. A null value is replaced by
     *                            an empty string before delegating.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function cancel($consumerTag = '')
    {
        $tag = (null === $consumerTag) ? '' : $consumerTag;

        $this->getResource()->cancel($tag);

        return $this;
    }
276
277
    /**
     * Delete the queue from the broker.
     *
     * This includes its entire contents of unread or unacknowledged messages.
     *
     * @param bool $ifUnused Optionally $ifUnused can be specified to indicate
     *                       the queue should not be deleted until no clients
     *                       are connected to it.
     * @param bool $ifEmpty  Optionally $ifEmpty can be specified to indicate
     *                       the queue should not be deleted until it is empty.
     * @param bool $noWait   No wait for a reply.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function delete($ifUnused = false, $ifEmpty = false, $noWait = false)
    {
        // Each entry pairs an AMQP flag with the argument that switches it on.
        $flagStates = [
            [AMQP_IFUNUSED, $ifUnused],
            [AMQP_IFEMPTY, $ifEmpty],
            [AMQP_NOWAIT, $noWait],
        ];

        $flags = AMQP_NOPARAM;
        foreach ($flagStates as $state) {
            if ($state[1]) {
                $flags |= $state[0];
            }
        }

        $this->getResource()->delete($flags);

        return $this;
    }
311
312
    /**
     * Retrieve the next message from the queue.
     *
     * Fetches a single envelope from the underlying resource and converts it
     * with the message mapper. Any result that is not an AMQPEnvelope
     * instance is treated as "no message available".
     *
     * @param bool $autoAck When truthy, AMQP_AUTOACK is passed to the
     *                      resource; otherwise AMQP_NOPARAM is used.
     * @return null|Message The mapped message, or null when the resource did
     *                      not return an envelope.
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function get($autoAck = false)
    {
        $envelope = $this->getResource()->get($autoAck ? AMQP_AUTOACK : AMQP_NOPARAM);

        // Static analysis reported that the resource may hand back a boolean
        // while the mapper only accepts an AMQPEnvelope, so check the type
        // explicitly instead of relying on truthiness.
        if (!($envelope instanceof \AMQPEnvelope)) {
            return null;
        }

        return $this->getMessageMapper()->toMessage($envelope);
    }
329
330
    /**
     * Return the message mapper, creating a default one on first use.
     *
     * @return MessageMapper
     */
    public function getMessageMapper()
    {
        if ($this->messageMapper) {
            return $this->messageMapper;
        }

        // Nothing was injected yet: fall back to a fresh MessageMapper and keep it.
        $this->messageMapper = new MessageMapper();

        return $this->messageMapper;
    }
340
341
    /**
     * Set the mapper used to convert envelopes into messages.
     *
     * @param MessageMapper $messageMapper Mapper instance to store.
     * @return $this
     */
    public function setMessageMapper(MessageMapper $messageMapper)
    {
        $this->messageMapper = $messageMapper;

        return $this;
    }
350
351
    /**
     * Consume messages from the queue.
     *
     * When a callback is given it is wrapped in a ConsumerCallback that shares
     * this queue's message mapper; otherwise null is handed to the resource.
     *
     * @param string                          $consumerTag  A string describing this consumer. Used
     *                                                      for canceling subscriptions with cancel().
     * @param bool                            $noLocal      When truthy, AMQP_NOLOCAL is added to the flags.
     * @param bool                            $autoAck      When truthy, AMQP_AUTOACK is added to the flags.
     * @param bool                            $exclusive    When truthy, AMQP_EXCLUSIVE is added to the flags.
     * @param bool                            $nowait       No wait for a reply.
     * @param callback|ConsumerInterface|null $callback     A callback function to which the
     *                                                      consumed message will be passed.
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function consume(
        $consumerTag = null,
        $noLocal = false,
        $autoAck = false,
        $exclusive = false,
        $nowait = false,
        callable $callback = null
    ) {
        $wrappedCallback = null;
        if ($callback) {
            $wrappedCallback = new ConsumerCallback($callback, $this);
            $wrappedCallback->setMessageMapper($this->getMessageMapper());
        }

        // Each entry pairs an AMQP flag with the argument that switches it on.
        $flagStates = [
            [AMQP_NOLOCAL, $noLocal],
            [AMQP_AUTOACK, $autoAck],
            [AMQP_EXCLUSIVE, $exclusive],
            [AMQP_NOWAIT, $nowait],
        ];

        $flags = AMQP_NOPARAM;
        foreach ($flagStates as $state) {
            if ($state[1]) {
                $flags |= $state[0];
            }
        }

        $this->getResource()->consume($wrappedCallback, $flags, $consumerTag);

        return $this;
    }
398
399
    /**
     * Return the Channel object this queue was given.
     *
     * @return Channel
     */
    public function getChannel()
    {
        return $this->channel;
    }
408
409
    /**
     * Set the Channel object this queue belongs to.
     *
     * @param Channel $channel Channel instance to store.
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }
418
419
    /**
     * Return the Connection object of the stored channel.
     *
     * NOTE(review): the channel property is dereferenced without a check, so
     * this presumably requires setChannel() to have been called first.
     *
     * @return Connection
     */
    public function getConnection()
    {
        $channel = $this->channel;

        return $channel->getConnection();
    }
428
}
429