Queue   B
last analyzed

Complexity

Total Complexity 41

Size/Duplication

Total Lines 410
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 4

Test Coverage

Coverage 100%

Importance

Changes 3
Bugs 1 Features 0
Metric Value
wmc 41
c 3
b 1
f 0
lcom 2
cbo 4
dl 0
loc 410
ccs 109
cts 109
cp 1
rs 8.2769

21 Methods

Rating   Name   Duplication   Size   Complexity  
A getOptions() 0 4 1
A setOptions() 0 9 2
B configureQueue() 0 25 5
A getResource() 0 4 1
A setResource() 0 5 1
A declareQueue() 0 6 1
A bind() 0 6 1
A unbind() 0 6 1
A ack() 0 10 2
A nack() 0 13 3
A reject() 0 10 2
A purge() 0 6 1
A cancel() 0 9 2
A delete() 0 16 4
A get() 0 9 3
A getMessageMapper() 0 7 2
A setMessageMapper() 0 5 1
B consume() 0 28 5
A getChannel() 0 4 1
A setChannel() 0 5 1
A getConnection() 0 4 1

How to fix   Complexity   

Complex Class

Complex classes like Queue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Queue, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace AMQPAL\Adapter\AMQP;
4
5
use AMQPQueue;
6
use AMQPAL\Adapter\ConsumerInterface;
7
use AMQPAL\Adapter\Message;
8
use AMQPAL\Adapter\QueueInterface;
9
use AMQPAL\Options;
10
use AMQPAL\Exception as BaseException;
11
12
/**
13
 * Class Queue
14
 *
15
 * @package AMQPAL\Adapter\AMQP
16
 */
