This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace PHPDaemon\Clients\AMQP; |
||
4 | |||
5 | use PHPDaemon\Clients\AMQP\Driver\Exception\AMQPChannelException; |
||
6 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Basic; |
||
7 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\BodyFrame; |
||
8 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Channel as ProtocolChannel; |
||
9 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Exchange; |
||
10 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\IncomingFrame; |
||
11 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Queue; |
||
12 | use PHPDaemon\Core\Daemon; |
||
13 | use PHPDaemon\Traits\EventHandlers; |
||
14 | |||
15 | /** |
||
16 | * Class Channel |
||
17 | * @author Aleksey I. Kuleshov YOU GLOBAL LIMITED |
||
18 | * @package PHPDaemon\Clients\AMQP |
||
19 | */ |
||
20 | class Channel |
||
21 | { |
||
22 | use EventHandlers; |
||
23 | |||
24 | /** |
||
25 | * The QOS prefetch count value |
||
26 | */ |
||
27 | const QOS_PREFETCH_COUNT = 3; |
||
28 | |||
29 | /** |
||
30 | * The QOS prefetch size value |
||
31 | */ |
||
32 | const QOS_PREFECTH_SIZE = 0; |
||
33 | |||
34 | /** |
||
35 | * This event raised on channel open |
||
36 | */ |
||
37 | const EVENT_ON_CHANNEL_OPEN_CALLBACK = 'event.on.channel.open.callback'; |
||
38 | |||
39 | /** |
||
40 | * This event raised on channel close |
||
41 | */ |
||
42 | const EVENT_ON_CHANNEL_CLOSE_CALLBACK = 'event.on.channel.close.callback'; |
||
43 | |||
44 | /** |
||
45 | * This event raised on channel consume ok frame |
||
46 | */ |
||
47 | const EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK = 'event.on.channel.consumeOk.callback'; |
||
48 | |||
49 | /** |
||
50 | * This event raised on queue declare confirmation |
||
51 | */ |
||
52 | const EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK = 'event.channel.dispatch.declareQueue.callback'; |
||
53 | |||
54 | /** |
||
55 | * This event raised on queue delete confirmation |
||
56 | */ |
||
57 | const EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK = 'event.on.channel.deleteQueue.callback'; |
||
58 | |||
59 | /** |
||
60 | * This event raised on queue purge confirmation |
||
61 | */ |
||
62 | const EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK = 'event.on.channel.purgeQueue.callback'; |
||
63 | |||
64 | /** |
||
65 | * This event raised on queue bind confirmation |
||
66 | */ |
||
67 | const EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK = 'event.on.channel.bindQueue.callback'; |
||
68 | |||
69 | /** |
||
70 | * This event raised on queue unbind confirmation |
||
71 | */ |
||
72 | const EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK = 'event.on.channel.unbindQueue.callback'; |
||
73 | |||
74 | /** |
||
75 | * This event raised on BasicGet message income |
||
76 | */ |
||
77 | const EVENT_DISPATCH_MESSAGE = 'event.channel.dispatch.message'; |
||
78 | |||
79 | /** |
||
80 | * This event raised on BasicConsume message income |
||
81 | */ |
||
82 | const EVENT_DISPATCH_CONSUMER_MESSAGE = 'event.channel.dispatch.consumer.message'; |
||
83 | |||
84 | /** |
||
85 | * This event raised on exchange declare confirmation |
||
86 | */ |
||
87 | const EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK = 'event.channel.dispatch.declareExchange.callback'; |
||
88 | |||
89 | /** |
||
90 | * This event raised on exchange delete confirmation |
||
91 | */ |
||
92 | const EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK = 'event.channel.dispatch.deleteExchange.callback'; |
||
93 | |||
94 | /** |
||
95 | * This event raised on exchange bind confirmation |
||
96 | */ |
||
97 | const EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK = 'event.channel.dispatch.bindExchange.callback'; |
||
98 | |||
99 | /** |
||
100 | * This event raised on exchange unbind confirmation |
||
101 | */ |
||
102 | const EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK = 'event.channel.dispatch.unbindExchange.callback'; |
||
103 | |||
104 | /** |
||
105 | * @var Connection |
||
106 | */ |
||
107 | protected $connection; |
||
108 | |||
109 | /** |
||
110 | * @var int |
||
111 | */ |
||
112 | protected $id; |
||
113 | |||
114 | /** |
||
115 | * @var array |
||
116 | */ |
||
117 | protected $consumers = []; |
||
118 | |||
119 | /** |
||
120 | * @var \SplObjectStorage |
||
121 | */ |
||
122 | private $stack; |
||
123 | |||
124 | /** |
||
125 | * @var bool |
||
126 | */ |
||
127 | private $isConnected; |
||
128 | |||
/**
 * Channel constructor.
 *
 * Asks the connection for a free channel id, registers this object under that
 * id, sends the Channel.Open frame and then subscribes {@see Channel::dispatch()}
 * to every frame class handled by this channel.
 *
 * @param Connection $connection connection the channel is opened on
 * @param callable|null $callback optional; called with the channel when
 *                                EVENT_ON_CHANNEL_OPEN_CALLBACK is triggered
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
 */
public function __construct(Connection $connection, callable $callback = null)
{
    $this->connection = $connection;
    $this->addThisToEvents = false;

    $openFrame = new ProtocolChannel\ChannelOpenFrame();
    $openFrame->frameChannelId = $this->connection->findChannelId();
    $this->connection->addChannel($openFrame->frameChannelId, $this);

    $this->connection->command($openFrame);

    // Every protocol frame below is routed to the same dispatcher.
    $dispatcher = [$this, 'dispatch'];
    $handledFrames = [
        ProtocolChannel\ChannelOpenOkFrame::class,
        ProtocolChannel\ChannelCloseFrame::class,
        ProtocolChannel\ChannelCloseOkFrame::class,

        Basic\BasicQosOkFrame::class,
        Basic\BasicCancelOkFrame::class,
        Basic\BasicDeliverFrame::class,
        Basic\BasicGetOkFrame::class,
        Basic\BasicHeaderFrame::class,
        Basic\BasicGetEmptyFrame::class,
        Basic\BasicConsumeOkFrame::class,

        BodyFrame::class,

        Queue\QueueDeclareOkFrame::class,
        Queue\QueueDeleteOkFrame::class,
        Queue\QueuePurgeOkFrame::class,
        Queue\QueueBindOkFrame::class,
        Queue\QueueUnbindOkFrame::class,

        Exchange\ExchangeDeclareOkFrame::class,
        Exchange\ExchangeDeleteOkFrame::class,
        Exchange\ExchangeBindOkFrame::class,
        Exchange\ExchangeUnbindOkFrame::class,
    ];
    foreach ($handledFrames as $frameClass) {
        $this->on($frameClass, $dispatcher);
    }

    // Hand a consumed message to the consumer registered under its tag, if any.
    $this->on(self::EVENT_DISPATCH_CONSUMER_MESSAGE, function ($consumerTag, $message) {
        if (!array_key_exists($consumerTag, $this->consumers)) {
            return;
        }
        $handler = $this->consumers[$consumerTag];
        $handler($message);
    });

    // Notify the creator of the channel once the open event is triggered.
    $this->on(self::EVENT_ON_CHANNEL_OPEN_CALLBACK, function ($channel) use ($callback) {
        if (!is_callable($callback)) {
            return;
        }
        $callback($channel);
    });

    $this->stack = new \SplObjectStorage();
}
||
189 | |||
/**
 * Dispatch an incoming frame addressed to this channel.
 *
 * Message content is assembled from three kinds of frames: a method frame
 * (Basic.Deliver or Basic.GetOk) creates the Message and stores it in
 * $this->stack, the header frame fills in the properties and the announced
 * content length, and body frames append content until that length has been
 * received. The completed message is then handed to the listeners.
 *
 * A header announcing a zero-length body completes the message immediately,
 * because in AMQP 0-9-1 no body frame follows such a header.
 *
 * @param IncomingFrame $incomingFrame
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function dispatch(IncomingFrame $incomingFrame)
{
    switch (true) {
        case $incomingFrame instanceof Basic\BasicDeliverFrame:
        case $incomingFrame instanceof Basic\BasicGetOkFrame:
            $message = new Message();
            $message->setRoutingKey($incomingFrame->routingKey)
                ->setExchange($incomingFrame->exchange)
                ->setTag($incomingFrame->deliveryTag)
                ->setChannel($this);
            $object = new \stdClass();
            // Only Basic.Deliver carries a consumer tag; its presence later
            // decides which event the completed message is sent to.
            if ($incomingFrame instanceof Basic\BasicDeliverFrame) {
                $object->consumerTag = $incomingFrame->consumerTag;
            }
            $object->message = $message;
            $this->stack->attach($object);
            // Position the iterator so current() returns the pending entry.
            $this->stack->rewind();
            break;
        case $incomingFrame instanceof Basic\BasicHeaderFrame:
            if (!$this->stack->valid()) {
                Daemon::log('[AMQP] Header frame received without a pending message, frame ignored');
                break;
            }
            $object = $this->stack->current();
            /** @var Message $message */
            $message = $object->message;
            $message->setContentLength($incomingFrame->contentLength)
                ->setContentType($incomingFrame->contentType)
                ->setContentEncoding($incomingFrame->contentEncoding)
                ->setHeaders($incomingFrame->headers)
                ->setMessageId($incomingFrame->messageId)
                ->setDeliveryMode($incomingFrame->deliveryMode)
                ->setCorrelationId($incomingFrame->correlationId)
                ->setReplyTo($incomingFrame->replyTo)
                ->setExpiration($incomingFrame->expiration)
                ->setTimestamp($incomingFrame->timestamp)
                ->setType($incomingFrame->type)
                ->setUserId($incomingFrame->userId)
                ->setAppId($incomingFrame->appId)
                ->setClusterId($incomingFrame->clusterId);
            $object->totalPayloadSize = $incomingFrame->contentLength;
            // An empty body is never followed by a body frame, so the message
            // has to be delivered right here; otherwise it would stay in the
            // stack and swallow the frames of the next message.
            if (0 === (int)$object->totalPayloadSize) {
                $this->flushAssembledMessage($object);
            }
            break;
        case $incomingFrame instanceof BodyFrame:
            if (!$this->stack->valid()) {
                Daemon::log('[AMQP] Body frame received without a pending message, frame ignored');
                break;
            }
            $object = $this->stack->current();
            /**
             * Use only php strlen because we need the string length in bytes
             */
            $currentContentLength = strlen($incomingFrame->content);

            /** @var Message $message */
            $message = $object->message;
            $message->setContent($message->getContent() . $incomingFrame->content);

            // Count down the bytes still expected for this message.
            $object->totalPayloadSize -= $currentContentLength;

            if ($object->totalPayloadSize === 0) {
                $this->flushAssembledMessage($object);
            }
            break;
        case $incomingFrame instanceof Basic\BasicGetEmptyFrame:
            // Basic.Get on an empty queue: listeners receive false instead of a Message.
            $this->triggerOneAndUnbind(self::EVENT_DISPATCH_MESSAGE, false);
            break;
        case $incomingFrame instanceof Basic\BasicCancelOkFrame:
            unset($this->consumers[$incomingFrame->consumerTag]);
            break;

        case $incomingFrame instanceof ProtocolChannel\ChannelOpenOkFrame:
            $this->isConnected = true;
            $this->id = $incomingFrame->frameChannelId;
            // Configure QoS right after the channel is open; the open callback
            // is triggered once Basic.QosOk arrives (see below).
            $outputFrame = Basic\BasicQosFrame::create(
                self::QOS_PREFECTH_SIZE,
                self::QOS_PREFETCH_COUNT
            );
            $outputFrame->frameChannelId = $incomingFrame->frameChannelId;
            $this->connection->command($outputFrame);
            break;
        case $incomingFrame instanceof ProtocolChannel\ChannelCloseFrame:
            // NOTE(review): AMQP 0-9-1 expects Channel.CloseOk in reply to a
            // broker-initiated Channel.Close; no reply is sent here - confirm
            // whether the connection layer takes care of it.
            $this->trigger(self::EVENT_ON_CHANNEL_CLOSE_CALLBACK);
            Daemon::log(sprintf('[AMQP] Channel closed by broker. Reason: %s[%d]', $incomingFrame->replyText, $incomingFrame->replyCode));
            $this->isConnected = false;
            break;
        case $incomingFrame instanceof Basic\BasicQosOkFrame:
            $this->trigger(self::EVENT_ON_CHANNEL_OPEN_CALLBACK, $this);
            break;
        case $incomingFrame instanceof Basic\BasicConsumeOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK, $incomingFrame);
            break;
        case $incomingFrame instanceof ProtocolChannel\ChannelCloseOkFrame:
            $this->isConnected = false;
            break;
        case $incomingFrame instanceof Queue\QueueDeclareOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK);
            break;
        case $incomingFrame instanceof Queue\QueueDeleteOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK);
            break;
        case $incomingFrame instanceof Queue\QueuePurgeOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK);
            break;
        case $incomingFrame instanceof Queue\QueueBindOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK);
            break;
        case $incomingFrame instanceof Queue\QueueUnbindOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK);
            break;
        case $incomingFrame instanceof Exchange\ExchangeDeclareOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK);
            break;
        case $incomingFrame instanceof Exchange\ExchangeDeleteOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK);
            break;
        case $incomingFrame instanceof Exchange\ExchangeBindOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK);
            break;
        case $incomingFrame instanceof Exchange\ExchangeUnbindOkFrame:
            $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK);
            break;
    }
}

