Completed
Push — master ( 5fca0d...2a987b )
by Quim
02:16
created

QueueReader   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 140
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Importance

Changes 0
Metric Value
wmc 8
lcom 1
cbo 10
dl 0
loc 140
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 18 1
A read() 0 10 2
B initialize() 0 43 4
A __destruct() 0 4 1
1
<?php
2
/**
3
 * Created by PhpStorm.
4
 * User: quimmanrique
5
 * Date: 13/02/17
6
 * Time: 17:33
7
 */
8
9
namespace Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue;
10
11
use Cmp\Queues\Domain\Queue\Exception\ReaderException;
12
use Cmp\Queues\Domain\Queue\Exception\TimeoutReaderException;
13
use Cmp\Queues\Domain\Queue\QueueReader as DomainQueueReader;
14
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\BindConfig;
15
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ConsumeConfig;
16
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\ExchangeConfig;
17
use Cmp\Queues\Infrastructure\AmqpLib\v26\RabbitMQ\Queue\Config\QueueConfig;
18
use PhpAmqpLib\Channel\AMQPChannel;
19
use PhpAmqpLib\Connection\AMQPLazyConnection;
20
use Psr\Log\LoggerInterface;
21
22
/**
 * RabbitMQ-backed implementation of the domain QueueReader.
 *
 * The AMQP channel is opened lazily on the first call to read(): the exchange
 * and queue are declared, the queue is bound, and the message handler is
 * registered as the consumer callback. The channel is closed when the reader
 * is destroyed.
 */
class QueueReader implements DomainQueueReader
{
    /**
     * Connection from which the channel is opened.
     *
     * @var AMQPLazyConnection
     */
    protected $connection;

    /**
     * Settings used to declare the queue.
     *
     * @var QueueConfig
     */
    protected $queueConfig;

    /**
     * Settings used to declare the exchange.
     *
     * @var ExchangeConfig
     */
    protected $exchangeConfig;

    /**
     * Provides the topics the queue is bound to.
     *
     * @var BindConfig
     */
    protected $bindConfig;

    /**
     * Flags passed to basic_consume().
     *
     * @var ConsumeConfig
     */
    protected $consumeConfig;

    /**
     * Object whose handleMessage() method is registered as the consumer
     * callback; it is given the user callback through setCallback().
     *
     * @var MessageHandler
     */
    protected $messageHandler;

    /**
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Fully configured channel, or null while the reader is not initialized.
     *
     * @var AMQPChannel|null
     */
    protected $channel;

    /**
     * QueueReader constructor.
     *
     * Only stores its collaborators; no channel is opened here.
     *
     * @param AMQPLazyConnection $connection
     * @param QueueConfig $queueConfig
     * @param ExchangeConfig $exchangeConfig
     * @param BindConfig $bindConfig
     * @param ConsumeConfig $consumeConfig
     * @param MessageHandler $messageHandler
     * @param LoggerInterface $logger
     */
    public function __construct(
        AMQPLazyConnection $connection,
        QueueConfig $queueConfig,
        ExchangeConfig $exchangeConfig,
        BindConfig $bindConfig,
        ConsumeConfig $consumeConfig,
        MessageHandler $messageHandler,
        LoggerInterface $logger
    )
    {
        $this->connection = $connection;
        $this->queueConfig = $queueConfig;
        $this->exchangeConfig = $exchangeConfig;
        $this->bindConfig = $bindConfig;
        $this->consumeConfig = $consumeConfig;
        $this->logger = $logger;
        $this->messageHandler = $messageHandler;
    }

    /**
     * Initializes the channel if needed, hands the callback to the message
     * handler and waits once on the channel.
     *
     * @param callable $callback Passed to MessageHandler::setCallback().
     * @param int $timeout Passed as the timeout argument of AMQPChannel::wait()
     *                     (presumably seconds, with 0 meaning "no timeout" -
     *                     confirm against the php-amqplib version in use).
     * @throws ReaderException When initialization fails or waiting fails for a
     *                         reason other than a timeout.
     * @throws TimeoutReaderException When the wait times out.
     */
    public function read(callable $callback, $timeout=0)
    {
        $this->initialize();
        $this->messageHandler->setCallback($callback);

        try {
            $this->channel->wait(null, false, $timeout);
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            // Only a genuine AMQP timeout is reported as a timeout.
            throw new TimeoutReaderException();
        } catch (\Exception $e) {
            // Anything else (I/O, protocol, or an error raised while handling
            // a message) must not be disguised as a timeout.
            $this->logger->error('Error while reading from queue:' . $e->getMessage());
            throw new ReaderException($e->getMessage(), (int) $e->getCode());
        }
    }

    /**
     * Opens the channel and declares exchange, queue, bindings and consumer.
     *
     * Does nothing when a channel is already stored. The channel is stored on
     * the instance only after every step succeeded, so a failed attempt is
     * retried in full on the next call instead of leaving a channel without
     * a consumer behind.
     *
     * @throws ReaderException When an \ErrorException is raised during setup.
     */
    protected function initialize()
    {
        if ($this->channel) {
            return;
        }

        $this->logger->info('Connecting to RabbitMQ');

        try {
            $channel = $this->connection->channel();
            $queueName = $this->queueConfig->getName();
            $exchangeName = $this->exchangeConfig->getName();

            $channel->exchange_declare(
                $exchangeName,
                $this->exchangeConfig->getType(),
                $this->exchangeConfig->getPassive(),
                $this->exchangeConfig->getDurable(),
                $this->exchangeConfig->getAutoDelete()
            );

            $this->logger->info('Declaring queue');
            $channel->queue_declare(
                $queueName,
                $this->queueConfig->getPassive(),
                $this->queueConfig->getDurable(),
                $this->queueConfig->getExclusive(),
                $this->queueConfig->getAutoDelete()
            );

            // One binding without a routing key, then one per configured topic.
            $channel->queue_bind($queueName, $exchangeName);
            foreach ($this->bindConfig->getTopics() as $bindTopic) {
                $this->logger->info('Binding Topic:' . $bindTopic);
                $channel->queue_bind($queueName, $exchangeName, $bindTopic);
            }

            $this->logger->info('Waiting for messages on queue:' . $queueName);
            $channel->basic_consume(
                $queueName,
                '',
                $this->consumeConfig->getNoLocal(),
                $this->consumeConfig->getNoAck(),
                $this->consumeConfig->getExclusive(),
                $this->consumeConfig->getNoWait(),
                array($this->messageHandler, 'handleMessage')
            );

            // Publish the channel only once it is fully configured.
            $this->channel = $channel;
        } catch (\ErrorException $exception) {
            $this->logger->error('Error trying to connect to rabbitMQ:' . $exception->getMessage());
            throw new ReaderException($exception->getMessage(), $exception->getCode());
        }
    }

    /**
     * Closes the channel if one was opened.
     *
     * The reader may be destroyed without read() ever having been called, in
     * which case there is no channel to close.
     */
    public function __destruct()
    {
        if (!$this->channel) {
            return;
        }

        try {
            $this->channel->close();
        } catch (\Exception $e) {
            // An exception escaping a destructor during shutdown is fatal;
            // record the failure instead of propagating it.
            $this->logger->warning('Error closing RabbitMQ channel:' . $e->getMessage());
        }

        $this->channel = null;
    }
}