Channel::publish()   F
last analyzed

Complexity

Conditions 20
Paths > 20000

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 20
dl 0
loc 65
rs 0
c 0
b 0
f 0
nc 393216
nop 4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace PHPDaemon\Clients\AMQP;
4
5
use PHPDaemon\Clients\AMQP\Driver\Exception\AMQPChannelException;
6
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Basic;
7
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\BodyFrame;
8
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Channel as ProtocolChannel;
9
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Exchange;
10
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\IncomingFrame;
11
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Queue;
12
use PHPDaemon\Core\Daemon;
13
use PHPDaemon\Traits\EventHandlers;
14
15
/**
16
 * Class Channel
17
 * @author Aleksey I. Kuleshov YOU GLOBAL LIMITED
18
 * @package PHPDaemon\Clients\AMQP
19
 */
20
class Channel
21
{
22
    use EventHandlers;
23
24
    /**
25
     * The QOS prefetch count value
26
     */
27
    const QOS_PREFETCH_COUNT = 3;
28
29
    /**
30
     * The QOS prefetch size value
31
     */
32
    const QOS_PREFECTH_SIZE = 0;
33
34
    /**
35
     * This event raised on channel open
36
     */
37
    const EVENT_ON_CHANNEL_OPEN_CALLBACK = 'event.on.channel.open.callback';
38
39
    /**
40
     *  This event raised on channel close
41
     */
42
    const EVENT_ON_CHANNEL_CLOSE_CALLBACK = 'event.on.channel.close.callback';
43
44
    /**
45
     * This event raised on channel consume ok frame
46
     */
47
    const EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK = 'event.on.channel.consumeOk.callback';
48
49
    /**
50
     * This event raised on queue declare confirmation
51
     */
52
    const EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK = 'event.channel.dispatch.declareQueue.callback';
53
54
    /**
55
     * This event raised on queue delete confirmation
56
     */
57
    const EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK = 'event.on.channel.deleteQueue.callback';
58
59
    /**
60
     * This event raised on queue purge confirmation
61
     */
62
    const EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK = 'event.on.channel.purgeQueue.callback';
63
64
    /**
65
     * This event raised on queue bind confirmation
66
     */
67
    const EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK = 'event.on.channel.bindQueue.callback';
68
69
    /**
70
     * This event raised on queue unbind confirmation
71
     */
72
    const EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK = 'event.on.channel.unbindQueue.callback';
73
74
    /**
75
     * This event raised on BasicGet message income
76
     */
77
    const EVENT_DISPATCH_MESSAGE = 'event.channel.dispatch.message';
78
79
    /**
80
     * This event raised on BasicConsume message income
81
     */
82
    const EVENT_DISPATCH_CONSUMER_MESSAGE = 'event.channel.dispatch.consumer.message';
83
84
    /**
85
     * This event raised on exchange declare confirmation
86
     */
87
    const EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK = 'event.channel.dispatch.declareExchange.callback';
88
89
    /**
90
     * This event raised on exchange delete confirmation
91
     */
92
    const EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK = 'event.channel.dispatch.deleteExchange.callback';
93
94
    /**
95
     * This event raised on exchange bind confirmation
96
     */
97
    const EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK = 'event.channel.dispatch.bindExchange.callback';
98
99
    /**
100
     * This event raised on exchange unbind confirmation
101
     */
102
    const EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK = 'event.channel.dispatch.unbindExchange.callback';
103
104
    /**
105
     * @var Connection
106
     */
107
    protected $connection;
108
109
    /**
110
     * @var int
111
     */
112
    protected $id;
113
114
    /**
115
     * @var array
116
     */
117
    protected $consumers = [];
118
119
    /**
120
     * @var \SplObjectStorage
121
     */
122
    private $stack;
123
124
    /**
125
     * @var bool
126
     */
127
    private $isConnected;
128
129
    /**
130
     * Channel constructor.
131
     * @param Connection $connection
132
     * @param callable|null $callback
133
     * @throws \InvalidArgumentException
134
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
135
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
136
     */
137
    public function __construct(Connection $connection, callable $callback = null)
    {
        $this->connection = $connection;
        $this->addThisToEvents = false;

        // Reserve a channel id on the connection, register this object under
        // it, and ask the broker to open the channel.
        $openFrame = new ProtocolChannel\ChannelOpenFrame();
        $openFrame->frameChannelId = $this->connection->findChannelId();
        $this->connection->addChannel($openFrame->frameChannelId, $this);

        $this->connection->command($openFrame);

        // Every incoming frame type handled by this channel is routed to dispatch().
        $dispatchedFrameClasses = [
            ProtocolChannel\ChannelOpenOkFrame::class,
            ProtocolChannel\ChannelCloseFrame::class,
            ProtocolChannel\ChannelCloseOkFrame::class,

            Basic\BasicQosOkFrame::class,
            Basic\BasicCancelOkFrame::class,
            Basic\BasicDeliverFrame::class,
            Basic\BasicGetOkFrame::class,
            Basic\BasicHeaderFrame::class,
            Basic\BasicGetEmptyFrame::class,
            Basic\BasicConsumeOkFrame::class,

            BodyFrame::class,

            Queue\QueueDeclareOkFrame::class,
            Queue\QueueDeleteOkFrame::class,
            Queue\QueuePurgeOkFrame::class,
            Queue\QueueBindOkFrame::class,
            Queue\QueueUnbindOkFrame::class,

            Exchange\ExchangeDeclareOkFrame::class,
            Exchange\ExchangeDeleteOkFrame::class,
            Exchange\ExchangeBindOkFrame::class,
            Exchange\ExchangeUnbindOkFrame::class,
        ];
        foreach ($dispatchedFrameClasses as $frameClass) {
            $this->on($frameClass, [$this, 'dispatch']);
        }

        // Hand a fully received consumer message to the callback registered
        // under its consumer tag; messages for unknown tags are ignored.
        $this->on(self::EVENT_DISPATCH_CONSUMER_MESSAGE, function ($consumerTag, $message) {
            if (!array_key_exists($consumerTag, $this->consumers)) {
                return;
            }
            $handler = $this->consumers[$consumerTag];
            $handler($message);
        });

        // Notify the caller (if a callback was supplied) when the open event fires.
        $this->on(self::EVENT_ON_CHANNEL_OPEN_CALLBACK, function ($channel) use ($callback) {
            if (is_callable($callback)) {
                $callback($channel);
            }
        });

        // Holds messages that are still being assembled from header/body frames.
        $this->stack = new \SplObjectStorage();
    }
189
190
    /**
191
     * Dispatch incoming frame
192
     * @param IncomingFrame $incomingFrame
193
     * @throws \InvalidArgumentException
194
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
195
     */
196
    public function dispatch(IncomingFrame $incomingFrame)
    {
        switch (true) {
            // A deliver / get-ok method frame starts a new message: remember
            // its routing data and park it until header and body arrive.
            case $incomingFrame instanceof Basic\BasicDeliverFrame:
            case $incomingFrame instanceof Basic\BasicGetOkFrame:
                $message = new Message();
                $message->setRoutingKey($incomingFrame->routingKey)
                    ->setExchange($incomingFrame->exchange)
                    ->setTag($incomingFrame->deliveryTag)
                    ->setChannel($this);
                $object = new \stdClass();
                if ($incomingFrame instanceof Basic\BasicDeliverFrame) {
                    // Only consumer deliveries carry a consumer tag.
                    $object->consumerTag = $incomingFrame->consumerTag;
                }
                $object->message = $message;
                $this->stack->attach($object);
                $this->stack->rewind();
                break;

            // The content header supplies the message properties and the
            // total body size that the following body frames must add up to.
            case $incomingFrame instanceof Basic\BasicHeaderFrame:
                $object = $this->stack->current();
                /** @var Message $message */
                $message = $object->message;
                $message->setContentLength($incomingFrame->contentLength)
                    ->setContentType($incomingFrame->contentType)
                    ->setContentEncoding($incomingFrame->contentEncoding)
                    ->setHeaders($incomingFrame->headers)
                    ->setMessageId($incomingFrame->messageId)
                    ->setDeliveryMode($incomingFrame->deliveryMode)
                    ->setCorrelationId($incomingFrame->correlationId)
                    ->setReplyTo($incomingFrame->replyTo)
                    ->setExpiration($incomingFrame->expiration)
                    ->setTimestamp($incomingFrame->timestamp)
                    ->setType($incomingFrame->type)
                    ->setUserId($incomingFrame->userId)
                    ->setAppId($incomingFrame->appId)
                    ->setClusterId($incomingFrame->clusterId);
                $object->totalPayloadSize = $incomingFrame->contentLength;
                // An empty message has no body frames, so it has to be
                // completed here; otherwise it would stay parked forever and
                // later header/body frames would be applied to it.
                $this->flushMessageIfComplete($object);
                break;

            // Each body frame appends a chunk of content and reduces the
            // number of bytes still outstanding.
            case $incomingFrame instanceof BodyFrame:
                $object = $this->stack->current();
                /** @var Message $message */
                $message = $object->message;
                $message->setContent($message->getContent() . $incomingFrame->content);
                // strlen() on purpose: the size is counted in bytes.
                $object->totalPayloadSize -= strlen($incomingFrame->content);
                $this->flushMessageIfComplete($object);
                break;

            case $incomingFrame instanceof Basic\BasicGetEmptyFrame:
                // Nothing to fetch: the waiting listener receives false.
                $this->triggerOneAndUnbind(self::EVENT_DISPATCH_MESSAGE, false);
                break;

            case $incomingFrame instanceof Basic\BasicCancelOkFrame:
                unset($this->consumers[$incomingFrame->consumerTag]);
                break;

            case $incomingFrame instanceof ProtocolChannel\ChannelOpenOkFrame:
                $this->isConnected = true;
                $this->id = $incomingFrame->frameChannelId;
                // Send the QoS settings; the open callback fires once the
                // QoS-ok frame is dispatched (see the BasicQosOkFrame case).
                $outputFrame = Basic\BasicQosFrame::create(
                    self::QOS_PREFECTH_SIZE,
                    self::QOS_PREFETCH_COUNT
                );
                $outputFrame->frameChannelId = $incomingFrame->frameChannelId;
                $this->connection->command($outputFrame);
                break;

            case $incomingFrame instanceof ProtocolChannel\ChannelCloseFrame:
                // NOTE(review): no close-ok reply is sent to the broker here;
                // confirm whether the connection layer does that.
                $this->trigger(self::EVENT_ON_CHANNEL_CLOSE_CALLBACK);
                Daemon::log(sprintf('[AMQP] Channel closed by broker. Reason: %s[%d]', $incomingFrame->replyText, $incomingFrame->replyCode));
                $this->isConnected = false;
                break;

            case $incomingFrame instanceof Basic\BasicQosOkFrame:
                $this->trigger(self::EVENT_ON_CHANNEL_OPEN_CALLBACK, $this);
                break;

            case $incomingFrame instanceof Basic\BasicConsumeOkFrame:
                $this->triggerOneAndUnbind(self::EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK, $incomingFrame);
                break;

            case $incomingFrame instanceof ProtocolChannel\ChannelCloseOkFrame:
                $this->isConnected = false;
                break;

            default:
                // Remaining frames are plain queue/exchange confirmations.
                $this->dispatchConfirmationFrame($incomingFrame);
                break;
        }
    }

    /**
     * Deliver a parked message once no payload bytes are outstanding.
     *
     * Consumer deliveries (entries with a consumerTag) fire the consumer
     * message event; other messages fire the one-shot message event. The
     * entry is removed from the pending storage afterwards.
     *
     * @param \stdClass $object pending entry with message, totalPayloadSize and optional consumerTag
     */
    private function flushMessageIfComplete($object)
    {
        if ($object->totalPayloadSize !== 0) {
            return;
        }

        if (isset($object->consumerTag)) {
            $this->trigger(self::EVENT_DISPATCH_CONSUMER_MESSAGE, $object->consumerTag, $object->message);
        } else {
            $this->triggerOneAndUnbind(self::EVENT_DISPATCH_MESSAGE, $object->message);
        }
        $this->stack->detach($object);
    }

    /**
     * Fire the one-shot callback event matching a queue/exchange confirmation frame.
     *
     * Frames that match none of the listed classes are ignored.
     *
     * @param IncomingFrame $incomingFrame
     */
    private function dispatchConfirmationFrame(IncomingFrame $incomingFrame)
    {
        $eventsByFrameClass = [
            Queue\QueueDeclareOkFrame::class => self::EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK,
            Queue\QueueDeleteOkFrame::class => self::EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK,
            Queue\QueuePurgeOkFrame::class => self::EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK,
            Queue\QueueBindOkFrame::class => self::EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK,
            Queue\QueueUnbindOkFrame::class => self::EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK,
            Exchange\ExchangeDeclareOkFrame::class => self::EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK,
            Exchange\ExchangeDeleteOkFrame::class => self::EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK,
            Exchange\ExchangeBindOkFrame::class => self::EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK,
            Exchange\ExchangeUnbindOkFrame::class => self::EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK,
        ];

        foreach ($eventsByFrameClass as $frameClass => $eventName) {
            if ($incomingFrame instanceof $frameClass) {
                $this->triggerOneAndUnbind($eventName);
                return;
            }
        }
    }
317
318
    /**
319
     * Close the channel.
320
     *
321
     * @throws \InvalidArgumentException
322
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
323
     */
324
    public function close()
    {
        // Reply code 0 is a placeholder. @todo replyCode
        $closeFrame = ProtocolChannel\ChannelCloseFrame::create(0, 'Channel closed by client');
        $closeFrame->frameChannelId = $this->id;

        $this->connection->command($closeFrame);
    }
333
334
    /**
335
     * Check queue
336
     *
337
     * @param $name
338
     * @param array $options
339
     * @param callable|null $callback
340
     */
341
    public function checkQueue($name, array $options = [], callable $callback = null)
    {
        // Not implemented yet: this method is currently a no-op. None of the
        // parameters are used and $callback is never invoked.
        //@todo implement this
    }
345
346
    /**
347
     * DeclareQueue
348
     *
349
     * @param string $name a queue name
350
     * @param array $options a queue options
351
     * @param callable|null $callback
352
     * @throws \InvalidArgumentException
353
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
354
     */
355
    public function declareQueue($name, array $options = [], callable $callback = null)
    {
        // Boolean switches: cast when supplied, otherwise passed as null.
        $flags = [];
        foreach (['passive', 'durable', 'exclusive', 'autoDelete', 'noWait'] as $flagName) {
            $flags[$flagName] = array_key_exists($flagName, $options) ? (bool)$options[$flagName] : null;
        }

        // Arguments are forwarded untouched when supplied.
        $arguments = null;
        if (array_key_exists('arguments', $options)) {
            $arguments = $options['arguments'];
        }

        $declareFrame = Queue\QueueDeclareFrame::create(
            $name,
            $flags['passive'],
            $flags['durable'],
            $flags['exclusive'],
            $flags['autoDelete'],
            $flags['noWait'],
            $arguments
        );
        $declareFrame->frameChannelId = $this->id;
        $this->connection->command($declareFrame);

        // The callback is fired by dispatch() when the declare-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_DECLARE_QUEUE_CALLBACK, $callback);
    }
380
381
    /**
382
     * Delete Queue
383
     *
384
     * @param string $name a queue name
385
     * @param array $options a options array
386
     * @param callable|null $callback
387
     * @throws \InvalidArgumentException
388
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
389
     */
390
    public function deleteQueue($name, array $options = [], callable $callback = null)
    {
        // Boolean switches: cast when supplied, otherwise passed as null.
        $flags = [];
        foreach (['ifUnused', 'ifEmpty', 'noWait'] as $flagName) {
            $flags[$flagName] = array_key_exists($flagName, $options) ? (bool)$options[$flagName] : null;
        }

        $deleteFrame = Queue\QueueDeleteFrame::create(
            $name,
            $flags['ifUnused'],
            $flags['ifEmpty'],
            $flags['noWait']
        );
        $deleteFrame->frameChannelId = $this->id;
        $this->connection->command($deleteFrame);

        // The callback is fired by dispatch() when the delete-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_DELETE_QUEUE_CALLBACK, $callback);
    }
404
405
    /**
406
     * Purge queue messages
407
     *
408
     * @param string $name a queue name
409
     * @param array $options a options array
410
     * @param callable|null $callback
411
     * @throws \InvalidArgumentException
412
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
413
     */
414 View Code Duplication
    public function purgeQueue($name, array $options = [], callable $callback = null)
    {
        // noWait is cast to bool when supplied, otherwise passed as null.
        $noWait = null;
        if (array_key_exists('noWait', $options)) {
            $noWait = (bool)$options['noWait'];
        }

        $purgeFrame = Queue\QueuePurgeFrame::create($name, $noWait);
        $purgeFrame->frameChannelId = $this->id;
        $this->connection->command($purgeFrame);

        // The callback is fired by dispatch() when the purge-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_PURGE_QUEUE_CALLBACK, $callback);
    }
426
427
    /**
428
     * Bind queue to exchange
429
     *
430
     * @param string $name a queue name
431
     * @param string $exchangeName a exchange name
432
     * @param string $routingKey a routing key
433
     * @param array $options additional options
434
     * @param callable|null $callback
435
     * @throws \InvalidArgumentException
436
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
437
     */
438 View Code Duplication
    public function bindQueue($name, $exchangeName, $routingKey, array $options = [], callable $callback = null)
    {
        // noWait is cast to bool when supplied, otherwise passed as null.
        $noWait = null;
        if (array_key_exists('noWait', $options)) {
            $noWait = (bool)$options['noWait'];
        }

        // Binding arguments are forwarded untouched when supplied.
        $arguments = null;
        if (array_key_exists('arguments', $options)) {
            $arguments = $options['arguments'];
        }

        $bindFrame = Queue\QueueBindFrame::create($name, $exchangeName, $routingKey, $noWait, $arguments);
        $bindFrame->frameChannelId = $this->id;
        $this->connection->command($bindFrame);

        // The callback is fired by dispatch() when the bind-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_BIND_QUEUE_CALLBACK, $callback);
    }
457
458
    /**
459
     * Unbind queue from exchange
460
     *
461
     * @param string $name a queue name
462
     * @param string $exchangeName a exchange name
463
     * @param string $routingKey a routing key
464
     * @param callable|null $callback
465
     * @throws \InvalidArgumentException
466
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
467
     */
468
    public function unbindQueue($name, $exchangeName, $routingKey, callable $callback = null)
    {
        $unbindFrame = Queue\QueueUnbindFrame::create($name, $exchangeName, $routingKey);
        $unbindFrame->frameChannelId = $this->id;
        $this->connection->command($unbindFrame);

        // The callback is fired by dispatch() when the unbind-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_UNBIND_QUEUE_CALLBACK, $callback);
    }
482
483
    /**
484
     * Declare exchange
485
     *
486
     * @param string $name a name of exchange
487
     * @param array $options exchange options
488
     * @param callable|null $callback
489
     * @throws \InvalidArgumentException
490
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
491
     */
492
    public function declareExchange($name, array $options = [], callable $callback = null)
    {
        // Type and arguments are forwarded untouched when supplied.
        $type = null;
        if (array_key_exists('type', $options)) {
            $type = $options['type'];
        }
        $arguments = null;
        if (array_key_exists('arguments', $options)) {
            $arguments = $options['arguments'];
        }

        // Boolean switches: cast when supplied, otherwise passed as null.
        $flags = [];
        foreach (['passive', 'durable', 'internal', 'autoDelete', 'noWait'] as $flagName) {
            $flags[$flagName] = array_key_exists($flagName, $options) ? (bool)$options[$flagName] : null;
        }

        // Note the argument order: autoDelete precedes internal.
        $declareFrame = Exchange\ExchangeDeclareFrame::create(
            $name,
            $type,
            $flags['passive'],
            $flags['durable'],
            $flags['autoDelete'],
            $flags['internal'],
            $flags['noWait'],
            $arguments
        );
        $declareFrame->frameChannelId = $this->id;
        $this->connection->command($declareFrame);

        // The callback is fired by dispatch() when the declare-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_DECLARE_EXCHANGE_CALLBACK, $callback);
    }
519
520
    /**
521
     * Delete Exchange
522
     *
523
     * @param string $name a exchange name
524
     * @param array $options a exchange options
525
     * @param callable|null $callback
526
     * @throws \InvalidArgumentException
527
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
528
     */
529 View Code Duplication
    public function deleteExchange($name, array $options = [], callable $callback = null)
    {
        // Boolean switches: cast when supplied, otherwise passed as null.
        $ifUnused = null;
        if (array_key_exists('ifUnused', $options)) {
            $ifUnused = (bool)$options['ifUnused'];
        }
        $noWait = null;
        if (array_key_exists('noWait', $options)) {
            $noWait = (bool)$options['noWait'];
        }

        $deleteFrame = Exchange\ExchangeDeleteFrame::create($name, $ifUnused, $noWait);
        $deleteFrame->frameChannelId = $this->id;
        $this->connection->command($deleteFrame);

        // The callback is fired by dispatch() when the delete-ok frame arrives.
        if (!is_callable($callback)) {
            return;
        }
        $this->on(self::EVENT_ON_CHANNEL_DELETE_EXCHANGE_CALLBACK, $callback);
    }
542
543
    /**
544
     * Bind exchange
545
     *
546
     * @param string $name a source exchange name
547
     * @param string $exchangeName a destination exchange name
548
     * @param string $routingKey a routing key
549
     * @param array $options
550
     * @param callable|null $callback
551
     * @throws \InvalidArgumentException
552
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
553
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPChannelException
554
     */
555 View Code Duplication
    public function bindExchange($name, $exchangeName, $routingKey, array $options = [], callable $callback = null)
    {
        // Exchange-to-exchange bindings are only attempted when the
        // connection reports the feature.
        if (!$this->connection->getFeatures()->exchangeToExchangeBindings) {
            throw new AMQPChannelException('Broker does not support exchange to exchange bindings');
        }

        if ($exchangeName === $name) {
            throw new AMQPChannelException('Exchange cannot bind to itself');
        }

        $noWait = array_key_exists('noWait', $options) ? (bool)$options['noWait'] : false;
        // Only the 'arguments' entry is sent as the binding arguments, as in
        // bindQueue(). Previously the whole $options array (including
        // 'noWait') was passed in this position.
        // NOTE(review): callers that passed binding arguments as top-level
        // option keys must now nest them under 'arguments'.
        $arguments = array_key_exists('arguments', $options) ? $options['arguments'] : [];

        $outputFrame = Exchange\ExchangeBindFrame::create(
            $name,
            $exchangeName,
            $routingKey,
            $noWait,
            $arguments
        );
        $outputFrame->frameChannelId = $this->id;
        $this->connection->command($outputFrame);

        // The callback is fired by dispatch() when the bind-ok frame arrives.
        if (is_callable($callback)) {
            $this->on(self::EVENT_ON_CHANNEL_BIND_EXCHANGE_CALLBACK, $callback);
        }
    }
581
582
    /**
583
     * Unbind exchange
584
     *
585
     * @param string $name a source exchange name
586
     * @param string $exchangeName a destination exchange name
587
     * @param string $routingKey a routing key
588
     * @param array $options
589
     * @param callable|null $callback
590
     * @throws \InvalidArgumentException
591
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPChannelException
592
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
593
     */
594 View Code Duplication
    public function unbindExchange($name, $exchangeName, $routingKey, array $options = [], callable $callback = null)
    {
        // Exchange-to-exchange bindings are only attempted when the
        // connection reports the feature.
        if (!$this->connection->getFeatures()->exchangeToExchangeBindings) {
            throw new AMQPChannelException('Broker does not support exchange to exchange bindings');
        }

        if ($exchangeName === $name) {
            throw new AMQPChannelException('Exchange cannot unbind itself');
        }

        $noWait = array_key_exists('noWait', $options) ? (bool)$options['noWait'] : false;
        // Only the 'arguments' entry is sent as the binding arguments, as in
        // bindQueue(). Previously the whole $options array (including
        // 'noWait') was passed in this position.
        // NOTE(review): callers that passed binding arguments as top-level
        // option keys must now nest them under 'arguments'.
        $arguments = array_key_exists('arguments', $options) ? $options['arguments'] : [];

        $outputFrame = Exchange\ExchangeUnbindFrame::create(
            $name,
            $exchangeName,
            $routingKey,
            $noWait,
            $arguments
        );
        $outputFrame->frameChannelId = $this->id;
        $this->connection->command($outputFrame);

        // The callback is fired by dispatch() when the unbind-ok frame arrives.
        if (is_callable($callback)) {
            $this->on(self::EVENT_ON_CHANNEL_UNBIND_EXCHANGE_CALLBACK, $callback);
        }
    }
620
621
    /**
622
     * Publish message to exchange
623
     *
624
     * @param string $content The message content
625
     * @param string $exchangeName exchange name
626
     * @param string $routingKey routing key
627
     * @param array $options
628
     * @throws \InvalidArgumentException
629
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
630
     */
631
    public function publish($content, $exchangeName, $routingKey, array $options = [])
632
    {
633
        /**
634
         * We need to put together three frames:
635
         * 1. BasicPublishFrame tells the broker that something is about to be sent.
636
         * 2. BasicHeaderFrame tells the broker the headers of the message being sent.
637
         * 3. BodyFrame carries the message content. It is sent in chunks sized by $this->connection->getMaximumFrameSize()
638
         */
639
        $outputBasicPublishFrame = Basic\BasicPublishFrame::create(
640
            $exchangeName, $routingKey
641
        );
642
        $outputBasicPublishFrame->frameChannelId = $this->id;
643
        $this->connection->command($outputBasicPublishFrame);
644
645
        $outputBasicHeaderFrame = new Basic\BasicHeaderFrame();
646
        $outputBasicHeaderFrame->frameChannelId = $this->id;
647
        $outputBasicHeaderFrame->contentLength = array_key_exists('contentLength', $options) ? $options['contentLength'] : null;
648
        $outputBasicHeaderFrame->contentType = array_key_exists('contentType', $options) ? $options['contentType'] : null;
649
        $outputBasicHeaderFrame->contentEncoding = array_key_exists('contentEncoding', $options) ? $options['contentEncoding'] : null;
650
        $outputBasicHeaderFrame->headers = array_key_exists('headers', $options) ? $options['headers'] : null;
651
        $outputBasicHeaderFrame->messageId = array_key_exists('messageId', $options) ? $options['messageId'] : null;
652
        $outputBasicHeaderFrame->deliveryMode = array_key_exists('deliveryMode', $options) ? $options['deliveryMode'] : null;
653
        $outputBasicHeaderFrame->correlationId = array_key_exists('correlationId', $options) ? $options['correlationId'] : null;
654
        $outputBasicHeaderFrame->replyTo = array_key_exists('replyTo', $options) ? $options['replyTo'] : null;
655
        $outputBasicHeaderFrame->expiration = array_key_exists('expiration', $options) ? $options['expiration'] : null;
656
        $outputBasicHeaderFrame->timestamp = array_key_exists('timestamp', $options) ? $options['timestamp'] : null;
657
        $outputBasicHeaderFrame->type = array_key_exists('type', $options) ? $options['type'] : null;
658
        $outputBasicHeaderFrame->userId = array_key_exists('userId', $options) ? $options['userId'] : null;
659
        $outputBasicHeaderFrame->appId = array_key_exists('appId', $options) ? $options['appId'] : null;
660
        $outputBasicHeaderFrame->clusterId = array_key_exists('clusterId', $options) ? $options['clusterId'] : null;
661
662
        $fInfo = new \finfo();
663
        if (null === $outputBasicHeaderFrame->contentType) {
664
            $outputBasicHeaderFrame->contentType = $fInfo->buffer($content, FILEINFO_MIME_TYPE);
665
        }
666
        if (null === $outputBasicHeaderFrame->contentEncoding) {
667
            $outputBasicHeaderFrame->contentEncoding = $fInfo->buffer($content, FILEINFO_MIME_ENCODING);
668
        }
669
        unset($fInfo);
670
671
        if (null === $outputBasicHeaderFrame->contentLength) {
672
            $outputBasicHeaderFrame->contentLength = strlen($content);
673
        }
674
        $this->connection->command($outputBasicHeaderFrame);
675
676
        $maxFrameSize = $this->connection->getMaximumFrameSize();
677
        $length = $outputBasicHeaderFrame->contentLength;
678
679
        $contentBuffer = $content;
680
        while ($length) {
681
            $outputBodyFrame = new BodyFrame();
682
            $outputBodyFrame->frameChannelId = $this->id;
683
684
            if ($length <= $maxFrameSize) {
685
                $outputBodyFrame->content = $contentBuffer;
686
                $contentBuffer = '';
687
                $length = 0;
688
            } else {
689
                $outputBodyFrame->content = substr($contentBuffer, 0, $maxFrameSize);
690
                $contentBuffer = substr($contentBuffer, $maxFrameSize);
691
                $length -= $maxFrameSize;
692
            }
693
            $this->connection->command($outputBodyFrame);
694
        }
695
    }
696
697
    /**
     * Send a message directly to a queue.
     *
     * Delegates to publish() with an empty exchange name and the queue name as the routing key.
     *
     * @param string $content a message content
     * @param string $name a queue name
     * @param array $options forwarded unchanged to publish()
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function sendToQueue($content, $name, array $options = [])
    {
        $exchangeName = '';
        $routingKey = $name;

        $this->publish($content, $exchangeName, $routingKey, $options);
    }
710
711
    /**
     * Bind a consumer to a queue so that the callback is stored under its consumer tag.
     *
     * Recognised $options keys: consumerTag, noLocal, noAck, exclusive, noWait, arguments.
     * The boolean flags are cast to bool when present; every absent option is passed to
     * BasicConsumeFrame::create() as null.
     *
     * The callback is stored in $this->consumers under the consumer tag:
     *  - normally once the BasicConsumeOkFrame reply arrives, using the tag from that frame;
     *  - immediately when 'noWait' is set and a non-empty 'consumerTag' was supplied, because
     *    no reply is expected in that mode and the tag is already known.
     *
     * @param string $queueName a queue name
     * @param array $options
     * @param callable|null $callback
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function consume($queueName, array $options = [], callable $callback = null)
    {
        $consumerTag = array_key_exists('consumerTag', $options) ? $options['consumerTag'] : null;
        $noLocal = array_key_exists('noLocal', $options) ? (bool)$options['noLocal'] : null;
        $noAck = array_key_exists('noAck', $options) ? (bool)$options['noAck'] : null;
        $exclusive = array_key_exists('exclusive', $options) ? (bool)$options['exclusive'] : null;
        $noWait = array_key_exists('noWait', $options) ? (bool)$options['noWait'] : null;
        $arguments = array_key_exists('arguments', $options) ? $options['arguments'] : null;

        $outputFrame = Basic\BasicConsumeFrame::create(
            $queueName,
            $consumerTag,
            $noLocal,
            $noAck,
            $exclusive,
            $noWait,
            $arguments
        );
        $outputFrame->frameChannelId = $this->id;
        $this->connection->command($outputFrame);

        if (!is_callable($callback)) {
            return;
        }

        if ($noWait === true && is_string($consumerTag) && $consumerTag !== '') {
            // With no-wait no ConsumeOk reply is expected, so a queued handler would never
            // fire for this request; register the consumer under the known tag right away.
            $this->consumers[$consumerTag] = $callback;
            return;
        }

        // NOTE(review): with noWait set and no consumerTag supplied the tag is not known
        // here, so the handler is still queued as before - confirm whether that combination
        // should be rejected instead.
        $this->on(
            self::EVENT_ON_CHANNEL_CONSUMEOK_CALLBACK,
            function (Basic\BasicConsumeOkFrame $incomingFrame) use ($callback) {
                $this->consumers[$incomingFrame->consumerTag] = $callback;
            }
        );
    }
747
748
    /**
     * Unbind a consumer by sending a BasicCancelFrame for its tag on this channel.
     *
     * @param string $consumerTag
     * @param array $options supports 'noWait', passed through as given (null when absent)
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function cancel($consumerTag, array $options = [])
    {
        $noWait = null;
        if (array_key_exists('noWait', $options)) {
            $noWait = $options['noWait'];
        }

        $cancelFrame = Basic\BasicCancelFrame::create($consumerTag, $noWait);
        $cancelFrame->frameChannelId = $this->id;

        $this->connection->command($cancelFrame);
    }
764
765
    /**
     * Request a message from a queue by sending a BasicGetFrame on this channel.
     *
     * When a callable is given it is subscribed to the EVENT_DISPATCH_MESSAGE event.
     *
     * @param string $queueName a queue name
     * @param array $options supports 'noAck', passed through as given (null when absent)
     * @param callable|null $callback
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function get($queueName, array $options = [], callable $callback = null)
    {
        $noAck = null;
        if (array_key_exists('noAck', $options)) {
            $noAck = $options['noAck'];
        }

        $getFrame = Basic\BasicGetFrame::create($queueName, $noAck);
        $getFrame->frameChannelId = $this->id;
        $this->connection->command($getFrame);

        if (!is_callable($callback)) {
            return;
        }

        $this->on(self::EVENT_DISPATCH_MESSAGE, $callback);
    }
789
790
    /**
     * Acknowledge a message by its delivery tag by sending a BasicAckFrame on this channel.
     *
     * @param int $deliveryTag
     * @param array $options supports 'multiple', cast to int when present (null when absent)
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function ack($deliveryTag, array $options = [])
    {
        // NOTE(review): 'multiple' is cast to int while other flags in this class are cast
        // to bool - confirm which type BasicAckFrame::create() expects.
        $multiple = null;
        if (array_key_exists('multiple', $options)) {
            $multiple = (int)$options['multiple'];
        }

        $ackFrame = Basic\BasicAckFrame::create($deliveryTag, $multiple);
        $ackFrame->frameChannelId = $this->id;

        $this->connection->command($ackFrame);
    }
806
807
    /**
     * Negatively acknowledge a message by sending a BasicNackFrame on this channel.
     *
     * @param $deliveryTag
     * @param array $options supports 'multiple' (cast to int) and 'requeue' (cast to bool);
     *                       each is null when absent
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function nack($deliveryTag, array $options = [])
    {
        $multiple = null;
        $requeue = null;

        if (array_key_exists('multiple', $options)) {
            $multiple = (int)$options['multiple'];
        }
        if (array_key_exists('requeue', $options)) {
            $requeue = (bool)$options['requeue'];
        }

        $nackFrame = Basic\BasicNackFrame::create($deliveryTag, $multiple, $requeue);
        $nackFrame->frameChannelId = $this->id;

        $this->connection->command($nackFrame);
    }
824
825
    /**
     * Reject a message by sending a BasicRejectFrame on this channel.
     *
     * @param $deliveryTag
     * @param array $options supports 'requeue', passed through as given (null when absent)
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function reject($deliveryTag, array $options = [])
    {
        $requeue = null;
        if (array_key_exists('requeue', $options)) {
            $requeue = $options['requeue'];
        }

        $rejectFrame = Basic\BasicRejectFrame::create($deliveryTag, $requeue);
        $rejectFrame->frameChannelId = $this->id;

        $this->connection->command($rejectFrame);
    }
841
842
    /**
     * Ask for redelivery of unacknowledged messages by sending a BasicRecoverFrame
     * on this channel.
     *
     * @param bool $requeue passed to BasicRecoverFrame::create(); defaults to true
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    public function recover($requeue = true)
    {
        $recoverFrame = Basic\BasicRecoverFrame::create($requeue);
        $recoverFrame->frameChannelId = $this->id;

        $this->connection->command($recoverFrame);
    }
855
856
    /**
     * Get the id of this channel, the value stamped on outgoing frames as frameChannelId.
     *
     * @return mixed
     */
    public function getId()
    {
        return $this->id;
    }
863
864
    /**
     * Get the connection this channel sends its frames through.
     *
     * @return Connection
     */
    public function getConnection()
    {
        return $this->connection;
    }
871
872
    /**
     * Get the current value of the channel's isConnected flag.
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->isConnected;
    }
879
880
    /**
     * Call the oldest handler registered for an event and remove it from the handler list.
     *
     * Only one handler is invoked per call. When no handler is registered for the event,
     * or every handler has already been consumed, the call is a no-op.
     *
     * @param string|null $name The event name
     * @param mixed ...$args Arguments passed to the handler, preceded by $this when
     *                       $this->addThisToEvents is set
     * @return $this
     */
    private function triggerOneAndUnbind($name = null, ...$args)
    {
        // empty() rather than isset(): a list that has been fully shifted is still set, and
        // shifting it again would yield null, which cannot be called.
        if (empty($this->eventHandlers[$name])) {
            return $this;
        }

        if ($this->addThisToEvents) {
            array_unshift($args, $this);
        }

        $handler = array_shift($this->eventHandlers[$name]);
        $handler(...$args);

        return $this;
    }
898
}
899