BunnyManager::declare()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 25
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 14
c 1
b 0
f 1
dl 0
loc 25
ccs 0
cts 11
cp 0
rs 9.7998
cc 3
nc 3
nop 0
crap 12
1
<?php declare(strict_types = 1);
2
3
namespace Portiny\RabbitMQ;
4
5
use Bunny\Async\Client as AsyncClient;
6
use Bunny\Channel;
7
use Bunny\Client;
8
use Portiny\RabbitMQ\Consumer\AbstractConsumer;
9
use Portiny\RabbitMQ\Exchange\AbstractExchange;
10
use Portiny\RabbitMQ\Queue\AbstractQueue;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
14
/**
 * Owns the Bunny client and channel for one connection configuration and
 * declares the configured exchanges and queues on that channel.
 *
 * Without an event loop a blocking Client is used; once a loop has been
 * supplied through setLoop() an AsyncClient is used instead and the
 * channel-related methods may hand back promises.
 */
final class BunnyManager
{
	/**
	 * True once declare() has run (or, in promise mode, has been scheduled).
	 *
	 * @var bool
	 */
	private $isDeclared = false;

	/**
	 * Connection options handed unchanged to the Bunny client constructor.
	 *
	 * @var array
	 */
	private $connection = [];

	/**
	 * Map of consumer alias => consumer class name.
	 *
	 * @var array
	 */
	private $aliases = [];

	/**
	 * Consumer instances searched by getConsumerByAlias().
	 *
	 * @var iterable
	 */
	private $consumers = [];

	/**
	 * Exchanges declared by declare().
	 *
	 * @var iterable
	 */
	private $exchanges = [];

	/**
	 * Queues declared by declare().
	 *
	 * @var iterable
	 */
	private $queues = [];

	/**
	 * Event loop for asynchronous mode; stays null until setLoop() is called.
	 *
	 * @var LoopInterface|null
	 */
	private $loop;

	/**
	 * Lazily created client, see getClient().
	 *
	 * @var Client|AsyncClient|null
	 */
	private $client;

	/**
	 * Lazily created channel (or a promise of one), see getChannel().
	 *
	 * @var Channel|PromiseInterface|null
	 */
	private $channel;


	/**
	 * @param array    $connection connection options for the Bunny client
	 * @param array    $aliases    consumer alias => consumer class name
	 * @param iterable $consumers  available consumer instances
	 * @param iterable $exchanges  exchanges to declare
	 * @param iterable $queues     queues to declare
	 */
	public function __construct(
		array $connection,
		array $aliases,
		iterable $consumers,
		iterable $exchanges,
		iterable $queues
	) {
		$this->queues = $queues;
		$this->exchanges = $exchanges;
		$this->consumers = $consumers;
		$this->aliases = $aliases;
		$this->connection = $connection;
	}


	/**
	 * Stores the event loop used when the client is created.
	 *
	 * The client is cached by getClient(), so a loop set after the first
	 * getClient() call does not replace the already created client.
	 */
	public function setLoop(LoopInterface $loop): void
	{
		$this->loop = $loop;
	}


	/**
	 * Returns the cached client, creating it on first use.
	 *
	 * @return Client|AsyncClient AsyncClient when a loop is set, Client otherwise
	 */
	public function getClient()
	{
		if ($this->client !== null) {
			return $this->client;
		}

		$this->client = $this->loop === null
			? new Client($this->connection)
			: new AsyncClient($this->loop, $this->connection);

		return $this->client;
	}


	/**
	 * Returns the connection options exactly as given to the constructor.
	 *
	 * @return array<string, mixed>
	 */
	public function getConnection(): array
	{
		return $this->connection;
	}


	/**
	 * Returns the cached channel, creating it through createChannel() on first use.
	 *
	 * @return Channel|PromiseInterface
	 */
	public function getChannel()
	{
		if ($this->channel !== null) {
			return $this->channel;
		}

		$this->channel = $this->createChannel();

		return $this->channel;
	}


	/**
	 * Finds the first registered consumer that is an instance of the class
	 * mapped to the given alias.
	 *
	 * @return AbstractConsumer|null null when the alias is unknown or no consumer matches
	 */
	public function getConsumerByAlias(string $alias): ?AbstractConsumer
	{
		if (! isset($this->aliases[$alias])) {
			return null;
		}

		$expectedClass = $this->aliases[$alias];

		/** @var AbstractConsumer $candidate */
		foreach ($this->consumers as $candidate) {
			if ($candidate instanceof $expectedClass) {
				return $candidate;
			}
		}

		return null;
	}


	/**
	 * Declares all exchanges (with their bindings) and then all queues.
	 *
	 * @return bool|PromiseInterface false when already declared, true after a
	 *                               direct declaration, or the promise chained
	 *                               onto the channel promise (its callback returns true)
	 */
	public function declare()
	{
		if ($this->isDeclared) {
			return false;
		}

		$channel = $this->getChannel();

		if (! $channel instanceof PromiseInterface) {
			$this->declareExchanges($channel);
			$this->declareQueues($channel);

			// Only marked after both steps ran without throwing.
			$this->isDeclared = true;

			return true;
		}

		// In promise mode the flag is raised immediately, before the channel
		// promise has settled, so repeated calls return false right away.
		$this->isDeclared = true;

		return $channel->then(function (Channel $resolvedChannel) {
			$this->declareExchanges($resolvedChannel);
			$this->declareQueues($resolvedChannel);

			return true;
		});
	}


	/**
	 * Opens a channel on the client, connecting first when necessary.
	 *
	 * @return Channel|PromiseInterface
	 */
	private function createChannel()
	{
		$client = $this->getClient();

		if ($client->isConnected()) {
			return $client->channel();
		}

		if ($client instanceof AsyncClient) {
			// Chain the channel request onto the pending connection.
			return $client->connect()->then(function (AsyncClient $connectedClient) {
				return $connectedClient->channel();
			});
		}

		$client->connect();

		return $client->channel();
	}


	/**
	 * Declares every exchange, then declares bindings in a second pass over
	 * the same collection.
	 */
	private function declareExchanges(Channel $channel): void
	{
		/** @var AbstractExchange $item */
		foreach ($this->exchanges as $item) {
			$item->declare($channel);
		}

		// Bindings are declared only after all exchanges have been declared.
		/** @var AbstractExchange $item */
		foreach ($this->exchanges as $item) {
			$item->declareBindings($channel);
		}
	}


	/**
	 * Declares every configured queue on the given channel.
	 */
	private function declareQueues(Channel $channel): void
	{
		/** @var AbstractQueue $item */
		foreach ($this->queues as $item) {
			$item->declare($channel);
		}
	}

}
213