BaseAmqp::setChannel()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace RabbitMqModule;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use RabbitMqModule\Options\Exchange as ExchangeOptions;
8
use RabbitMqModule\Options\Queue as QueueOptions;
9
use RabbitMqModule\Service\SetupFabricAwareInterface;
10
11
/**
 * Class BaseAmqp
 *
 * Shared AMQP plumbing for subclasses: holds the connection, a lazily opened
 * channel, and the exchange/queue options, and declares that exchange and
 * queue (with their bindings) through the channel.
 *
 * @package RabbitMqModule
 */
abstract class BaseAmqp implements SetupFabricAwareInterface
{
    /**
     * @var AbstractConnection
     */
    protected $connection;
    /**
     * Channel in use; null until one is injected or getChannel() opens one.
     *
     * @var AMQPChannel|null
     */
    protected $channel;
    /**
     * Queue to declare; null until setQueueOptions() is called.
     *
     * @var QueueOptions|null
     */
    protected $queueOptions;
    /**
     * Exchange to declare; unset until setExchangeOptions() is called.
     *
     * @var ExchangeOptions
     */
    protected $exchangeOptions;
    /**
     * @var bool
     */
    protected $autoSetupFabricEnabled = true;
    /**
     * True once declareExchange() has completed a declaration.
     *
     * @var bool
     */
    protected $exchangeDeclared = false;
    /**
     * True once declareQueue() has completed a declaration.
     *
     * @var bool
     */
    protected $queueDeclared = false;

    /**
     * @param AbstractConnection $connection
     * @param AMQPChannel|null   $channel    Optional channel to use; when omitted,
     *                                       getChannel() opens one on first use
     */
    public function __construct(AbstractConnection $connection, AMQPChannel $channel = null)
    {
        $this->connection = $connection;
        $this->channel = $channel;
    }

    /**
     * @return AbstractConnection
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Returns the current channel, opening one on the connection if none is set.
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        if (!$this->channel) {
            $this->channel = $this->getConnection()->channel();
        }

        return $this->channel;
    }

    /**
     * @param AMQPChannel $channel
     *
     * @return $this
     */
    public function setChannel(AMQPChannel $channel)
    {
        $this->channel = $channel;

        return $this;
    }

    /**
     * @return QueueOptions
     */
    public function getQueueOptions()
    {
        return $this->queueOptions;
    }

    /**
     * @param QueueOptions $queueOptions
     *
     * @return $this
     */
    public function setQueueOptions(QueueOptions $queueOptions)
    {
        $this->queueOptions = $queueOptions;

        return $this;
    }

    /**
     * @return ExchangeOptions
     */
    public function getExchangeOptions()
    {
        return $this->exchangeOptions;
    }

    /**
     * @param ExchangeOptions $exchangeOptions
     *
     * @return $this
     */
    public function setExchangeOptions(ExchangeOptions $exchangeOptions)
    {
        $this->exchangeOptions = $exchangeOptions;

        return $this;
    }

    /**
     * @return bool
     */
    public function isAutoSetupFabricEnabled()
    {
        return $this->autoSetupFabricEnabled;
    }

    /**
     * @param bool $autoSetupFabricEnabled
     *
     * @return $this
     */
    public function setAutoSetupFabricEnabled($autoSetupFabricEnabled)
    {
        $this->autoSetupFabricEnabled = $autoSetupFabricEnabled;

        return $this;
    }

    /**
     * Declares an exchange and its exchange-to-exchange bindings.
     *
     * Does nothing when the options say the exchange must not be declared.
     * Each configured bind first declares the bound exchange (recursively),
     * then binds it once per routing key, or once with an empty routing key
     * when none are configured.
     *
     * @param ExchangeOptions|null $options Exchange to declare; defaults to
     *                                      getExchangeOptions()
     *
     * @return $this
     */
    protected function declareExchange(ExchangeOptions $options = null)
    {
        if (!$options) {
            $options = $this->getExchangeOptions();
        }

        if (!$options->isDeclare()) {
            return $this;
        }

        $this->getChannel()->exchange_declare(
            $options->getName(),
            $options->getType(),
            $options->isPassive(),
            $options->isDurable(),
            $options->isAutoDelete(),
            $options->isInternal(),
            $options->isNoWait(),
            $options->getArguments(),
            $options->getTicket()
        );

        $binds = $options->getExchangeBinds();
        foreach ($binds as $bind) {
            $this->declareExchange($bind->getExchange());
            $routingKeys = $bind->getRoutingKeys();
            if (!count($routingKeys)) {
                // No routing keys configured: bind once with the empty key.
                $routingKeys = [''];
            }
            foreach ($routingKeys as $routingKey) {
                $this->getChannel()->exchange_bind(
                    $options->getName(),
                    $bind->getExchange()->getName(),
                    $routingKey
                );
            }
        }

        $this->exchangeDeclared = true;

        return $this;
    }

    /**
     * Declares the configured queue and binds it to the configured exchange.
     *
     * Does nothing when no queue options are set or the queue name is null.
     * The queue is bound once per routing key, or once with an empty routing
     * key when none are configured.
     *
     * @return $this
     */
    protected function declareQueue()
    {
        $queueOptions = $this->getQueueOptions();

        if (!$queueOptions || null === $queueOptions->getName()) {
            return $this;
        }

        $exchangeOptions = $this->getExchangeOptions();

        // Bind using the name returned by queue_declare() rather than the
        // configured one.
        list($queueName) = $this->getChannel()->queue_declare(
            $queueOptions->getName(),
            $queueOptions->isPassive(),
            $queueOptions->isDurable(),
            $queueOptions->isExclusive(),
            $queueOptions->isAutoDelete(),
            $queueOptions->isNoWait(),
            $queueOptions->getArguments(),
            $queueOptions->getTicket()
        );

        $routingKeys = $queueOptions->getRoutingKeys();
        if (!count($routingKeys)) {
            // No routing keys configured: bind once with the empty key.
            $routingKeys = [''];
        }
        foreach ($routingKeys as $routingKey) {
            $this->getChannel()->queue_bind(
                $queueName,
                $exchangeOptions->getName(),
                $routingKey
            );
        }

        $this->queueDeclared = true;

        return $this;
    }

    /**
     * Declares the exchange and, when queue options are set, the queue,
     * skipping whichever has already been declared by this instance.
     *
     * @return $this
     */
    public function setupFabric()
    {
        if (!$this->exchangeDeclared) {
            $this->declareExchange();
        }

        $queueOptions = $this->getQueueOptions();

        if (!$this->queueDeclared && $queueOptions) {
            $this->declareQueue();
        }

        return $this;
    }

    /**
     * Reconnects the underlying connection if it is currently connected.
     *
     * A connection that is not connected is left untouched. After a
     * reconnect the cached channel is discarded, so the next getChannel()
     * call opens a new channel on the re-established connection.
     *
     * @return $this
     */
    public function reconnect()
    {
        if (!$this->getConnection()->isConnected()) {
            return $this;
        }

        $this->getConnection()->reconnect();

        // reconnect() does not re-create channels that were opened on the
        // previous connection, so the cached channel is no longer usable.
        // Forget it and let getChannel() open a fresh one on demand.
        $this->channel = null;

        return $this;
    }

    /**
     * Closes the channel (if one is set) and then the connection (if it is
     * still connected) when this object is destroyed.
     */
    public function __destruct()
    {
        if ($this->channel) {
            $this->channel->close();
        }

        if ($this->connection && $this->getConnection()->isConnected()) {
            $this->getConnection()->close();
        }
    }
}
280