Completed
Pull Request — master (#9)
by Marc
06:42
created

QueueReader::queueDeclareAndBind()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 17
c 0
b 0
f 0
rs 9.4285
cc 2
eloc 12
nc 2
nop 0
1
<?php
2
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
3
4
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
5
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
6
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
7
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\BindConfig;
8
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ConsumeConfig;
9
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ExchangeConfig;
10
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Connection\AMQPLazyConnection;
13
use PhpAmqpLib\Exception\AMQPTimeoutException;
14
use Psr\Log\LoggerInterface;
15
16
class QueueReader implements DomainQueueReader
{
    /**
     * Connection used to open the channel on first use (see initialize()).
     *
     * @var AMQPLazyConnection
     */
    protected $connection;

    /**
     * Settings of the queue that is declared, bound, consumed and purged.
     *
     * @var QueueConfig
     */
    protected $queueConfig;

    /**
     * Settings of the exchange that is declared and that the queue is bound to.
     *
     * @var ExchangeConfig
     */
    protected $exchangeConfig;

    /**
     * Provides the list of topics the queue is bound with.
     *
     * @var BindConfig
     */
    protected $bindConfig;

    /**
     * Flags forwarded to basic_consume (no-local, no-ack, exclusive, no-wait).
     *
     * @var ConsumeConfig
     */
    protected $consumeConfig;

    /**
     * Receives the user callback and is registered as the consumer callback.
     *
     * @var MessageHandler
     */
    protected $messageHandler;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Channel opened by initialize(); null until the first successful initialization.
     *
     * @var AMQPChannel|null
     */
    protected $channel;

    /**
     * Tag of the consumer registered by the last consume() call.
     * Stays an empty string while no tag is known.
     *
     * @var string
     */
    private $consumerTag = '';

    /**
     * QueueReader constructor.
     *
     * Only stores its collaborators; nothing is declared on the broker until
     * read() or purge() triggers initialize().
     *
     * @param AMQPLazyConnection $connection
     * @param QueueConfig $queueConfig
     * @param ExchangeConfig $exchangeConfig
     * @param BindConfig $bindConfig
     * @param ConsumeConfig $consumeConfig
     * @param MessageHandler $messageHandler
     * @param LoggerInterface $logger
     */
    public function __construct(
        AMQPLazyConnection $connection,
        QueueConfig $queueConfig,
        ExchangeConfig $exchangeConfig,
        BindConfig $bindConfig,
        ConsumeConfig $consumeConfig,
        MessageHandler $messageHandler,
        LoggerInterface $logger
    )
    {
        $this->connection = $connection;
        $this->queueConfig = $queueConfig;
        $this->exchangeConfig = $exchangeConfig;
        $this->bindConfig = $bindConfig;
        $this->consumeConfig = $consumeConfig;
        $this->logger = $logger;
        $this->messageHandler = $messageHandler;
    }

    /**
     * Registers the callback on the message handler, starts a consumer and
     * waits on the channel; the consumer is cancelled again before returning,
     * whether the wait succeeded or not.
     *
     * @param callable $callback Handed to the message handler via setCallback()
     * @param int $timeout Passed to the channel wait as its timeout
     * @throws ReaderException When initialization or consuming fails
     * @throws TimeoutReaderException When the wait raises an AMQPTimeoutException
     */
    public function read(callable $callback, $timeout = 0)
    {
        $this->initialize();
        $this->messageHandler->setCallback($callback);

        try {
            $this->consume($timeout);
        } catch (AMQPTimeoutException $e) {
            throw new TimeoutReaderException("Timed out at $timeout seconds while reading.", 0, $e);
        } catch (\Exception $e) {
            throw new ReaderException("Error occurred while reading", 0, $e);
        } finally {
            $this->cancelConsumer();
        }
    }

    /**
     * Deletes all messages from the queue
     *
     * @return void
     * @throws ReaderException When initialization fails
     */
    public function purge()
    {
        $this->initialize();
        $this->channel->queue_purge($this->queueConfig->getName());
    }

    /**
     * Creates the exchange described by the exchange config
     */
    protected function exchangeDeclare()
    {
        $this->logger->debug('Declaring exchange');
        $this->channel->exchange_declare(
            $this->exchangeConfig->getName(),
            $this->exchangeConfig->getType(),
            $this->exchangeConfig->getPassive(),
            $this->exchangeConfig->getDurable(),
            $this->exchangeConfig->getAutoDelete()
        );
    }

    /**
     * Creates the queue and binds it to the exchange: once without a routing
     * key and then once per topic returned by the bind config.
     */
    protected function queueDeclareAndBind()
    {
        $queueName = $this->queueConfig->getName();
        $exchangeName = $this->exchangeConfig->getName();

        $this->logger->debug('Declaring queue');
        $this->channel->queue_declare(
            $queueName,
            $this->queueConfig->getPassive(),
            $this->queueConfig->getDurable(),
            $this->queueConfig->getExclusive(),
            $this->queueConfig->getAutoDelete()
        );
        $this->channel->queue_bind($queueName, $exchangeName);

        foreach ($this->bindConfig->getTopics() as $bindTopic) {
            $this->logger->info('Binding Topic:' . $bindTopic);
            $this->channel->queue_bind($queueName, $exchangeName, $bindTopic);
        }
    }

    /**
     * Starts consuming from the queue and waits once on the channel.
     *
     * The tag returned by basic_consume is remembered so that read() can
     * cancel exactly this consumer afterwards. An empty tag is requested, so
     * the effective tag is expected to be chosen by the broker
     * (NOTE(review): per php-amqplib 2.6 basic_consume returns that tag -
     * confirm when upgrading the library).
     *
     * @param int $timeout Passed to the channel wait as its timeout
     */
    protected function consume($timeout)
    {
        $this->logger->debug('Waiting for messages on queue:' . $this->queueConfig->getName());
        // The cast keeps the previous behaviour (cancel of '') should the library return no tag.
        $this->consumerTag = (string) $this->channel->basic_consume(
            $this->queueConfig->getName(),
            '',
            $this->consumeConfig->getNoLocal(),
            $this->consumeConfig->getNoAck(),
            $this->consumeConfig->getExclusive(),
            $this->consumeConfig->getNoWait(),
            array($this->messageHandler, 'handleMessage')
        );
        $this->channel->wait(null, false, $timeout);
    }

    /**
     * Cancels the consumer registered by consume(), best effort.
     *
     * Runs from the finally block of read(), so a failure here is logged
     * instead of thrown: throwing would replace the reader exception that is
     * already on its way to the caller.
     */
    private function cancelConsumer()
    {
        $consumerTag = $this->consumerTag;
        $this->consumerTag = '';

        try {
            $this->channel->basic_cancel($consumerTag);
        } catch (\Exception $exception) {
            $this->logger->warning('Error cancelling consumer:' . $exception->getMessage());
        }
    }

    /**
     * Opens the channel and declares exchange, queue and bindings on first
     * use; does nothing when a channel is already set.
     *
     * On any failure the channel is reset, so the next call starts over
     * instead of returning early with a channel whose exchange/queue were
     * never declared.
     *
     * @throws ReaderException When an \ErrorException is raised while initializing
     */
    protected function initialize()
    {
        if ($this->channel) {
            return;
        }

        $this->logger->debug('Connecting to RabbitMQ');
        try {
            $this->channel = $this->connection->channel();
            $this->exchangeDeclare();
            $this->queueDeclareAndBind();
        } catch (\ErrorException $exception) {
            $this->channel = null;
            $this->logger->error('Error trying to connect to rabbitMQ:' . $exception->getMessage());
            throw new ReaderException("Error initializing queue reader", 0, $exception);
        } catch (\Exception $exception) {
            // Other exception types keep propagating unchanged; only the half-initialized state is cleared.
            $this->channel = null;
            throw $exception;
        }
    }

    /**
     * Destructor: closes the channel if one was opened
     */
    public function __destruct()
    {
        if (isset($this->channel)) {
            $this->channel->close();
        }
    }
}