QueueReader   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 213
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Importance

Changes 0
Metric Value
wmc 19
lcom 1
cbo 11
dl 0
loc 213
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 18 1
A read() 0 18 4
A purge() 0 5 1
A exchangeDeclare() 0 11 1
A queueDeclareAndBind() 0 17 2
A consume() 0 16 2
A initialize() 0 15 3
A stopConsuming() 0 11 3
A __destruct() 0 6 2
1
<?php
2
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
3
4
use Cmp\Queues\Domain\Queue\Exception\GracefulStopException;
5
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
6
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
7
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
8
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\BindConfig;
9
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ConsumeConfig;
10
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ExchangeConfig;
11
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
12
use PhpAmqpLib\Channel\AMQPChannel;
13
use PhpAmqpLib\Connection\AMQPLazyConnection;
14
use PhpAmqpLib\Exception\AMQPTimeoutException;
15
use Psr\Log\LoggerInterface;
16
17
/**
 * Queue reader for RabbitMQ built on php-amqplib.
 *
 * The channel is opened lazily on the first call to read() or purge(); at that
 * point the exchange and the queue are declared and the queue is bound to the
 * exchange. A consumer is registered on the first read() and is kept between
 * successful reads; it is cancelled only when a read fails, times out or is
 * stopped gracefully.
 */
class QueueReader implements DomainQueueReader
{
    /**
     * Connection from which the channel is obtained on first use.
     *
     * @var AMQPLazyConnection
     */
    protected $connection;

    /**
     * Settings used to declare (and name) the queue.
     *
     * @var QueueConfig
     */
    protected $queueConfig;

    /**
     * Settings used to declare (and name) the exchange.
     *
     * @var ExchangeConfig
     */
    protected $exchangeConfig;

    /**
     * Provides the topics the queue is bound with.
     *
     * @var BindConfig
     */
    protected $bindConfig;

    /**
     * Flags passed to basic_consume.
     *
     * @var ConsumeConfig
     */
    protected $consumeConfig;

    /**
     * Receives the caller's callback and is registered as the consumer callback.
     *
     * @var MessageHandler
     */
    protected $messageHandler;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Channel obtained in initialize(); null until then and after a failed initialization.
     *
     * @var AMQPChannel|null
     */
    protected $channel;

    /**
     * Tag returned by basic_consume; an empty string means no consumer is registered.
     *
     * @var string
     */
    protected $consumerTag = '';

    /**
     * QueueReader constructor.
     *
     * Only stores the collaborators; no channel is opened here.
     *
     * @param AMQPLazyConnection $connection
     * @param QueueConfig $queueConfig
     * @param ExchangeConfig $exchangeConfig
     * @param BindConfig $bindConfig
     * @param ConsumeConfig $consumeConfig
     * @param MessageHandler $messageHandler
     * @param LoggerInterface $logger
     */
    public function __construct(
        AMQPLazyConnection $connection,
        QueueConfig $queueConfig,
        ExchangeConfig $exchangeConfig,
        BindConfig $bindConfig,
        ConsumeConfig $consumeConfig,
        MessageHandler $messageHandler,
        LoggerInterface $logger
    ) {
        $this->connection = $connection;
        $this->queueConfig = $queueConfig;
        $this->exchangeConfig = $exchangeConfig;
        $this->bindConfig = $bindConfig;
        $this->consumeConfig = $consumeConfig;
        $this->logger = $logger;
        $this->messageHandler = $messageHandler;
    }