/**
 * Hand a fully assembled message to its listeners and drop it from the stack.
 *
 * Entries carrying a consumer tag go to EVENT_DISPATCH_CONSUMER_MESSAGE,
 * all other entries go to the one-shot EVENT_DISPATCH_MESSAGE listener.
 *
 * @param \stdClass $pending stack entry created by dispatch() for the message
 */
private function flushAssembledMessage($pending)
{
    if (isset($pending->consumerTag)) {
        $this->trigger(self::EVENT_DISPATCH_CONSUMER_MESSAGE, $pending->consumerTag, $pending->message);
    } else {
        $this->triggerOneAndUnbind(self::EVENT_DISPATCH_MESSAGE, $pending->message);
    }
    $this->stack->detach($pending);
}
||
317 | |||
/**
 * Close the channel.
 *
 * Sends a Channel.Close frame for this channel's id through the connection.
 *
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function close()
{
    // @todo replyCode: 0 is sent until a real code is chosen
    $replyCode = 0;
    $closeFrame = ProtocolChannel\ChannelCloseFrame::create($replyCode, 'Channel closed by client');
    $closeFrame->frameChannelId = $this->id;

    $this->connection->command($closeFrame);
}
||
333 | |||
/**
 * Check queue.
 *
 * Not implemented yet: the method currently does nothing and none of its
 * arguments are used.
 *
 * @param string $name a queue name
 * @param array $options
 * @param callable|null $callback
 */
