Completed
Push — master ( 3a5f88...05006a )
by Thomas Mauro
10:18 queued 07:14
created

Queue::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPQueue;
6
use AMQPAL\Adapter\ConsumerInterface;
7
use AMQPAL\Adapter\Message;
8
use AMQPAL\Adapter\QueueInterface;
9
use AMQPAL\Options;
10
11
/**
12
 * Class Queue
13
 *
14
 * @package AMQPAL\Adapter\AMQP
15
 */
16
class Queue implements QueueInterface
17
{
18
    /**
     * Channel this queue is attached to; assigned through setChannel().
     *
     * @var Channel
     */
    protected $channel;

    /**
     * Underlying AMQPQueue instance that the methods of this class delegate to.
     *
     * @var AMQPQueue
     */
    protected $resource;

    /**
     * Settings copied onto the resource by configureQueue().
     *
     * @var Options\QueueOptions
     */
    protected $options;

    /**
     * Mapper used to convert received messages; created lazily by
     * getMessageMapper() when none has been injected.
     *
     * @var MessageMapper
     */
    protected $messageMapper;
37
38
    /**
     * Return the options object currently assigned to this queue.
     *
     * @return Options\QueueOptions The value last stored by setOptions();
     *                              the property has no default of its own.
     */
    public function getOptions()
    {
        $currentOptions = $this->options;

        return $currentOptions;
    }
45
46
    /**
     * Store new queue options and immediately apply them to the resource.
     *
     * configureQueue() is invoked as part of this call and it dereferences
     * the object returned by getResource(), so a resource has to be set
     * before the options are.
     *
     * @param Options\QueueOptions $options Options to store and apply.
     * @return $this
     */
    public function setOptions(Options\QueueOptions $options)
    {
        $this->options = $options;

        // Keep the underlying AMQPQueue in sync with the freshly stored options.
        $this->configureQueue();

        return $this;
    }
56
57
    /**
     * Copy name, flags and arguments from the options onto the resource.
     *
     * Each boolean option that is enabled contributes its AMQP_* bit to the
     * flag mask; disabled options contribute nothing.
     *
     * @return $this
     */
    protected function configureQueue()
    {
        $settings  = $this->getOptions();
        $amqpQueue = $this->getResource();

        // Operands are evaluated left to right, so the option getters are
        // consulted in the order: durable, passive, auto-delete, exclusive,
        // no-wait.
        $flagMask = AMQP_NOPARAM
            | ($settings->isDurable() ? AMQP_DURABLE : AMQP_NOPARAM)
            | ($settings->isPassive() ? AMQP_PASSIVE : AMQP_NOPARAM)
            | ($settings->isAutoDelete() ? AMQP_AUTODELETE : AMQP_NOPARAM)
            | ($settings->isExclusive() ? AMQP_EXCLUSIVE : AMQP_NOPARAM)
            | ($settings->isNoWait() ? AMQP_NOWAIT : AMQP_NOPARAM);

        $queueName = $settings->getName();
        $amqpQueue->setName($queueName);

        $amqpQueue->setFlags($flagMask);

        $queueArguments = $settings->getArguments();
        $amqpQueue->setArguments($queueArguments);

        return $this;
    }
88
89
    /**
     * Return the underlying AMQPQueue object.
     *
     * @return AMQPQueue The value last stored by setResource(); the property
     *                   has no default of its own.
     */
    public function getResource()
    {
        $amqpQueue = $this->resource;

        return $amqpQueue;
    }
96
97
    /**
     * Inject the AMQPQueue object that this adapter delegates to.
     *
     * Only the reference is stored; options are not re-applied here.
     *
     * @param AMQPQueue $resource Queue object to wrap.
     * @return $this
     */
    public function setResource(AMQPQueue $resource)
    {
        $this->resource = $resource;

        return $this;
    }
106
107
    /**
     * Declare the queue on the broker by delegating to the resource.
     *
     * The value returned by the underlying declareQueue() call is discarded.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function declareQueue()
    {
        $amqpQueue = $this->getResource();
        $amqpQueue->declareQueue();

        return $this;
    }
120
121
    /**
     * Bind this queue to an exchange using the given routing key.
     *
     * @param string $exchangeName Name of the exchange to bind to.
     * @param string $routingKey   Pattern or routing key to bind with.
     * @param bool   $noWait       Accepted for signature compatibility only;
     *                             it is not forwarded to the resource.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        $amqpQueue = $this->getResource();
        $amqpQueue->bind($exchangeName, $routingKey, $arguments);

        return $this;
    }
139
140
    /**
     * Remove a binding between this queue and an exchange.
     *
     * @param string $exchangeName Name of the exchange the queue is bound to.
     * @param string $routingKey   Routing key the binding was created with.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        $amqpQueue = $this->getResource();
        $amqpQueue->unbind($exchangeName, $routingKey, $arguments);

        return $this;
    }
159
160
    /**
     * Acknowledge receipt of a message.
     *
     * @param string $deliveryTag Delivery tag of the message to acknowledge.
     * @param bool   $multiple    When truthy, AMQP_MULTIPLE is added to the
     *                            flags so previous unacked messages are
     *                            acknowledged as well.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $flagMask = $multiple ? (AMQP_NOPARAM | AMQP_MULTIPLE) : AMQP_NOPARAM;

        $amqpQueue = $this->getResource();
        $amqpQueue->ack($deliveryTag, $flagMask);

        return $this;
    }
182
183
    /**
     * Negatively acknowledge a message.
     *
     * Marks the message identified by the delivery tag as explicitly not
     * acknowledged. It can only be used on messages that have not been
     * acknowledged yet. This operation is only supported by the RabbitMQ
     * broker; behaviour against any other broker is undefined.
     *
     * @param string $deliveryTag Delivery tag of the last message to reject.
     * @param bool   $requeue     When truthy, AMQP_REQUEUE is added to the flags.
     * @param bool   $multiple    When truthy, AMQP_MULTIPLE is added to the
     *                            flags so previous unacked messages are
     *                            covered as well.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function nack($deliveryTag, $requeue = false, $multiple = false)
    {
        $flagMask = AMQP_NOPARAM;
        $flagMask |= $requeue ? AMQP_REQUEUE : AMQP_NOPARAM;
        $flagMask |= $multiple ? AMQP_MULTIPLE : AMQP_NOPARAM;

        $amqpQueue = $this->getResource();
        $amqpQueue->nack($deliveryTag, $flagMask);

        return $this;
    }
216
217
    /**
     * Reject a single message.
     *
     * Marks the message identified by the delivery tag as explicitly not
     * acknowledged. It can only be used on messages that have not been
     * acknowledged yet.
     *
     * @param string $deliveryTag Delivery tag of the message to reject.
     * @param bool   $requeue     When truthy, AMQP_REQUEUE is added to the flags.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function reject($deliveryTag, $requeue = false)
    {
        $flagMask = $requeue ? (AMQP_NOPARAM | AMQP_REQUEUE) : AMQP_NOPARAM;

        $amqpQueue = $this->getResource();
        $amqpQueue->reject($deliveryTag, $flagMask);

        return $this;
    }
241
242
    /**
     * Purge the contents of the queue by delegating to the resource.
     *
     * The value returned by the underlying purge() call is discarded.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function purge()
    {
        $amqpQueue = $this->getResource();
        $amqpQueue->purge();

        return $this;
    }
255
256
    /**
     * Cancel a consumer subscription on this queue.
     *
     * @param string $consumerTag Tag forwarded to the resource's cancel()
     *                            call. An explicit null is converted to an
     *                            empty string before being forwarded.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function cancel($consumerTag = '')
    {
        // Only a strict null is normalised; every other value passes through as given.
        $tag = (null === $consumerTag) ? '' : $consumerTag;

        $amqpQueue = $this->getResource();
        $amqpQueue->cancel($tag);

        return $this;
    }
276
277
    /**
     * Delete the queue from the broker.
     *
     * This includes its entire contents of unread or unacknowledged messages.
     *
     * @param bool $ifUnused When truthy, AMQP_IFUNUSED is added to the flags:
     *                       the queue should not be deleted until no clients
     *                       are connected to it.
     * @param bool $ifEmpty  When truthy, AMQP_IFEMPTY is added to the flags:
     *                       the queue should not be deleted until it is empty.
     * @param bool $noWait   When truthy, AMQP_NOWAIT is added to the flags:
     *                       do not wait for a reply.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function delete($ifUnused = false, $ifEmpty = false, $noWait = false)
    {
        $flagMask = AMQP_NOPARAM;
        $flagMask |= $ifUnused ? AMQP_IFUNUSED : AMQP_NOPARAM;
        $flagMask |= $ifEmpty ? AMQP_IFEMPTY : AMQP_NOPARAM;
        $flagMask |= $noWait ? AMQP_NOWAIT : AMQP_NOPARAM;

        $amqpQueue = $this->getResource();
        $amqpQueue->delete($flagMask);

        return $this;
    }
311
312
    /**
     * Retrieve the next message from the queue.
     *
     * @param bool $autoAck When truthy the message is fetched with
     *                      AMQP_AUTOACK, otherwise with AMQP_NOPARAM.
     * @return null|Message The mapped message, or null when the resource did
     *                      not hand back an envelope.
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function get($autoAck = false)
    {
        $flags = $autoAck ? AMQP_AUTOACK : AMQP_NOPARAM;
        $envelope = $this->getResource()->get($flags);

        // The resource may hand back a non-envelope value (static analysis
        // reports boolean) when there is nothing to fetch. The mapper only
        // accepts AMQPEnvelope objects, so anything else is reported as
        // "no message" rather than being passed on.
        if (!$envelope instanceof \AMQPEnvelope) {
            return null;
        }

        return $this->getMessageMapper()->toMessage($envelope);
    }
329
330
    /**
     * Return the message mapper, creating a default one on first use.
     *
     * @return MessageMapper The injected mapper, or a new MessageMapper that
     *                       is stored for subsequent calls.
     */
    public function getMessageMapper()
    {
        if ($this->messageMapper) {
            return $this->messageMapper;
        }

        // Nothing injected yet: fall back to a default mapper and remember it.
        $this->messageMapper = new MessageMapper();

        return $this->messageMapper;
    }
340
341
    /**
     * Inject the mapper used to convert received messages.
     *
     * @param MessageMapper $messageMapper Mapper to use from now on.
     * @return $this
     */
    public function setMessageMapper(MessageMapper $messageMapper)
    {
        $this->messageMapper = $messageMapper;

        return $this;
    }
350
351
    /**
     * Consume messages from the queue.
     *
     * A truthy callback is wrapped in a ConsumerCallback that shares this
     * queue's message mapper; otherwise null is handed to the resource.
     *
     * @param string        $consumerTag A string describing this consumer. Used
     *                                   for canceling subscriptions with cancel().
     * @param bool          $noLocal     When truthy, adds AMQP_NOLOCAL to the flags.
     * @param bool          $autoAck     When truthy, adds AMQP_AUTOACK to the flags.
     * @param bool          $exclusive   When truthy, adds AMQP_EXCLUSIVE to the flags.
     * @param bool          $nowait      When truthy, adds AMQP_NOWAIT to the flags
     *                                   (no wait for a reply).
     * @param callable|null $callback    Callback to which consumed messages will be
     *                                   passed. NOTE(review): the previous docblock
     *                                   also listed ConsumerInterface, but the
     *                                   parameter is type-hinted callable - confirm
     *                                   such objects are invokable.
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function consume(
        $consumerTag = null,
        $noLocal = false,
        $autoAck = false,
        $exclusive = false,
        $nowait = false,
        callable $callback = null
    ) {
        $wrappedCallback = null;
        if ($callback) {
            $wrappedCallback = new ConsumerCallback($callback, $this);
            $mapper = $this->getMessageMapper();
            $wrappedCallback->setMessageMapper($mapper);
        }

        $flagMask = AMQP_NOPARAM
            | ($noLocal ? AMQP_NOLOCAL : AMQP_NOPARAM)
            | ($autoAck ? AMQP_AUTOACK : AMQP_NOPARAM)
            | ($exclusive ? AMQP_EXCLUSIVE : AMQP_NOPARAM)
            | ($nowait ? AMQP_NOWAIT : AMQP_NOPARAM);

        $amqpQueue = $this->getResource();
        $amqpQueue->consume($wrappedCallback, $flagMask, $consumerTag);

        return $this;
    }
398
399
    /**
     * Return the channel this queue is attached to.
     *
     * @return Channel The value last stored by setChannel(); the property
     *                 has no default of its own.
     */
    public function getChannel()
    {
        $currentChannel = $this->channel;

        return $currentChannel;
    }
408
409
    /**
     * Attach this queue to a channel.
     *
     * @param Channel $channel Channel to associate with the queue.
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }
418
419
    /**
     * Return the connection of the channel this queue is attached to.
     *
     * The channel property is dereferenced directly, so a channel has to be
     * assigned via setChannel() before this is called.
     *
     * @return Connection
     */
    public function getConnection()
    {
        $currentChannel = $this->channel;

        return $currentChannel->getConnection();
    }
428
}
429