Passed
Push — master ( 58a5a8...186224 )
by Tomáš
05:54
created

BunnyManager::declareExchanges()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 4
c 1
b 0
f 1
dl 0
loc 10
ccs 0
cts 5
cp 0
rs 10
cc 3
nc 4
nop 1
crap 12
1
<?php declare(strict_types=1);
2
3
namespace Portiny\RabbitMQ;
4
5
use Bunny\Async\Client as AsyncClient;
6
use Bunny\Channel;
7
use Bunny\Client;
8
use Portiny\RabbitMQ\Consumer\AbstractConsumer;
9
use Portiny\RabbitMQ\Exchange\AbstractExchange;
10
use Portiny\RabbitMQ\Queue\AbstractQueue;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
14
/**
 * Lazily creates a Bunny client and channel and declares the configured
 * exchanges and queues on that channel.
 *
 * If an event loop has been supplied through setLoop() before the client is
 * first requested, an asynchronous client is created and the channel-related
 * methods return promises; otherwise a synchronous client is used.
 */
final class BunnyManager
{
	/**
	 * Set once declare() has run (or, in async mode, has been started).
	 *
	 * @var bool
	 */
	private $isDeclared = false;

	/**
	 * Connection options handed unchanged to the Bunny client constructor.
	 *
	 * @var array
	 */
	private $connection = [];

	/**
	 * Map of consumer alias => consumer class name.
	 *
	 * @var array
	 */
	private $aliases = [];

	/**
	 * @var iterable
	 */
	private $consumers = [];

	/**
	 * @var iterable
	 */
	private $exchanges = [];

	/**
	 * @var iterable
	 */
	private $queues = [];

	/**
	 * Null until setLoop() is called; its presence selects the async client.
	 *
	 * @var LoopInterface|null
	 */
	private $loop;

	/**
	 * @var Client|AsyncClient|null
	 */
	private $client;

	/**
	 * Cached result of createChannel(); a promise when the async client is used.
	 *
	 * @var Channel|PromiseInterface|null
	 */
	private $channel;

	/**
	 * @param array $connection connection options for the Bunny client
	 * @param array $aliases map of consumer alias => consumer class name
	 * @param iterable $consumers consumers searched by getConsumerByAlias()
	 * @param iterable $exchanges exchanges declared by declare()
	 * @param iterable $queues queues declared by declare()
	 */
	public function __construct(
		array $connection,
		array $aliases,
		iterable $consumers,
		iterable $exchanges,
		iterable $queues
	) {
		$this->connection = $connection;
		$this->aliases = $aliases;
		$this->consumers = $consumers;
		$this->exchanges = $exchanges;
		$this->queues = $queues;
	}

	/**
	 * Stores the event loop used to build an asynchronous client.
	 *
	 * Has no effect on a client that getClient() has already created.
	 */
	public function setLoop(LoopInterface $loop): void
	{
		$this->loop = $loop;
	}

	/**
	 * Returns the client, creating it on first use.
	 *
	 * @return Client|AsyncClient asynchronous when a loop was set, synchronous otherwise
	 */
	public function getClient()
	{
		if ($this->client === null) {
			if ($this->loop === null) {
				$this->client = new Client($this->connection);
			} else {
				$this->client = new AsyncClient($this->loop, $this->connection);
			}
		}

		return $this->client;
	}

	/**
	 * Returns the channel, creating it on first use and caching the result.
	 *
	 * @return Channel|PromiseInterface
	 */
	public function getChannel()
	{
		if ($this->channel === null) {
			$this->channel = $this->createChannel();
		}

		return $this->channel;
	}

	/**
	 * Finds the consumer registered under the given alias.
	 *
	 * @return AbstractConsumer|null the first consumer that is an instance of the
	 *                               aliased class, or null when the alias is unknown
	 *                               or no consumer matches
	 */
	public function getConsumerByAlias(string $alias): ?AbstractConsumer
	{
		$consumerClassName = $this->aliases[$alias] ?? null;
		if ($consumerClassName === null) {
			return null;
		}

		/** @var AbstractConsumer $consumer */
		foreach ($this->consumers as $consumer) {
			if ($consumer instanceof $consumerClassName) {
				return $consumer;
			}
		}

		return null;
	}

	/**
	 * Declares all exchanges (with their bindings) and then all queues.
	 *
	 * @return bool|PromiseInterface false when declaration already ran (or was
	 *                               already started); true after a synchronous
	 *                               declaration; a promise whose fulfilment
	 *                               callback returns true when the channel is a promise
	 */
	public function declare()
	{
		if ($this->isDeclared) {
			return false;
		}

		$channel = $this->getChannel();

		if ($channel instanceof PromiseInterface) {
			// The flag is raised before the promise settles, so a second call made
			// while declaration is still pending returns false.
			// NOTE(review): the flag is not reset if the promise rejects - confirm
			// that this is intended.
			$this->isDeclared = true;

			return $channel->then(function (Channel $channel) {
				$this->declareExchanges($channel);
				$this->declareQueues($channel);

				return true;
			});
		}

		$this->declareExchanges($channel);
		$this->declareQueues($channel);

		// Synchronous path: only marked as declared once both steps returned.
		$this->isDeclared = true;

		return true;
	}

	/**
	 * Opens a channel on the client, connecting first when necessary.
	 *
	 * @return Channel|PromiseInterface
	 */
	private function createChannel()
	{
		$client = $this->getClient();

		if (! $client->isConnected()) {
			if ($client instanceof AsyncClient) {
				// Async connect: chain the channel request onto the connect promise.
				return $client->connect()->then(static function (AsyncClient $client) {
					return $client->channel();
				});
			}

			$client->connect();
		}

		return $client->channel();
	}

	/**
	 * Declares every exchange first and only then declares the bindings of
	 * every exchange.
	 */
	private function declareExchanges(Channel $channel): void
	{
		// Snapshot the iterable: it is walked twice below, and a one-shot
		// Traversable such as a generator cannot be iterated a second time.
		$exchanges = is_array($this->exchanges)
			? $this->exchanges
			: iterator_to_array($this->exchanges, false);

		/** @var AbstractExchange $exchange */
		foreach ($exchanges as $exchange) {
			$exchange->declare($channel);
		}

		/** @var AbstractExchange $exchange */
		foreach ($exchanges as $exchange) {
			$exchange->declareBindings($channel);
		}
	}

	/**
	 * Declares every configured queue on the given channel.
	 */
	private function declareQueues(Channel $channel): void
	{
		/** @var AbstractQueue $queue */
		foreach ($this->queues as $queue) {
			$queue->declare($channel);
		}
	}
}
194