Completed
Pull Request — master (#19)
by Peter
07:59
created

Consumer::__construct()   B

Complexity

Conditions 3
Paths 2

Size

Total Lines 30
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 30
ccs 19
cts 19
cp 1
rs 8.8571
cc 3
eloc 18
nc 2
nop 3
crap 3
1
<?php
2
3
namespace TreeHouse\Queue\Consumer;
4
5
use Symfony\Component\EventDispatcher\EventDispatcher;
6
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
7
use TreeHouse\Queue\Amqp\EnvelopeInterface;
8
use TreeHouse\Queue\Amqp\QueueInterface;
9
use TreeHouse\Queue\Event\ConsumeEvent;
10
use TreeHouse\Queue\Event\ConsumeExceptionEvent;
11
use TreeHouse\Queue\Processor\ProcessorInterface;
12
use TreeHouse\Queue\QueueEvents;
13
14
/**
 * Pulls messages from a queue and hands them to a processor.
 *
 * The default callback installed by the constructor dispatches events around
 * the processing step, acks the message when processing completes, and nacks
 * it (then rethrows) when an exception is raised along the way.
 */
class Consumer implements ConsumerInterface
{
    /**
     * The queue that messages are fetched from and acked/nacked on.
     *
     * @var QueueInterface
     */
    protected $queue;

    /**
     * The processor every consumed envelope is passed to.
     *
     * @var ProcessorInterface
     */
    protected $processor;

    /**
     * Dispatcher used for the consume / consumed / exception events.
     *
     * @var EventDispatcherInterface
     */
    protected $dispatcher;

    /**
     * The callback handed to the queue when consume() is called.
     *
     * @var callable
     */
    protected $callback;

    /**
     * Whether a message is requeued when the default callback nacks it.
     *
     * @var bool
     */
    protected $nackRequeue = false;

    /**
     * Stores the collaborators and installs the default consume callback.
     *
     * @param QueueInterface                $queue      Queue to consume from.
     * @param ProcessorInterface            $processor  Processor that handles each envelope.
     * @param EventDispatcherInterface|null $dispatcher Optional dispatcher; a new EventDispatcher
     *                                                  is created when none is given.
     */
    public function __construct(QueueInterface $queue, ProcessorInterface $processor, EventDispatcherInterface $dispatcher = null)
    {
        $this->queue = $queue;
        $this->processor = $processor;
        $this->dispatcher = null === $dispatcher ? new EventDispatcher() : $dispatcher;

        // Default callback: announce, process, announce the result, ack.
        // Any exception on that path is announced, the message is nacked
        // (honouring the nack-requeue setting at that moment) and the
        // exception is rethrown to the caller of the callback.
        $defaultCallback = function (EnvelopeInterface $envelope) {
            try {
                $consumeEvent = new ConsumeEvent($envelope);
                $this->dispatcher->dispatch(QueueEvents::CONSUME_MESSAGE, $consumeEvent);

                $outcome = $this->processor->process($envelope);

                $consumeEvent->setResult($outcome);
                $this->dispatcher->dispatch(QueueEvents::CONSUMED_MESSAGE, $consumeEvent);

                $this->ack($envelope);

                return $outcome;
            } catch (\Exception $exception) {
                $exceptionEvent = new ConsumeExceptionEvent($envelope, $exception);
                $this->dispatcher->dispatch(QueueEvents::CONSUME_EXCEPTION, $exceptionEvent);

                $this->nack($envelope, $this->nackRequeue);

                throw $exception;
            }
        };

        $this->setCallback($defaultCallback);
    }

    /**
     * Returns the event dispatcher used by this consumer.
     *
     * @inheritdoc
     */
    public function getEventDispatcher()
    {
        return $this->dispatcher;
    }

    /**
     * Sets whether the default callback requeues a message when it nacks it.
     *
     * @inheritdoc
     */
    public function setNackRequeue($requeue)
    {
        $this->nackRequeue = $requeue;
    }

    /**
     * Replaces the callback that is handed to the queue by consume().
     *
     * @param callable $callback
     */
    public function setCallback(callable $callback)
    {
        $this->callback = $callback;
    }

    /**
     * Fetches a single envelope from the queue.
     *
     * Returns null when the queue reports false (no envelope); otherwise the
     * value returned by the queue is passed through unchanged.
     *
     * @inheritdoc
     */
    public function get()
    {
        $envelope = $this->queue->get();

        return false === $envelope ? null : $envelope;
    }

    /**
     * Acknowledges the given envelope on the queue using its delivery tag.
     *
     * @inheritdoc
     */
    public function ack(EnvelopeInterface $envelope)
    {
        $this->queue->ack($envelope->getDeliveryTag());
    }

    /**
     * Negatively acknowledges the given envelope on the queue.
     *
     * @inheritdoc
     */
    public function nack(EnvelopeInterface $envelope, $requeue = false)
    {
        // Pass the explicit NOPARAM constant (the same default consume() uses)
        // rather than null when no requeue is requested, so the queue always
        // receives one of its own flag constants.
        $flags = $requeue ? QueueInterface::REQUEUE : QueueInterface::NOPARAM;

        $this->queue->nack($envelope->getDeliveryTag(), $flags);
    }

    /**
     * Starts consuming from the queue with the currently configured callback.
     *
     * @inheritdoc
     */
    public function consume($flags = QueueInterface::NOPARAM)
    {
        $this->queue->consume($this->callback, $flags);
    }
}
137