public function checkQueue($name, array $options = [], callable $callback = null)
{
    // @todo implement this
}
||
345 | |||
/**
 * Declare a queue.
 *
 * Recognised option keys: passive, durable, exclusive, autoDelete, noWait
 * (each cast to bool when present, null otherwise) and arguments (passed
 * through as given, null when absent).
 *
 * @param string $name a queue name
 * @param array $options queue options
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function declareQueue($name, array $options = [], callable $callback = null)
{
    // Boolean option reader: null means "not specified by the caller".
    $flag = function ($key) use ($options) {
        if (!array_key_exists($key, $options)) {
            return null;
        }
        return (bool)$options[$key];
    };

    $declareFrame = Queue\QueueDeclareFrame::create(
        $name,
        $flag('passive'),
        $flag('durable'),
        $flag('exclusive'),
        $flag('autoDelete'),
        $flag('noWait'),
        array_key_exists('arguments', $options) ? $options['arguments'] : null
    );
    $declareFrame->frameChannelId = $this->id;
    $this->connection->command($declareFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK, $callback);
}
||
380 | |||
/**
 * Delete a queue.
 *
 * Recognised option keys: ifUnused, ifEmpty, noWait (each cast to bool when
 * present, null otherwise).
 *
 * @param string $name a queue name
 * @param array $options an options array
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function deleteQueue($name, array $options = [], callable $callback = null)
{
    // Collect the three boolean switches; absent keys stay null.
    $switches = ['ifUnused' => null, 'ifEmpty' => null, 'noWait' => null];
    foreach (array_keys($switches) as $key) {
        if (array_key_exists($key, $options)) {
            $switches[$key] = (bool)$options[$key];
        }
    }

    $deleteFrame = Queue\QueueDeleteFrame::create(
        $name,
        $switches['ifUnused'],
        $switches['ifEmpty'],
        $switches['noWait']
    );
    $deleteFrame->frameChannelId = $this->id;
    $this->connection->command($deleteFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK, $callback);
}
||
404 | |||
/**
 * Purge queue messages.
 *
 * Recognised option key: noWait (cast to bool when present, null otherwise).
 *
 * @param string $name a queue name
 * @param array $options an options array
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function purgeQueue($name, array $options = [], callable $callback = null)
{
    $noWait = null;
    if (array_key_exists('noWait', $options)) {
        $noWait = (bool)$options['noWait'];
    }

    $purgeFrame = Queue\QueuePurgeFrame::create($name, $noWait);
    $purgeFrame->frameChannelId = $this->id;
    $this->connection->command($purgeFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK, $callback);
}
||
426 | |||
/**
 * Bind a queue to an exchange.
 *
 * Recognised option keys: noWait (cast to bool when present, null otherwise)
 * and arguments (passed through as given, null when absent).
 *
 * @param string $name a queue name
 * @param string $exchangeName an exchange name
 * @param string $routingKey a routing key
 * @param array $options additional options
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function bindQueue($name, $exchangeName, $routingKey, array $options = [], callable $callback = null)
{
    $noWait = null;
    if (array_key_exists('noWait', $options)) {
        $noWait = (bool)$options['noWait'];
    }

    $bindingArguments = null;
    if (array_key_exists('arguments', $options)) {
        $bindingArguments = $options['arguments'];
    }

    $bindFrame = Queue\QueueBindFrame::create($name, $exchangeName, $routingKey, $noWait, $bindingArguments);
    $bindFrame->frameChannelId = $this->id;
    $this->connection->command($bindFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK, $callback);
}
||
457 | |||
/**
 * Unbind a queue from an exchange.
 *
 * @param string $name a queue name
 * @param string $exchangeName an exchange name
 * @param string $routingKey a routing key
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function unbindQueue($name, $exchangeName, $routingKey, callable $callback = null)
{
    $unbindFrame = Queue\QueueUnbindFrame::create($name, $exchangeName, $routingKey);
    $unbindFrame->frameChannelId = $this->id;
    $this->connection->command($unbindFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK, $callback);
}
||
482 | |||
/**
 * Declare an exchange.
 *
 * Recognised option keys: type and arguments (passed through as given, null
 * when absent); passive, durable, internal, autoDelete, noWait (each cast to
 * bool when present, null otherwise).
 *
 * @param string $name a name of the exchange
 * @param array $options exchange options
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function declareExchange($name, array $options = [], callable $callback = null)
{
    // Raw option reader: value as given, or null when the key is absent.
    $raw = function ($key) use ($options) {
        return array_key_exists($key, $options) ? $options[$key] : null;
    };
    // Boolean option reader: null means "not specified by the caller".
    $flag = function ($key) use ($options) {
        return array_key_exists($key, $options) ? (bool)$options[$key] : null;
    };

    // Note the frame takes autoDelete before internal.
    $declareFrame = Exchange\ExchangeDeclareFrame::create(
        $name,
        $raw('type'),
        $flag('passive'),
        $flag('durable'),
        $flag('autoDelete'),
        $flag('internal'),
        $flag('noWait'),
        $raw('arguments')
    );
    $declareFrame->frameChannelId = $this->id;
    $this->connection->command($declareFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK, $callback);
}
||
519 | |||
/**
 * Delete an exchange.
 *
 * Recognised option keys: ifUnused, noWait (each cast to bool when present,
 * null otherwise).
 *
 * @param string $name an exchange name
 * @param array $options exchange options
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function deleteExchange($name, array $options = [], callable $callback = null)
{
    $ifUnused = null;
    if (array_key_exists('ifUnused', $options)) {
        $ifUnused = (bool)$options['ifUnused'];
    }

    $noWait = null;
    if (array_key_exists('noWait', $options)) {
        $noWait = (bool)$options['noWait'];
    }

    $deleteFrame = Exchange\ExchangeDeleteFrame::create($name, $ifUnused, $noWait);
    $deleteFrame->frameChannelId = $this->id;
    $this->connection->command($deleteFrame);

    if (!is_callable($callback)) {
        return;
    }
    $this->on(self::EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK, $callback);
}
||
542 | |||
/**
 * Bind an exchange to another exchange.
 *
 * Recognised option keys:
 *  - noWait: cast to bool, false when absent;
 *  - arguments: binding arguments table sent with the frame.
 *
 * For backward compatibility, when no "arguments" key is given the remaining
 * options (everything except "noWait") are sent as the arguments table, so
 * callers that passed binding arguments directly in $options keep working.
 * The "noWait" control flag itself is no longer leaked into that table.
 *
 * @param string $name a source exchange name
 * @param string $exchangeName a destination exchange name
 * @param string $routingKey a routing key
 * @param array $options
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPChannelException when the broker lacks
 *         exchange-to-exchange bindings or both exchange names are identical
 */
