1 | <?php |
||||
2 | |||||
3 | namespace CmdrSharp\AmqpRouteMessenger; |
||||
4 | |||||
5 | use PhpAmqpLib\Connection\AMQPStreamConnection; |
||||
6 | use PhpAmqpLib\Connection\AMQPSSLConnection; |
||||
7 | use PhpAmqpLib\Channel\AMQPChannel; |
||||
8 | use PhpAmqpLib\Message\AMQPMessage; |
||||
9 | use Log; |
||||
10 | |||||
/**
 * AMQP client that publishes to, and reads from, a direct exchange, using the
 * correlation id as the routing key.
 *
 * The guards in this class imply the call order: bindQueue() needs both
 * declareExchange() and declareQueue() to have run, publish() needs
 * declareExchange(), and read() needs declareQueue().
 */
class Client implements ClientInterface
{
    /** @var AMQPStreamConnection */
    protected $connection;

    /** @var AMQPChannel|null Opened lazily by declareExchange()/declareQueue(). */
    protected $channel;

    /** @var string|null Name passed to the last declareExchange() call. */
    protected $exchange_name;

    /** @var string|null Queue name returned by the last declareQueue() call. */
    protected $queue_name;

    /** @var string|null Body of the message captured by the most recent read(). */
    private $response;

    /**
     * Opens the broker connection from the `amqproutemessenger.*` config keys.
     *
     * An AMQPSSLConnection is always used; its SSL options come from
     * createSslContext() and are empty when no CA file is configured.
     */
    public function __construct()
    {
        $this->connection = new AMQPSSLConnection(
            config('amqproutemessenger.host', 'localhost'),
            config('amqproutemessenger.port', '5672'),
            config('amqproutemessenger.login', 'guest'),
            config('amqproutemessenger.password', 'guest'),
            config('amqproutemessenger.vhost', '/'),
            $this->createSslContext(),
            [
                'insist' => config('amqproutemessenger.insist', false),
                'login_method' => config('amqproutemessenger.login_method', 'AMQPLAIN'),
                'login_response' => null,
                'locale' => config('amqproutemessenger.locale', 'en_US'),
                'connection_timeout' => config('amqproutemessenger.connection_timeout', 3.0),
                'read_write_timeout' => config('amqproutemessenger.read_write_timeout', 3.0),
                'keepalive' => config('amqproutemessenger.keepalive', false),
                'heartbeat' => config('amqproutemessenger.heartbeat', 0)
            ]
        );
    }

    /**
     * Releases the channel and the connection when the client is destroyed.
     */
    public function __destruct()
    {
        if (!$this->connection || !$this->connection->isConnected()) {
            // Nothing left to close; attempting to close a channel over a dead
            // connection could raise an exception from inside the destructor.
            return;
        }

        if ($this->channel) {
            $this->channel->close();
        }

        $this->connection->close();
    }

    /**
     * Builds the SSL options handed to the connection.
     *
     * Options are only produced when `amqproutemessenger.ca_file` is set; the
     * `verify_peer` setting is ignored otherwise.
     *
     * @return array
     */
    private function createSslContext(): array
    {
        $ca_file = config('amqproutemessenger.ca_file', null);

        if ($ca_file === null) {
            return [];
        }

        return [
            'cafile' => $ca_file,
            'verify_peer' => config('amqproutemessenger.verify_peer', false)
        ];
    }

    /**
     * Declares a direct exchange and remembers its name for later calls.
     *
     * @param string $exchange_name
     * @param bool $passive
     * @return ClientInterface
     */
    public function declareExchange(string $exchange_name, bool $passive = false): ClientInterface
    {
        if (!$this->channel) {
            $this->channel = $this->connection->channel();
        }

        $this->exchange_name = $exchange_name;

        // Trailing flags: durable = false, auto_delete = false.
        $this->channel->exchange_declare(
            $this->exchange_name,
            'direct',
            $passive,
            false,
            false
        );

        return $this;
    }

    /**
     * Declares a queue with a broker-generated name and stores that name.
     *
     * @param bool $passive
     * @return ClientInterface
     */
    public function declareQueue(bool $passive = false): ClientInterface
    {
        if (!$this->channel) {
            $this->channel = $this->connection->channel();
        }

        // An empty name asks the broker to generate one; the first element of
        // the reply is that name. Flags: durable = false, exclusive = true,
        // auto_delete = false.
        list($this->queue_name) = $this->channel->queue_declare(
            "",
            $passive,
            false,
            true,
            false
        );

        return $this;
    }

