Consumer   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 131
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
dl 0
loc 131
c 0
b 0
f 0
wmc 13
lcom 1
cbo 7
ccs 43
cts 43
cp 1
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 30 3
A getEventDispatcher() 0 4 1
A setNackRequeue() 0 4 1
A setCallback() 0 4 1
A get() 0 8 2
A ack() 0 4 1
A nack() 0 4 2
A consume() 0 4 1
A cancel() 0 4 1
1
<?php
2
3
namespace TreeHouse\Queue\Consumer;
4
5
use Symfony\Component\EventDispatcher\EventDispatcher;
6
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
7
use TreeHouse\Queue\Amqp\EnvelopeInterface;
8
use TreeHouse\Queue\Amqp\QueueInterface;
9
use TreeHouse\Queue\Event\ConsumeEvent;
10
use TreeHouse\Queue\Event\ConsumeExceptionEvent;
11
use TreeHouse\Queue\Processor\ProcessorInterface;
12
use TreeHouse\Queue\QueueEvents;
13
14
class Consumer implements ConsumerInterface
15
{
16
    /**
17
     * @var QueueInterface
18
     */
19
    protected $queue;
20
21
    /**
22
     * @var ProcessorInterface
23
     */
24
    protected $processor;
25
26
    /**
27
     * @var EventDispatcherInterface
28
     */
29
    protected $dispatcher;
30
31
    /**
32
     * @var callable
33
     */
34
    protected $callback;
35
36
    /**
37
     * @var bool
38
     */
39
    protected $nackRequeue = false;
40
41
    /**
     * Wires the consumer to its queue and processor and installs the default
     * message callback.
     *
     * For every delivered envelope the default callback:
     *  1. dispatches QueueEvents::CONSUME_MESSAGE with a fresh ConsumeEvent;
     *  2. hands the envelope to the processor;
     *  3. stores the processor's result on the event and dispatches
     *     QueueEvents::CONSUMED_MESSAGE;
     *  4. acks the envelope and returns the event's shouldContinueConsuming()
     *     value.
     *
     * If any of those steps throws an \Exception, QueueEvents::CONSUME_EXCEPTION
     * is dispatched with a ConsumeExceptionEvent, the envelope is nacked using
     * the current nack-requeue setting, and the exception is rethrown.
     *
     * @param QueueInterface                $queue      Queue that is read from and acked/nacked on
     * @param ProcessorInterface            $processor  Processes each consumed envelope
     * @param EventDispatcherInterface|null $dispatcher Optional dispatcher; a new EventDispatcher is created when omitted
     */
    public function __construct(QueueInterface $queue, ProcessorInterface $processor, EventDispatcherInterface $dispatcher = null)
    {
        $this->queue = $queue;
        $this->processor = $processor;
        $this->dispatcher = $dispatcher ?: new EventDispatcher();

        $this->setCallback(
            function (EnvelopeInterface $envelope) {
                try {
                    $event = new ConsumeEvent($envelope);
                    $this->dispatchEvent(QueueEvents::CONSUME_MESSAGE, $event);

                    $result = $this->processor->process($envelope);

                    $event->setResult($result);
                    $this->dispatchEvent(QueueEvents::CONSUMED_MESSAGE, $event);

                    // Only ack once processing and the "consumed" listeners succeeded.
                    $this->ack($envelope);

                    return $event->shouldContinueConsuming();
                } catch (\Exception $exception) {
                    $this->dispatchEvent(QueueEvents::CONSUME_EXCEPTION, new ConsumeExceptionEvent($envelope, $exception));

                    $this->nack($envelope, $this->nackRequeue);

                    // Let the caller decide what to do with the failure.
                    throw $exception;
                }
            }
        );
    }

    /**
     * Dispatches an event using the argument order the dispatcher expects.
     *
     * Dispatchers implementing the Symfony Contracts interface take
     * dispatch($event, $eventName); older dispatchers take
     * dispatch($eventName, $event). Static analysis flagged the legacy order
     * as a type mismatch (an event object passed where null|string is
     * expected), so the order is chosen at runtime to stay compatible with both.
     *
     * @param string $eventName Name of the event to dispatch
     * @param object $event     Event instance handed to the listeners
     *
     * @return mixed Whatever the underlying dispatcher returns
     */
    private function dispatchEvent($eventName, $event)
    {
        // instanceof never triggers autoloading, so this is safe when the
        // contracts package is not installed (the check is simply false).
        if ($this->dispatcher instanceof \Symfony\Contracts\EventDispatcher\EventDispatcherInterface) {
            return $this->dispatcher->dispatch($event, $eventName);
        }

        return $this->dispatcher->dispatch($eventName, $event);
    }
76
77
    /**
     * Returns the event dispatcher this consumer dispatches its events on.
     *
     * @inheritdoc
     */
    public function getEventDispatcher()
    {
        $dispatcher = $this->dispatcher;

        return $dispatcher;
    }
84
85
    /**
     * Stores the requeue setting that the default callback passes to nack()
     * when handling an envelope throws an exception.
     *
     * @inheritdoc
     */
    public function setNackRequeue($requeue)
    {
        // Stored as given; nack() only evaluates its truthiness.
        $this->nackRequeue = $requeue;
    }
92
93
    /**
     * Replaces the callback that consume() hands to the queue.
     *
     * The constructor installs a default callback; calling this method
     * overrides it.
     *
     * @param callable $callback Callback passed to the queue when consuming
     */
    public function setCallback(callable $callback)
    {
        $this->callback = $callback;
    }
100
101
    /**
     * Fetches from the queue, translating the queue's `false` result into null.
     *
     * Any other value returned by the queue is passed through unchanged
     * (presumably an EnvelopeInterface; confirm against QueueInterface::get()).
     *
     * @inheritdoc
     */
    public function get()
    {
        $envelope = $this->queue->get();

        // Strict comparison: only a literal false means "nothing fetched".
        return false === $envelope ? null : $envelope;
    }
112
113
    /**
     * Acknowledges the given envelope on the queue by its delivery tag.
     *
     * @inheritdoc
     */
    public function ack(EnvelopeInterface $envelope)
    {
        $deliveryTag = $envelope->getDeliveryTag();

        $this->queue->ack($deliveryTag);
    }
120
121
    /**
     * Negatively acknowledges the given envelope on the queue by its delivery tag.
     *
     * When $requeue is truthy the QueueInterface::REQUEUE flag is passed along;
     * otherwise null is passed as the flags argument.
     *
     * @inheritdoc
     */
    public function nack(EnvelopeInterface $envelope, $requeue = false)
    {
        $deliveryTag = $envelope->getDeliveryTag();

        $flags = null;
        if ($requeue) {
            $flags = QueueInterface::REQUEUE;
        }

        $this->queue->nack($deliveryTag, $flags);
    }
128
129
    /**
     * Starts consuming by handing the currently configured callback to the
     * queue, together with the given flags and consumer tag.
     *
     * Note that the queue receives its arguments in the order
     * (callback, flags, consumer tag), which differs from this method's
     * own parameter order.
     *
     * @inheritdoc
     */
    public function consume($consumerTag = null, $flags = QueueInterface::NOPARAM)
    {
        $this->queue->consume(
            $this->callback,
            $flags,
            $consumerTag
        );
    }
136
137
    /**
     * Forwards a cancel request for the given consumer tag to the queue.
     *
     * @param string $consumerTag Tag of the consumer to cancel; defaults to an empty string
     */
    public function cancel($consumerTag = '')
    {
        $queue = $this->queue;

        $queue->cancel($consumerTag);
    }
144
}
145