public function bindExchange($name, $exchangeName, $routingKey, array $options = [], callable $callback = null)
{
    if (!$this->connection->getFeatures()->exchangeToExchangeBindings) {
        throw new AMQPChannelException('Broker does not support exchange to exchange bindings');
    }

    if ($exchangeName === $name) {
        throw new AMQPChannelException('Exchange cannot bind to itself');
    }

    $noWait = array_key_exists('noWait', $options) ? (bool)$options['noWait'] : false;

    if (array_key_exists('arguments', $options)) {
        $arguments = $options['arguments'];
    } else {
        // Legacy call style: binding arguments given directly in $options.
        $arguments = $options;
        unset($arguments['noWait']);
    }

    $outputFrame = Exchange\ExchangeBindFrame::create(
        $name,
        $exchangeName,
        $routingKey,
        $noWait,
        $arguments
    );
    $outputFrame->frameChannelId = $this->id;
    $this->connection->command($outputFrame);

    if (is_callable($callback)) {
        $this->on(self::EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK, $callback);
    }
}
||
581 | |||
/**
 * Unbind an exchange from another exchange.
 *
 * Recognised option keys:
 *  - noWait: cast to bool, false when absent;
 *  - arguments: binding arguments table sent with the frame.
 *
 * For backward compatibility, when no "arguments" key is given the remaining
 * options (everything except "noWait") are sent as the arguments table, so
 * callers that passed binding arguments directly in $options keep working.
 * The "noWait" control flag itself is no longer leaked into that table.
 *
 * @param string $name a source exchange name
 * @param string $exchangeName a destination exchange name
 * @param string $routingKey a routing key
 * @param array $options
 * @param callable|null $callback optional; bound to EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPChannelException when the broker lacks
 *         exchange-to-exchange bindings or both exchange names are identical
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function unbindExchange($name, $exchangeName, $routingKey, array $options = [], callable $callback = null)
{
    if (!$this->connection->getFeatures()->exchangeToExchangeBindings) {
        throw new AMQPChannelException('Broker does not support exchange to exchange bindings');
    }

    if ($exchangeName === $name) {
        throw new AMQPChannelException('Exchange cannot unbind itself');
    }

    $noWait = array_key_exists('noWait', $options) ? (bool)$options['noWait'] : false;

    if (array_key_exists('arguments', $options)) {
        $arguments = $options['arguments'];
    } else {
        // Legacy call style: binding arguments given directly in $options.
        $arguments = $options;
        unset($arguments['noWait']);
    }

    $outputFrame = Exchange\ExchangeUnbindFrame::create(
        $name,
        $exchangeName,
        $routingKey,
        $noWait,
        $arguments
    );
    $outputFrame->frameChannelId = $this->id;
    $this->connection->command($outputFrame);

    if (is_callable($callback)) {
        $this->on(self::EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK, $callback);
    }
}
||
620 | |||
/**
 * Publish a message to an exchange.
 *
 * Recognised option keys (each copied to the header frame, null when absent):
 * contentLength, contentType, contentEncoding, headers, messageId,
 * deliveryMode, correlationId, replyTo, expiration, timestamp, type, userId,
 * appId, clusterId. A missing contentType / contentEncoding is detected from
 * the content with finfo; a missing contentLength is the byte length of the
 * content.
 *
 * @param string $content The message content
 * @param string $exchangeName exchange name
 * @param string $routingKey routing key
 * @param array $options
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function publish($content, $exchangeName, $routingKey, array $options = [])
{
    /**
     * Three kinds of frames have to be sent:
     * 1. BasicPublishFrame tells the broker that content is about to be transferred.
     * 2. BasicHeaderFrame carries the properties of the message being sent.
     * 3. BodyFrame carries the message content. It is sent in chunks of at most
     *    $this->connection->getMaximumFrameSize() bytes.
     */
    $outputBasicPublishFrame = Basic\BasicPublishFrame::create(
        $exchangeName, $routingKey
    );
    $outputBasicPublishFrame->frameChannelId = $this->id;
    $this->connection->command($outputBasicPublishFrame);

    $outputBasicHeaderFrame = new Basic\BasicHeaderFrame();
    $outputBasicHeaderFrame->frameChannelId = $this->id;
    $properties = [
        'contentLength',
        'contentType',
        'contentEncoding',
        'headers',
        'messageId',
        'deliveryMode',
        'correlationId',
        'replyTo',
        'expiration',
        'timestamp',
        'type',
        'userId',
        'appId',
        'clusterId',
    ];
    foreach ($properties as $property) {
        $outputBasicHeaderFrame->$property = array_key_exists($property, $options) ? $options[$property] : null;
    }

    // Only pay for content sniffing when the caller left something to detect.
    if (null === $outputBasicHeaderFrame->contentType || null === $outputBasicHeaderFrame->contentEncoding) {
        $fInfo = new \finfo();
        if (null === $outputBasicHeaderFrame->contentType) {
            $outputBasicHeaderFrame->contentType = $fInfo->buffer($content, FILEINFO_MIME_TYPE);
        }
        if (null === $outputBasicHeaderFrame->contentEncoding) {
            $outputBasicHeaderFrame->contentEncoding = $fInfo->buffer($content, FILEINFO_MIME_ENCODING);
        }
        unset($fInfo);
    }

    if (null === $outputBasicHeaderFrame->contentLength) {
        // strlen on purpose: the protocol needs the length in bytes.
        $outputBasicHeaderFrame->contentLength = strlen($content);
    }
    $this->connection->command($outputBasicHeaderFrame);

    // In AMQP 0-9-1 a negotiated frame-max of zero means "no limit". Treat any
    // non-positive value as unlimited: chunking by zero bytes would never
    // consume the buffer and the loop below would spin forever.
    // NOTE(review): frame-max covers the whole frame including header and
    // end marker - confirm whether getMaximumFrameSize() already accounts
    // for that overhead.
    $maxFrameSize = (int)$this->connection->getMaximumFrameSize();
    $length = $outputBasicHeaderFrame->contentLength;

    $contentBuffer = $content;
    while ($length) {
        $outputBodyFrame = new BodyFrame();
        $outputBodyFrame->frameChannelId = $this->id;

        if ($maxFrameSize <= 0 || $length <= $maxFrameSize) {
            // Remainder fits into a single frame.
            $outputBodyFrame->content = $contentBuffer;
            $contentBuffer = '';
            $length = 0;
        } else {
            $outputBodyFrame->content = substr($contentBuffer, 0, $maxFrameSize);
            $contentBuffer = substr($contentBuffer, $maxFrameSize);
            $length -= $maxFrameSize;
        }
        $this->connection->command($outputBodyFrame);
    }
}
||
696 | |||
/**
 * Publish a message straight to a named queue.
 *
 * Thin wrapper around publish(): the queue name is forwarded as the third
 * argument and an empty string is passed as the second one.
 *
 * @param string $content a message content
 * @param string $name a queue name
 * @param array $options forwarded unchanged to publish()
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function sendToQueue($content, $name, array $options = [])
{
    // NOTE(review): presumably publish()'s second argument is the exchange name,
    // so '' targets the broker's default exchange - confirm against publish().
    $exchange = '';

    $this->publish($content, $exchange, $name, $options);
}
||
710 | |||
/**
 * Bind a consumer to a queue.
 *
 * Builds a BasicConsumeFrame from the queue name and options, stamps it with
 * this channel's id and hands it to the connection. When a callback is
 * supplied, a handler for EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK is registered
 * that stores the callback in $this->consumers under the consumer tag
 * carried by the consume-ok frame.
 *
 * Recognised option keys: consumerTag, noLocal, noAck, exclusive, noWait,
 * arguments. The four flags are cast to bool when present; every missing
 * key is passed to the frame factory as null.
 *
 * @param string $queueName a queue name
 * @param array $options
 * @param callable|null $callback consumer to register; null sends the frame without registering one
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function consume($queueName, array $options = [], callable $callback = null)
{
    // $callback now defaults to null: a required parameter placed after an
    // optional one is deprecated since PHP 8.0 and made the "= []" default
    // of $options unusable. This also mirrors the signature of get().
    $consumerTag = array_key_exists('consumerTag', $options) ? $options['consumerTag'] : null;
    $noLocal = array_key_exists('noLocal', $options) ? (bool)$options['noLocal'] : null;
    $noAck = array_key_exists('noAck', $options) ? (bool)$options['noAck'] : null;
    $exclusive = array_key_exists('exclusive', $options) ? (bool)$options['exclusive'] : null;
    $noWait = array_key_exists('noWait', $options) ? (bool)$options['noWait'] : null;
    $arguments = array_key_exists('arguments', $options) ? $options['arguments'] : null;

    $outputFrame = Basic\BasicConsumeFrame::create(
        $queueName,
        $consumerTag,
        $noLocal,
        $noAck,
        $exclusive,
        $noWait,
        $arguments
    );
    $outputFrame->frameChannelId = $this->id;
    $this->connection->command($outputFrame);

    if (null === $callback) {
        return;
    }

    // The consumer tag is only known once the consume-ok frame arrives, so the
    // callback is filed under it from inside the event handler.
    $this->on(
        self::EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK,
        function (Basic\BasicConsumeOkFrame $incomingFrame) use ($callback) {
            $this->consumers[$incomingFrame->consumerTag] = $callback;
        }
    );
}
||
747 | |||
/**
 * Unbind a consumer.
 *
 * Builds a BasicCancelFrame for the given consumer tag, stamps it with this
 * channel's id and hands it to the connection.
 *
 * @param string $consumerTag
 * @param array $options recognised key: noWait (passed through as given, null when absent)
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function cancel($consumerTag, array $options = [])
{
    $skipReply = isset($options['noWait']) ? $options['noWait'] : null;

    $cancelFrame = Basic\BasicCancelFrame::create($consumerTag, $skipReply);
    $cancelFrame->frameChannelId = $this->id;

    $this->connection->command($cancelFrame);
}
||
764 | |||
/**
 * Request a single message from a queue.
 *
 * Builds a BasicGetFrame, stamps it with this channel's id and hands it to
 * the connection. A callable $callback is additionally registered as a
 * handler for EVENT_DISPATCH_MESSAGE.
 *
 * @param string $queueName a queue name
 * @param array $options recognised key: noAck (passed through as given, null when absent)
 * @param callable|null $callback
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function get($queueName, array $options = [], callable $callback = null)
{
    $autoAck = isset($options['noAck']) ? $options['noAck'] : null;

    $getFrame = Basic\BasicGetFrame::create($queueName, $autoAck);
    $getFrame->frameChannelId = $this->id;
    $this->connection->command($getFrame);

    if (!is_callable($callback)) {
        return;
    }

    $this->on(self::EVENT_DISPATCH_MESSAGE, $callback);
}
||
789 | |||
/**
 * Acknowledge a message by its delivery tag.
 *
 * Builds a BasicAckFrame, stamps it with this channel's id and hands it to
 * the connection.
 *
 * @param int $deliveryTag
 * @param array $options recognised key: multiple (cast to int when present, null when absent)
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function ack($deliveryTag, array $options = [])
{
    $ackMultiple = null;
    if (array_key_exists('multiple', $options)) {
        $ackMultiple = (int)$options['multiple'];
    }

    $ackFrame = Basic\BasicAckFrame::create($deliveryTag, $ackMultiple);
    $ackFrame->frameChannelId = $this->id;

    $this->connection->command($ackFrame);
}
||
806 | |||
/**
 * Negatively acknowledge a message.
 *
 * Builds a BasicNackFrame, stamps it with this channel's id and hands it to
 * the connection.
 *
 * @param $deliveryTag
 * @param array $options recognised keys: multiple (cast to int), requeue (cast to bool);
 *                       a missing key is passed to the frame factory as null
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function nack($deliveryTag, array $options = [])
{
    $nackMultiple = null;
    if (array_key_exists('multiple', $options)) {
        $nackMultiple = (int)$options['multiple'];
    }

    $putBack = null;
    if (array_key_exists('requeue', $options)) {
        $putBack = (bool)$options['requeue'];
    }

    $nackFrame = Basic\BasicNackFrame::create($deliveryTag, $nackMultiple, $putBack);
    $nackFrame->frameChannelId = $this->id;

    $this->connection->command($nackFrame);
}
||
824 | |||
/**
 * Reject a message.
 *
 * Builds a BasicRejectFrame, stamps it with this channel's id and hands it
 * to the connection.
 *
 * @param $deliveryTag
 * @param array $options recognised key: requeue (passed through as given, null when absent)
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function reject($deliveryTag, array $options = [])
{
    $putBack = isset($options['requeue']) ? $options['requeue'] : null;

    $rejectFrame = Basic\BasicRejectFrame::create($deliveryTag, $putBack);
    $rejectFrame->frameChannelId = $this->id;

    $this->connection->command($rejectFrame);
}
||
841 | |||
/**
 * Ask for redelivery of unacknowledged messages.
 *
 * Builds a BasicRecoverFrame, stamps it with this channel's id and hands it
 * to the connection.
 *
 * @param bool $requeue forwarded to the frame factory; defaults to true
 * @throws \InvalidArgumentException
 * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
 */