    /**
     * Binds the declared queue to the declared exchange, using the
     * correlation id as the routing key.
     *
     * @param string $correlation_id
     * @return ClientInterface
     * @throws \ErrorException When the channel, queue or exchange is missing.
     */
    public function bindQueue(string $correlation_id): ClientInterface
    {
        if (!$this->channel || !$this->queue_name || !$this->exchange_name) {
            throw new \ErrorException('An exchange has not been defined.');
        }

        $this->channel->queue_bind($this->queue_name, $this->exchange_name, $correlation_id);

        return $this;
    }

    /**
     * Publishes a message to the declared exchange, routed by correlation id,
     * and blocks until the broker has confirmed or rejected it.
     *
     * @param string $correlation_id
     * @param string $message
     * @return bool True when the broker acknowledged the message, false when
     *              it negatively acknowledged (NACKed) it.
     * @throws \ErrorException When no exchange has been declared.
     */
    public function publish(string $correlation_id, string $message): bool
    {
        if (!$this->channel || !$this->exchange_name) {
            throw new \ErrorException('An exchange has not been defined.');
        }

        // Flipped by the NACK handler so the caller learns about a rejection
        // instead of always being told the publish succeeded.
        $confirmed = true;

        $this->channel->set_ack_handler(function () {
            Log::info('AMQP Message ACK');
        });

        $this->channel->set_nack_handler(function () use (&$confirmed) {
            $confirmed = false;
            Log::error('AMQP Message NACK');
        });

        $this->channel->confirm_select();

        $this->channel->basic_publish(new AMQPMessage($message), $this->exchange_name, $correlation_id);
        $this->channel->wait_for_pending_acks();

        return $confirmed;
    }

    /**
     * Consumes from the declared queue and returns the body of the first
     * message delivered.
     *
     * @param int $timeout Passed through to AMQPChannel::wait().
     * @return string
     * @throws \ErrorException When no queue has been declared, or when the
     *                         wait loop ends without a message.
     */
    public function read(int $timeout = 0): string
    {
        if (!$this->queue_name || !$this->channel) {
            throw new \ErrorException('A queue, or exchange, have not been defined.');
        }

        // Drop any body kept from an earlier read(); otherwise the wait loop
        // below is skipped and the stale message is returned again.
        $this->response = null;

        // Arguments after the queue name: consumer_tag = '', no_local = false,
        // no_ack = true, exclusive = false, nowait = false.
        $this->channel->basic_consume(
            $this->queue_name,
            '',
            false,
            true,
            false,
            false,
            function ($msg) {
                $this->response = $msg->body;
            }
        );

        while ($this->response === null && count($this->channel->callbacks)) {
            $this->channel->wait(null, false, $timeout);
        }

        if ($this->response === null) {
            // The loop can end because no consumer callbacks remain; returning
            // null here would violate the declared string return type.
            throw new \ErrorException('No response was received from the queue.');
        }

        return $this->response;
    }

    /**
     * Closes the connection if it is currently open.
     *
     * @return ClientInterface
     */
    public function closeConnection(): ClientInterface
    {
        if ($this->connection->isConnected()) {
            $this->connection->close();
        }

        return $this;
    }

    /**
     * Closes the channel, if one has been opened.
     *
     * @return ClientInterface
     */
    public function closeChannel(): ClientInterface
    {
        if ($this->channel) {
            $this->channel->close();

            // Forget the closed channel so a later declareExchange() or
            // declareQueue() opens a fresh one, and so the destructor does
            // not try to close it a second time.
            $this->channel = null;
        }

        return $this;
    }

    /**
     * Returns the active connection.
     *
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * Returns the open channel.
     *
     * @return AMQPChannel
     * @throws \ErrorException When no channel has been opened yet.
     */
    public function getChannel(): AMQPChannel
    {
        if (!$this->channel) {
            // Returning false here, as before, always failed the AMQPChannel
            // return type with a TypeError.
            throw new \ErrorException('A channel has not been opened.');
        }

        return $this->channel;
    }

    /**
     * Returns the name of the declared queue.
     *
     * @return string The queue name, or an empty string when no queue has
     *                been declared.
     */
    public function getQueue(): string
    {
        return (string) $this->queue_name;
    }
}
||||
276 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.