    /**
     * Hands the callback to the message handler and waits on the channel.
     *
     * Any exception raised while consuming cancels the consumer before it is
     * translated, so the next call registers a fresh one. Exceptions raised by
     * initialize() are not translated here.
     *
     * @param callable $callback Given to the message handler via setCallback()
     * @param int $timeout Passed through to the channel's wait() call
     * @throws ReaderException On any other failure while consuming
     * @throws TimeoutReaderException When the wait raises AMQPTimeoutException
     * @throws GracefulStopException When a GracefulStopException is raised while consuming
     */
    public function read(callable $callback, $timeout = 0)
    {
        $this->initialize();
        $this->messageHandler->setCallback($callback);

        try {
            $this->consume($timeout);
        } catch (GracefulStopException $e) {
            $this->stopConsuming();
            throw new GracefulStopException("Graceful exception", 0, $e);
        } catch (AMQPTimeoutException $e) {
            $this->stopConsuming();
            throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e);
        } catch (\Exception $e) {
            $this->stopConsuming();
            throw new ReaderException("Error occurred while reading", 0, $e);
        }
    }

    /**
     * Deletes all messages from the queue
     *
     * @return void
     * @throws ReaderException When initialization fails with an \ErrorException
     */
    public function purge()
    {
        $this->initialize();
        $this->channel->queue_purge($this->queueConfig->getName());
    }

    /**
     * Creates the exchange using the values from the exchange config
     */
    protected function exchangeDeclare()
    {
        $this->logger->debug('Declaring exchange');
        $this->channel->exchange_declare(
            $this->exchangeConfig->getName(),
            $this->exchangeConfig->getType(),
            $this->exchangeConfig->getPassive(),
            $this->exchangeConfig->getDurable(),
            $this->exchangeConfig->getAutoDelete()
        );
    }

    /**
     * Creates the queue and binds it to the exchange, first without a routing
     * key and then once for every topic in the bind config
     */
    protected function queueDeclareAndBind()
    {
        $queueName = $this->queueConfig->getName();
        $exchangeName = $this->exchangeConfig->getName();

        $this->logger->debug('Declaring queue');
        $this->channel->queue_declare(
            $queueName,
            $this->queueConfig->getPassive(),
            $this->queueConfig->getDurable(),
            $this->queueConfig->getExclusive(),
            $this->queueConfig->getAutoDelete()
        );
        $this->channel->queue_bind($queueName, $exchangeName);

        foreach ($this->bindConfig->getTopics() as $bindTopic) {
            $this->logger->debug('Binding Topic:' . $bindTopic);
            $this->channel->queue_bind($queueName, $exchangeName, $bindTopic);
        }
    }

    /**
     * Starts consuming from the queue
     *
     * The consumer is registered only when none is active; every call then
     * performs one wait() on the channel.
     *
     * @param int $timeout Passed to the channel's wait() call
     */
    protected function consume($timeout)
    {
        if ($this->consumerTag === '') {
            $this->logger->debug('Waiting for messages on queue:' . $this->queueConfig->getName());
            $this->consumerTag = $this->channel->basic_consume(
                $this->queueConfig->getName(),
                '',
                $this->consumeConfig->getNoLocal(),
                $this->consumeConfig->getNoAck(),
                $this->consumeConfig->getExclusive(),
                $this->consumeConfig->getNoWait(),
                [$this->messageHandler, 'handleMessage']
            );
        }
        $this->channel->wait(null, false, $timeout);
    }

    /**
     * Opens the channel and declares the exchange, queue and bindings once.
     *
     * If any step fails the channel reference is dropped again, so that the
     * next call retries the whole setup instead of reusing a channel whose
     * exchange and queue were never declared.
     *
     * @throws ReaderException When the setup fails with an \ErrorException
     */
    protected function initialize()
    {
        if ($this->channel) {
            return;
        }
        $this->logger->debug('Connecting to RabbitMQ');
        try {
            $this->channel = $this->connection->channel();
            $this->exchangeDeclare();
            $this->queueDeclareAndBind();
        } catch (\ErrorException $exception) {
            $this->channel = null;
            $this->logger->error('Error trying to connect to rabbitMQ:' . $exception->getMessage());
            throw new ReaderException("Error initializing queue reader", 0, $exception);
        } catch (\Exception $exception) {
            // Other exception types keep propagating unchanged, as before;
            // only the half-initialized channel is discarded.
            $this->channel = null;
            throw $exception;
        }
    }

    /**
     * Stops the consuming of messages
     *
     * Cancelling is best effort: the tag is cleared even when the cancel call
     * fails, so the next consume() registers a new consumer.
     */
    private function stopConsuming()
    {
        if ($this->consumerTag === '') {
            return;
        }

        try {
            $this->channel->basic_cancel($this->consumerTag);
        } catch (\Exception $e) {
            // This runs while read() is already handling another exception;
            // a failing cancel must not replace that exception, so it is only
            // recorded.
            $this->logger->debug('Could not cancel consumer:' . $e->getMessage());
        }

        $this->consumerTag = '';
    }

    /**
     * Destructor
     *
     * Closes the channel if one was opened.
     */
    public function __destruct()
    {
        if (!isset($this->channel)) {
            return;
        }

        try {
            $this->channel->close();
        } catch (\Exception $e) {
            // Deliberately ignored: closing is best effort and an exception
            // must not escape from a destructor.
        }
    }
}