public function recover($requeue = true)
{
    $recoverFrame = Basic\BasicRecoverFrame::create($requeue);
    $recoverFrame->frameChannelId = $this->id;

    $this->connection->command($recoverFrame);
}
||
855 | |||
/**
 * Return this channel's id, i.e. the value the other methods of this class
 * write into the frameChannelId of every outgoing frame.
 *
 * @return mixed
 */
public function getId()
{
    return $this->id;
}
||
863 | |||
/**
 * Return the connection object this channel sends its frames through.
 *
 * @return Connection
 */
public function getConnection()
{
    return $this->connection;
}
||
871 | |||
/**
 * Return the current value of the channel's isConnected flag.
 *
 * @return bool
 */
public function isConnected()
{
    return $this->isConnected;
}
||
879 | |||
/**
 * Invoke the oldest handler queued for an event and remove it from the queue.
 *
 * The first argument is the event name; any further arguments are forwarded
 * to the handler, preceded by $this when $this->addThisToEvents is truthy.
 * When no handler is queued for the event the call is a no-op.
 *
 * @return $this
 */
private function triggerOneAndUnbind()
{
    $args = func_get_args();
    $name = array_shift($args);
    if ($this->addThisToEvents) {
        array_unshift($args, $this);
    }

    // isset() alone is not a sufficient guard: once the last handler has been
    // shifted off, the list is still set but empty, array_shift() returns null
    // and calling it would raise a fatal "not callable" error.
    if (empty($this->eventHandlers[$name])) {
        return $this;
    }

    $cb = array_shift($this->eventHandlers[$name]);
    // The handler's return value is intentionally ignored: $this is returned
    // regardless of what it yields.
    $cb(...$args);

    return $this;
}
||
898 | } |
||
899 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.