Completed
Pull Request — master (#10)
by Marc
03:01
created

QueueReader   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 209
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 10
dl 0
loc 209
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 18 1
A read() 0 15 3
A purge() 0 5 1
A exchangeDeclare() 0 11 1
A queueDeclareAndBind() 0 17 2
A consume() 0 16 2
A initialize() 0 15 3
A stopConsuming() 0 11 3
A __destruct() 0 6 2
1
<?php
2
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
3
4
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
5
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
6
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
7
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\BindConfig;
8
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ConsumeConfig;
9
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ExchangeConfig;
10
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Connection\AMQPLazyConnection;
13
use PhpAmqpLib\Exception\AMQPRuntimeException;
14
use PhpAmqpLib\Exception\AMQPTimeoutException;
15
use Psr\Log\LoggerInterface;
16
17
/**
 * Reads messages from a RabbitMQ queue through php-amqplib.
 *
 * The channel, exchange, queue and bindings are set up on first use (see
 * initialize()); read() then registers the message handler as a consumer and
 * waits on the channel.
 */
class QueueReader implements DomainQueueReader
{
    /**
     * Connection from which the channel is opened on first use.
     *
     * @var AMQPLazyConnection
     */
    protected $connection;

    /**
     * Settings used to declare the queue (name, passive, durable, exclusive, auto-delete).
     *
     * @var QueueConfig
     */
    protected $queueConfig;

    /**
     * Settings used to declare the exchange (name, type, passive, durable, auto-delete).
     *
     * @var ExchangeConfig
     */
    protected $exchangeConfig;

    /**
     * Provides the topics the queue is bound to.
     *
     * @var BindConfig
     */
    protected $bindConfig;

    /**
     * Flags passed to basic_consume (no-local, no-ack, exclusive, no-wait).
     *
     * @var ConsumeConfig
     */
    protected $consumeConfig;

    /**
     * Receives the user callback and is registered as the consumer callback.
     *
     * @var MessageHandler
     */
    protected $messageHandler;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Channel opened by initialize(); null until then and after a failed initialization.
     *
     * @var AMQPChannel|null
     */
    protected $channel;

    /**
     * Value returned by basic_consume; the empty string means "not consuming".
     *
     * @var string
     */
    protected $consumerTag = '';

    /**
     * QueueReader constructor.
     *
     * Only stores its collaborators; nothing is declared on the broker here.
     *
     * @param AMQPLazyConnection $connection
     * @param QueueConfig $queueConfig
     * @param ExchangeConfig $exchangeConfig
     * @param BindConfig $bindConfig
     * @param ConsumeConfig $consumeConfig
     * @param MessageHandler $messageHandler
     * @param LoggerInterface $logger
     */
    public function __construct(
        AMQPLazyConnection $connection,
        QueueConfig $queueConfig,
        ExchangeConfig $exchangeConfig,
        BindConfig $bindConfig,
        ConsumeConfig $consumeConfig,
        MessageHandler $messageHandler,
        LoggerInterface $logger
    ) {
        $this->connection = $connection;
        $this->queueConfig = $queueConfig;
        $this->exchangeConfig = $exchangeConfig;
        $this->bindConfig = $bindConfig;
        $this->consumeConfig = $consumeConfig;
        $this->messageHandler = $messageHandler;
        $this->logger = $logger;
    }

    /**
     * Hands $callback to the message handler and waits on the channel.
     *
     * On any failure while consuming, the consumer is cancelled before the
     * exception is translated, so the next read() registers a fresh consumer.
     *
     * @param callable $callback Passed to MessageHandler::setCallback()
     * @param int $timeout Forwarded to AMQPChannel::wait()
     * @throws ReaderException When consuming fails for any reason other than a timeout
     * @throws TimeoutReaderException When the wait times out
     */
    public function read(callable $callback, $timeout = 0)
    {
        $this->initialize();
        $this->messageHandler->setCallback($callback);

        try {
            $this->consume($timeout);
        } catch (AMQPTimeoutException $e) {
            $this->stopConsuming();
            throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e);
        } catch (\Exception $e) {
            $this->stopConsuming();
            throw new ReaderException("Error occurred while reading", 0, $e);
        }
    }

    /**
     * Deletes all messages from the queue
     *
     * @return void
     */
    public function purge()
    {
        $this->initialize();
        $this->channel->queue_purge($this->queueConfig->getName());
    }

    /**
     * Declares the exchange described by the exchange configuration.
     */
    protected function exchangeDeclare()
    {
        $this->logger->debug('Declaring exchange');

        $exchange = $this->exchangeConfig;
        $this->channel->exchange_declare(
            $exchange->getName(),
            $exchange->getType(),
            $exchange->getPassive(),
            $exchange->getDurable(),
            $exchange->getAutoDelete()
        );
    }

    /**
     * Declares the queue and binds it to the exchange: once without a routing
     * key, then once per configured topic.
     */
    protected function queueDeclareAndBind()
    {
        $this->logger->debug('Declaring queue');

        $queueName = $this->queueConfig->getName();
        $exchangeName = $this->exchangeConfig->getName();

        $this->channel->queue_declare(
            $queueName,
            $this->queueConfig->getPassive(),
            $this->queueConfig->getDurable(),
            $this->queueConfig->getExclusive(),
            $this->queueConfig->getAutoDelete()
        );
        $this->channel->queue_bind($queueName, $exchangeName);

        foreach ($this->bindConfig->getTopics() as $bindTopic) {
            $this->logger->info('Binding Topic:' . $bindTopic);
            $this->channel->queue_bind($queueName, $exchangeName, $bindTopic);
        }
    }

    /**
     * Registers the message handler as consumer (only if not already
     * consuming) and then waits on the channel.
     *
     * @param int $timeout Forwarded to AMQPChannel::wait()
     */
    protected function consume($timeout)
    {
        if ($this->consumerTag === '') {
            $this->logger->debug('Waiting for messages on queue:'.$this->queueConfig->getName());
            // An empty consumer tag is passed in; whatever basic_consume
            // returns is remembered so the consumer can be cancelled later.
            $this->consumerTag = $this->channel->basic_consume(
                $this->queueConfig->getName(),
                '',
                $this->consumeConfig->getNoLocal(),
                $this->consumeConfig->getNoAck(),
                $this->consumeConfig->getExclusive(),
                $this->consumeConfig->getNoWait(),
                array($this->messageHandler, 'handleMessage')
            );
        }

        $this->channel->wait(null, false, $timeout);
    }

    /**
     * Opens the channel and declares exchange, queue and bindings. Does
     * nothing when a channel is already available.
     *
     * If any step fails the channel is discarded, so that a later call runs
     * the full set-up again instead of reusing a channel whose exchange or
     * queue was never declared.
     *
     * @throws ReaderException When an \ErrorException is raised during set-up
     * @throws \Exception Any other exception raised during set-up is rethrown unchanged
     */
    protected function initialize()
    {
        if ($this->channel) {
            return;
        }

        $this->logger->debug('Connecting to RabbitMQ');
        try {
            $this->channel = $this->connection->channel();
            $this->exchangeDeclare();
            $this->queueDeclareAndBind();
        } catch (\ErrorException $exception) {
            $this->discardChannel();
            $this->logger->error('Error trying to connect to rabbitMQ:' . $exception->getMessage());
            throw new ReaderException("Error initializing queue reader", 0, $exception);
        } catch (\Exception $exception) {
            // Not translated (callers may rely on the original type), but the
            // half-initialized channel must not survive the failure.
            $this->discardChannel();
            throw $exception;
        }
    }

    /**
     * Stops the consuming of messages
     *
     * Cancelling is best effort: this runs while read() is already handling a
     * failure, so an error here is logged and not allowed to replace the
     * exception read() is about to throw.
     */
    private function stopConsuming()
    {
        // Same "not consuming" test as consume(); a plain truthiness check
        // would also skip a tag of '0'.
        if ($this->consumerTag === '') {
            return;
        }

        try {
            $this->channel->basic_cancel($this->consumerTag);
        } catch (\Exception $e) {
            $this->logger->debug('Error cancelling consumer:' . $e->getMessage());
        }

        $this->consumerTag = '';
    }

    /**
     * Drops the current channel, attempting to close it first.
     *
     * Errors raised by close() are ignored on purpose: this is only used on
     * failure and teardown paths, where there is nothing left to recover.
     */
    private function discardChannel()
    {
        $channel = $this->channel;
        $this->channel = null;

        if ($channel === null) {
            return;
        }

        try {
            $channel->close();
        } catch (\Exception $e) {
            // Intentionally ignored, see method description.
        }
    }

    /**
     * Destructor
     *
     * Closes the channel if one was opened. Close errors are swallowed
     * because an exception escaping a destructor during script termination
     * is a fatal error.
     */
    public function __destruct()
    {
        $this->discardChannel();
    }
}