17
class Queue implements QueueInterface
18
{
19
    /**
20
     * @var Channel
21
     */
22
    protected $channel;
23
24
    /**
25
     * @var AMQPQueue
26
     */
27
    protected $resource;
28
29
    /**
30
     * @var Options\QueueOptions
31
     */
32
    protected $options;
33
34
    /**
35
     * @var MessageMapper
36
     */
37
    protected $messageMapper;
38
39
    /**
     * Return the options currently stored on this queue.
     *
     * @return Options\QueueOptions The value last stored by setOptions().
     */
    public function getOptions()
    {
        return $this->options;
    }
46
47
    /**
     * Store the queue options and apply them to the underlying resource.
     *
     * Anything that is not already a QueueOptions instance is passed to the
     * QueueOptions constructor first. The resource is then reconfigured via
     * configureQueue().
     *
     * @param Options\QueueOptions|\Traversable|array $options
     * @return $this
     * @throws BaseException\BadMethodCallException
     * @throws BaseException\InvalidArgumentException
     */
    public function setOptions($options)
    {
        $this->options = $options instanceof Options\QueueOptions
            ? $options
            : new Options\QueueOptions($options);

        $this->configureQueue();

        return $this;
    }
62
63
    /**
     * Push name, flags and arguments from the options onto the resource.
     *
     * The flag bitmask starts at AMQP_NOPARAM and gains AMQP_DURABLE,
     * AMQP_PASSIVE, AMQP_AUTODELETE and AMQP_EXCLUSIVE for each option that
     * is enabled.
     *
     * @return $this
     */
    protected function configureQueue()
    {
        $queueOptions = $this->getOptions();
        $resource = $this->getResource();

        // OR-ing AMQP_NOPARAM into a mask that started as AMQP_NOPARAM is a no-op.
        $bitmask = AMQP_NOPARAM;
        $bitmask |= ($queueOptions->isDurable() ? AMQP_DURABLE : AMQP_NOPARAM);
        $bitmask |= ($queueOptions->isPassive() ? AMQP_PASSIVE : AMQP_NOPARAM);
        $bitmask |= ($queueOptions->isAutoDelete() ? AMQP_AUTODELETE : AMQP_NOPARAM);
        $bitmask |= ($queueOptions->isExclusive() ? AMQP_EXCLUSIVE : AMQP_NOPARAM);

        $resource->setName($queueOptions->getName());
        $resource->setFlags($bitmask);
        $resource->setArguments($queueOptions->getArguments());

        return $this;
    }
91
92
    /**
     * Return the wrapped AMQPQueue instance.
     *
     * @return AMQPQueue The value last stored by setResource().
     */
    public function getResource()
    {
        return $this->resource;
    }
99
100
    /**
     * Store the AMQPQueue instance this adapter delegates to.
     *
     * Only the reference is stored; the resource is not reconfigured here.
     *
     * @param AMQPQueue $resource
     * @return $this
     */
    public function setResource(AMQPQueue $resource)
    {
        $this->resource = $resource;

        return $this;
    }
109
110
    /**
     * Declare the queue on the broker by delegating to the resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function declareQueue()
    {
        $resource = $this->getResource();
        $resource->declareQueue();

        return $this;
    }
123
124
    /**
     * Bind the queue to a routing key on an exchange.
     *
     * @param string $exchangeName Name of the exchange to bind to.
     * @param string $routingKey   Pattern or routing key to bind with.
     * @param bool   $noWait       Not used by this adapter: the value is not
     *                             forwarded to the underlying resource.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function bind($exchangeName, $routingKey = null, $noWait = false, array $arguments = [])
    {
        $resource = $this->getResource();
        $resource->bind($exchangeName, $routingKey, $arguments);

        return $this;
    }
142
143
    /**
     * Remove a routing key binding between the queue and an exchange.
     *
     * @param string $exchangeName Name of the exchange the queue is bound to.
     * @param string $routingKey   Routing key used by the binding.
     * @param array  $arguments    Additional binding arguments.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function unbind($exchangeName, $routingKey = null, array $arguments = [])
    {
        $resource = $this->getResource();
        $resource->unbind($exchangeName, $routingKey, $arguments);

        return $this;
    }
162
163
    /**
     * Acknowledge the receipt of a message.
     *
     * @param string $deliveryTag Delivery tag of the message to acknowledge.
     * @param bool   $multiple    When true, AMQP_MULTIPLE is set so that all
     *                            previous unacked messages are acknowledged
     *                            as well.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $bitmask = AMQP_NOPARAM;
        $bitmask |= ($multiple ? AMQP_MULTIPLE : AMQP_NOPARAM);

        $this->getResource()->ack($deliveryTag, $bitmask);

        return $this;
    }
185
186
    /**
     * Mark a message as explicitly not acknowledged.
     *
     * May only be called for messages that have not yet been acknowledged.
     * The broker then puts the message back onto the queue immediately
     * instead of waiting for the connection to close. Only the RabbitMQ
     * broker supports this; behavior on any other broker is undefined.
     *
     * @param string $deliveryTag Delivery tag of the last message to reject.
     * @param bool   $requeue     When true, AMQP_REQUEUE is set to requeue
     *                            the message(s).
     * @param bool   $multiple    When true, AMQP_MULTIPLE is set so that all
     *                            previous unacked messages are marked as
     *                            well.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function nack($deliveryTag, $requeue = false, $multiple = false)
    {
        $bitmask = AMQP_NOPARAM;
        $bitmask |= ($requeue ? AMQP_REQUEUE : AMQP_NOPARAM);
        $bitmask |= ($multiple ? AMQP_MULTIPLE : AMQP_NOPARAM);

        $this->getResource()->nack($deliveryTag, $bitmask);

        return $this;
    }
219
220
    /**
     * Mark a single message as explicitly not acknowledged.
     *
     * May only be called for messages that have not yet been acknowledged.
     *
     * @param string $deliveryTag Delivery tag of the message to reject.
     * @param bool   $requeue     When true, AMQP_REQUEUE is set to requeue
     *                            the message.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function reject($deliveryTag, $requeue = false)
    {
        $bitmask = AMQP_NOPARAM;
        $bitmask |= ($requeue ? AMQP_REQUEUE : AMQP_NOPARAM);

        $this->getResource()->reject($deliveryTag, $bitmask);

        return $this;
    }
244
245
    /**
     * Purge the contents of the queue by delegating to the resource.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function purge()
    {
        $resource = $this->getResource();
        $resource->purge();

        return $this;
    }
258
259
    /**
     * Cancel a consumer subscription on the queue.
     *
     * @param string $consumerTag Tag passed through to the resource's
     *                            cancel() call. A null value is replaced by
     *                            an empty string before delegating.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function cancel($consumerTag = '')
    {
        $tag = (null === $consumerTag) ? '' : $consumerTag;

        $this->getResource()->cancel($tag);

        return $this;
    }
279
280
    /**
     * Delete the queue from the broker.
     *
     * This includes its entire contents of unread or unacknowledged messages.
     *
     * @param bool $ifUnused When true, AMQP_IFUNUSED is set: the queue should
     *                       not be deleted until no clients are connected to
     *                       it.
     * @param bool $ifEmpty  When true, AMQP_IFEMPTY is set: the queue should
     *                       not be deleted until it is empty.
     * @param bool $noWait   When true, AMQP_NOWAIT is set: do not wait for a
     *                       reply.
     *
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function delete($ifUnused = false, $ifEmpty = false, $noWait = false)
    {
        $bitmask = AMQP_NOPARAM;
        $bitmask |= ($ifUnused ? AMQP_IFUNUSED : AMQP_NOPARAM);
        $bitmask |= ($ifEmpty ? AMQP_IFEMPTY : AMQP_NOPARAM);
        $bitmask |= ($noWait ? AMQP_NOWAIT : AMQP_NOPARAM);

        $this->getResource()->delete($bitmask);

        return $this;
    }
314
315
    /**
     * Retrieve the next message from the queue.
     *
     * @param bool $autoAck When true the message is fetched with
     *                      AMQP_AUTOACK, otherwise with AMQP_NOPARAM.
     * @return null|Message The mapped message, or null when the resource did
     *                      not return an envelope.
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function get($autoAck = false)
    {
        $envelope = $this->getResource()->get($autoAck ? AMQP_AUTOACK : AMQP_NOPARAM);

        // The resource can hand back a non-envelope value (e.g. a boolean)
        // while the mapper only accepts an AMQPEnvelope, so check the type
        // explicitly instead of relying on truthiness.
        if (!$envelope instanceof \AMQPEnvelope) {
            return null;
        }

        return $this->getMessageMapper()->toMessage($envelope);
    }
332
333
    /**
     * Return the message mapper, creating a default one on first use.
     *
     * @return MessageMapper The mapper set via setMessageMapper(), or a new
     *                       MessageMapper that is stored for later calls.
     */
    public function getMessageMapper()
    {
        if ($this->messageMapper) {
            return $this->messageMapper;
        }

        $this->messageMapper = new MessageMapper();

        return $this->messageMapper;
    }
343
344
    /**
     * Store the mapper used to convert envelopes into messages.
     *
     * @param MessageMapper $messageMapper
     * @return $this
     */
    public function setMessageMapper(MessageMapper $messageMapper)
    {
        $this->messageMapper = $messageMapper;

        return $this;
    }
353
354
    /**
     * Consume messages from the queue (blocking function).
     *
     * A non-empty callback is wrapped in a ConsumerCallback that shares this
     * queue's message mapper; otherwise null is handed to the resource.
     *
     * @param callable|ConsumerInterface|null $callback    Callback to which each consumed
     *                                                     message will be passed.
     * @param bool                            $noLocal     When true, AMQP_NOLOCAL is set.
     * @param bool                            $autoAck     When true, AMQP_AUTOACK is set.
     * @param bool                            $exclusive   When true, AMQP_EXCLUSIVE is set.
     * @param string                          $consumerTag A string describing this consumer. Used
     *                                                     for canceling subscriptions with cancel().
     * @return $this
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     */
    public function consume(
        callable $callback = null,
        $noLocal = false,
        $autoAck = false,
        $exclusive = false,
        $consumerTag = null
    ) {
        $wrapped = null;
        if ($callback) {
            $wrapped = new ConsumerCallback($callback, $this);
            $wrapped->setMessageMapper($this->getMessageMapper());
        }

        $bitmask = AMQP_NOPARAM;
        $bitmask |= ($noLocal ? AMQP_NOLOCAL : AMQP_NOPARAM);
        $bitmask |= ($autoAck ? AMQP_AUTOACK : AMQP_NOPARAM);
        $bitmask |= ($exclusive ? AMQP_EXCLUSIVE : AMQP_NOPARAM);

        $this->getResource()->consume($wrapped, $bitmask, $consumerTag);

        return $this;
    }
396
397
    /**
     * Return the Channel object in use.
     *
     * @return Channel The value last stored by setChannel().
     */
    public function getChannel()
    {
        return $this->channel;
    }
406
407
    /**
     * Store the Channel object this queue belongs to.
     *
     * @param Channel $channel
     * @return $this
     */
    public function setChannel(Channel $channel)
    {
        $this->channel = $channel;

        return $this;
    }
416
417
    /**
     * Return the Connection object in use, as reported by the channel.
     *
     * Reads the stored channel directly, so a channel must have been set
     * via setChannel() beforehand.
     *
     * @return Connection
     */
    public function getConnection()
    {
        $channel = $this->channel;

        return $channel->getConnection();
    }
426
}
427