Completed
Push — master ( 04e536...b701b2 )
by
unknown
18s queued 10s
created

PhpAmqpLib/Channel/AbstractChannel.php (3 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Connection\AbstractConnection;
5
use PhpAmqpLib\Exception\AMQPChannelClosedException;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
8
use PhpAmqpLib\Exception\AMQPNoDataException;
9
use PhpAmqpLib\Exception\AMQPNotImplementedException;
10
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
11
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
12
use PhpAmqpLib\Helper\DebugHelper;
13
use PhpAmqpLib\Helper\Protocol\MethodMap080;
14
use PhpAmqpLib\Helper\Protocol\MethodMap091;
15
use PhpAmqpLib\Helper\Protocol\Protocol080;
16
use PhpAmqpLib\Helper\Protocol\Protocol091;
17
use PhpAmqpLib\Helper\Protocol\Wait080;
18
use PhpAmqpLib\Helper\Protocol\Wait091;
19
use PhpAmqpLib\Message\AMQPMessage;
20
use PhpAmqpLib\Wire\AMQPReader;
21
22
abstract class AbstractChannel
23
{
24
    const PROTOCOL_080 = '0.8';
25
    const PROTOCOL_091 = '0.9.1';
26
27
    /**
28
     * @var string
29
     * @deprecated
30
     */
31
    public static $PROTOCOL_CONSTANTS_CLASS;
32
33
    /** @var array */
34
    protected $frame_queue;
35
36
    /** @var array */
37
    protected $method_queue;
38
39
    /** @var bool */
40
    protected $auto_decode;
41
42
    /** @var string */
43
    protected $amqp_protocol_header;
44
45
    /** @var \PhpAmqpLib\Helper\DebugHelper */
46
    protected $debug;
47
48
    /** @var \PhpAmqpLib\Connection\AbstractConnection */
49
    protected $connection;
50
51
    /** @var string */
52
    protected $protocolVersion;
53
54
    /** @var int */
55
    protected $maxBodySize;
56
57
    /** @var \PhpAmqpLib\Helper\Protocol\Protocol080|\PhpAmqpLib\Helper\Protocol\Protocol091 */
58
    protected $protocolWriter;
59
60
    /** @var \PhpAmqpLib\Helper\Protocol\Wait080|\PhpAmqpLib\Helper\Protocol\Wait091 */
61
    protected $waitHelper;
62
63
    /** @var \PhpAmqpLib\Helper\Protocol\MethodMap080|\PhpAmqpLib\Helper\Protocol\MethodMap091 */
64
    protected $methodMap;
65
66
    /** @var string */
67
    protected $channel_id;
68
69
    /** @var \PhpAmqpLib\Wire\AMQPReader */
70
    protected $msg_property_reader;
71
72
    /** @var \PhpAmqpLib\Wire\AMQPReader */
73
    protected $wait_content_reader;
74
75
    /** @var \PhpAmqpLib\Wire\AMQPReader */
76
    protected $dispatch_reader;
77
78
    /**
79
     * @param AbstractConnection $connection
80
     * @param string $channel_id
81
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
82
     */
83 180
    public function __construct(AbstractConnection $connection, $channel_id)
84
    {
85 180
        $this->connection = $connection;
86 180
        $this->channel_id = $channel_id;
87 180
        $connection->channels[$channel_id] = $this;
88 180
        $this->frame_queue = array(); // Lower level queue for frames
89 180
        $this->method_queue = array(); // Higher level queue for methods
90 180
        $this->auto_decode = false;
91
92 180
        $this->msg_property_reader = new AMQPReader(null);
93 180
        $this->wait_content_reader = new AMQPReader(null);
94 180
        $this->dispatch_reader = new AMQPReader(null);
95
96 180
        $this->protocolVersion = self::getProtocolVersion();
97 180
        switch ($this->protocolVersion) {
98 180 View Code Duplication
            case self::PROTOCOL_091:
99 180
                self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091';
100 180
                $c = self::$PROTOCOL_CONSTANTS_CLASS;
101 180
                $this->debug = new DebugHelper($c);
102 180
                $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
103 180
                $this->protocolWriter = new Protocol091();
104 180
                $this->waitHelper = new Wait091();
105 180
                $this->methodMap = new MethodMap091();
106 180
                break;
107 View Code Duplication
            case self::PROTOCOL_080:
108
                self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080';
109
                $c = self::$PROTOCOL_CONSTANTS_CLASS;
110
                $this->debug = new DebugHelper($c);
111
                $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
112
                $this->protocolWriter = new Protocol080();
113
                $this->waitHelper = new Wait080();
114
                $this->methodMap = new MethodMap080();
115
                break;
116
            default:
117
                throw new AMQPNotImplementedException(sprintf(
118
                    'Protocol: %s not implemented.',
119
                    $this->protocolVersion
120
                ));
121 90
        }
122 180
    }
123
124
    /**
125
     * @return string
126
     * @throws AMQPOutOfRangeException
127
     */
128 180
    public static function getProtocolVersion()
129
    {
130 180
        $protocol = defined('AMQP_PROTOCOL') ? AMQP_PROTOCOL : self::PROTOCOL_091;
131
        //adding check here to catch unknown protocol ASAP, as this method may be called from the outside
132 180
        if (!in_array($protocol, array(self::PROTOCOL_080, self::PROTOCOL_091), TRUE)) {
133
            throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $protocol));
134
        }
135
136 180
        return $protocol;
137
    }
138
139
    /**
140
     * @return string
141
     */
142
    public function getChannelId()
143
    {
144
        return $this->channel_id;
145
    }
146
147
    /**
148
     * @param int $max_bytes Max message body size for this channel
149
     * @return $this
150
     */
151
    public function setBodySizeLimit($max_bytes)
152
    {
153
        $max_bytes = (int) $max_bytes;
154
155
        if ($max_bytes > 0) {
156
            $this->maxBodySize = $max_bytes;
157
        }
158
159
        return $this;
160
    }
161
162
    /**
163
     * @return AbstractConnection
164
     */
165 48
    public function getConnection()
166
    {
167 48
        return $this->connection;
168
    }
169
170
    /**
171
     * @return array
172
     */
173
    public function getMethodQueue()
174
    {
175
        return $this->method_queue;
176
    }
177
178
    /**
179
     * @return bool
180
     */
181
    public function hasPendingMethods()
182
    {
183
        return !empty($this->method_queue);
184
    }
185
186
    /**
187
     * @param string $method_sig
188
     * @param string $args
189
     * @param AMQPMessage|null $amqpMessage
190
     * @return mixed
191
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
192
     */
193 180
    public function dispatch($method_sig, $args, $amqpMessage)
194
    {
195 180
        if (!$this->methodMap->valid_method($method_sig)) {
196
            throw new AMQPNotImplementedException(sprintf(
197
                'Unknown AMQP method "%s"',
198
                $method_sig
199
            ));
200
        }
201
202 180
        $amqp_method = $this->methodMap->get_method($method_sig);
203
204 180
        if (!method_exists($this, $amqp_method)) {
205
            throw new AMQPNotImplementedException(sprintf(
206
                'Method: "%s" not implemented by class: %s',
207
                $amqp_method,
208
                get_class($this)
209
            ));
210 3
        }
211
212 180
        $this->dispatch_reader->reuse($args);
213
214 180
        if ($amqpMessage == null) {
215 180
            return call_user_func(array($this, $amqp_method), $this->dispatch_reader);
216
        }
217
218 66
        return call_user_func(array($this, $amqp_method), $this->dispatch_reader, $amqpMessage);
219
    }
220
221
    /**
222
     * @param int|float|null $timeout
223
     * @return array|mixed
224
     */
225 180
    public function next_frame($timeout = 0)
226
    {
227 180
        $this->debug->debug_msg('waiting for a new frame');
228
229 180
        if (!empty($this->frame_queue)) {
230 6
            return array_shift($this->frame_queue);
231
        }
232
233 180
        return $this->connection->wait_channel($this->channel_id, $timeout);
234
    }
235
236
    /**
237
     * @param array $method_sig
238
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
239
     */
240 180
    protected function send_method_frame($method_sig, $args = '')
241
    {
242 180
        if ($this->connection === null) {
243 24
            throw new AMQPChannelClosedException('Channel connection is closed.');
244
        }
245
246 180
        $this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
247 180
    }
248
249
    /**
250
     * This is here for performance reasons to batch calls to fwrite from basic.publish
251
     *
252
     * @param array $method_sig
253
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
254
     * @param \PhpAmqpLib\Wire\AMQPWriter $pkt
255
     * @return \PhpAmqpLib\Wire\AMQPWriter
256
     */
257 66
    protected function prepare_method_frame($method_sig, $args = '', $pkt = null)
258
    {
259 66
        return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt);
260
    }
261
262
    /**
263
     * @return AMQPMessage
264
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
265
     */
266 66
    public function wait_content()
267
    {
268 66
        list($frame_type, $payload) = $this->next_frame();
269
270 66
        $this->validate_header_frame($frame_type);
271
272 66
        $this->wait_content_reader->reuse(mb_substr($payload, 0, 12, 'ASCII'));
273
274 66
        $class_id = $this->wait_content_reader->read_short();
275 66
        $weight = $this->wait_content_reader->read_short();
276
277
        //hack to avoid creating new instances of AMQPReader;
278 66
        $this->msg_property_reader->reuse(mb_substr($payload, 12, mb_strlen($payload, 'ASCII') - 12, 'ASCII'));
279
280 66
        return $this->createMessage(
281 66
            $this->msg_property_reader,
282 66
            $this->wait_content_reader
283 33
        );
284
    }
285
286
    /**
287
     * @param AMQPReader $propertyReader
288
     * @param AMQPReader $contentReader
289
     * @return \PhpAmqpLib\Message\AMQPMessage
290
     */
291 66
    protected function createMessage($propertyReader, $contentReader)
292
    {
293 66
        $bodyChunks = array();
294 66
        $bodyReceivedBytes = 0;
295
296 66
        $message = new AMQPMessage();
297
        $message
298 66
            ->load_properties($propertyReader)
299 66
            ->setBodySize($contentReader->read_longlong());
300
301 66
        while (bccomp($message->getBodySize(), $bodyReceivedBytes, 0) == 1) {
302 60
            list($frame_type, $payload) = $this->next_frame();
303
304 60
            $this->validate_body_frame($frame_type);
305 60
            $bodyReceivedBytes = bcadd($bodyReceivedBytes, mb_strlen($payload, 'ASCII'), 0);
306
307 60
            if (is_int($this->maxBodySize) && $bodyReceivedBytes > $this->maxBodySize ) {
308
                $message->setIsTruncated(true);
309
                continue;
310
            }
311
312 60
            $bodyChunks[] = $payload;
313 30
        }
314
315 66
        $message->setBody(implode('', $bodyChunks));
316
317 66
        return $message;
318
    }
319
320
    /**
321
     * Wait for some expected AMQP methods and dispatch to them.
322
     * Unexpected methods are queued up for later calls to this PHP
323
     * method.
324
     *
325
     * @param array $allowed_methods
326
     * @param bool $non_blocking
327
     * @param int|float|null $timeout
328
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
329
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
330
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
331
     * @throws \ErrorException
332
     * @return mixed
333
     */
334 180
    public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0)
335
    {
336 180
        $this->debug->debug_allowed_methods($allowed_methods);
337
338 180
        $deferred = $this->process_deferred_methods($allowed_methods);
339 180
        if ($deferred['dispatch'] === true) {
340
            return $this->dispatch_deferred_method($deferred['queued_method']);
341
        }
342
343
        // timeouts must be deactivated for non-blocking actions
344 180
        if (true === $non_blocking) {
345 30
            $timeout = null;
346 15
        }
347
348
        // No deferred methods?  wait for new ones
349 180
        while (true) {
350
            try {
351 180
                list($frame_type, $payload) = $this->next_frame($timeout);
352 129
            } catch (AMQPNoDataException $e) {
353
                // no data ready for non-blocking actions - stop and exit
354 48
                break;
355 30
            } catch (AMQPConnectionClosedException $exception) {
356
                if ($this instanceof AMQPChannel) {
357
                    $this->do_close();
358
                }
359
                throw $exception;
360
            }
361
362 180
            $this->validate_method_frame($frame_type);
363 180
            $this->validate_frame_payload($payload);
364
365 180
            $method_sig = $this->build_method_signature($payload);
366 180
            $args = $this->extract_args($payload);
367
368 180
            $this->debug->debug_method_signature('> %s', $method_sig);
369
370 180
            $amqpMessage = $this->maybe_wait_for_content($method_sig);
371
372 180
            if ($this->should_dispatch_method($allowed_methods, $method_sig)) {
373 180
                return $this->dispatch($method_sig, $args, $amqpMessage);
374
            }
375
376
            // Wasn't what we were looking for? save it for later
377
            $this->debug->debug_method_signature('Queueing for later: %s', $method_sig);
378
            $this->method_queue[] = array($method_sig, $args, $amqpMessage);
379
380
            if ($non_blocking) {
381
                break;
382
            }
383
        }
384 48
    }
385
386
    /**
387
     * @param array $allowed_methods
388
     * @return array
389
     */
390 180
    protected function process_deferred_methods($allowed_methods)
391
    {
392 180
        $dispatch = false;
393 180
        $queued_method = array();
394
395 180
        foreach ($this->method_queue as $qk => $qm) {
396
            $this->debug->debug_msg('checking queue method ' . $qk);
397
398
            $method_sig = $qm[0];
399
400
            if ($allowed_methods == null || in_array($method_sig, $allowed_methods)) {
401
                unset($this->method_queue[$qk]);
402
                $dispatch = true;
403
                $queued_method = $qm;
404
                break;
405
            }
406 90
        }
407
408 180
        return array('dispatch' => $dispatch, 'queued_method' => $queued_method);
409
    }
410
411
    /**
412
     * @param array $queued_method
413
     * @return mixed
414
     */
415
    protected function dispatch_deferred_method($queued_method)
416
    {
417
        $this->debug->debug_method_signature('Executing queued method: %s', $queued_method[0]);
418
419
        return $this->dispatch($queued_method[0], $queued_method[1], $queued_method[2]);
420
    }
421
422
    /**
423
     * @param int $frame_type
424
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
425
     */
426 180
    protected function validate_method_frame($frame_type)
427
    {
428 180
        $this->validate_frame($frame_type, 1, 'AMQP method');
429 180
    }
430
431
    /**
432
     * @param int $frame_type
433
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
434
     */
435 66
    protected function validate_header_frame($frame_type)
436
    {
437 66
        $this->validate_frame($frame_type, 2, 'AMQP Content header');
438 66
    }
439
440
    /**
441
     * @param int $frame_type
442
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
443
     */
444 60
    protected function validate_body_frame($frame_type)
445
    {
446 60
        $this->validate_frame($frame_type, 3, 'AMQP Content body');
447 60
    }
448
449
    /**
450
     * @param int $frameType
451
     * @param int $expectedType
452
     * @param string $expectedMessage
453
     */
454 180
    protected function validate_frame($frameType, $expectedType, $expectedMessage)
455
    {
456 180
        if ($frameType != $expectedType) {
457
            $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Channel\Abstr...ROTOCOL_CONSTANTS_CLASS has been deprecated.

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
458
            throw new AMQPInvalidFrameException(sprintf(
459
                    'Expecting %s, received frame type %s (%s)',
460
                    $expectedMessage,
461
                    $frameType,
462
                    $protocolClass::$FRAME_TYPES[$frameType]
463
                ));
464
        }
465 180
    }
466
467
    /**
468
     * @param string $payload
469
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
470
     */
471 180
    protected function validate_frame_payload($payload)
472
    {
473 180
        if (mb_strlen($payload, 'ASCII') < 4) {
474
            throw new AMQPOutOfBoundsException('Method frame too short');
475
        }
476 180
    }
477
478
    /**
479
     * @param string $payload
480
     * @return string
481
     */
482 180
    protected function build_method_signature($payload)
483
    {
484 180
        $method_sig_array = unpack('n2', mb_substr($payload, 0, 4, 'ASCII'));
485
486 180
        return sprintf('%s,%s', $method_sig_array[1], $method_sig_array[2]);
487
    }
488
489
    /**
490
     * @param string $payload
491
     * @return string
492
     */
493 180
    protected function extract_args($payload)
494
    {
495 180
        return mb_substr($payload, 4, mb_strlen($payload, 'ASCII') - 4, 'ASCII');
496
    }
497
498
    /**
499
     * @param array|null $allowed_methods
500
     * @param string $method_sig
501
     * @return bool
502
     */
503 180
    protected function should_dispatch_method($allowed_methods, $method_sig)
504
    {
505 180
        $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Channel\Abstr...ROTOCOL_CONSTANTS_CLASS has been deprecated.

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
506
507 90
        return $allowed_methods == null
508 180
            || in_array($method_sig, $allowed_methods)
509 180
            || in_array($method_sig, $protocolClass::$CLOSE_METHODS);
510
    }
511
512
    /**
513
     * @param string $method_sig
514
     * @return AMQPMessage|null
515
     */
516 180
    protected function maybe_wait_for_content($method_sig)
517
    {
518 180
        $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Channel\Abstr...ROTOCOL_CONSTANTS_CLASS has been deprecated.

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
519 180
        $amqpMessage = null;
520
521 180
        if (in_array($method_sig, $protocolClass::$CONTENT_METHODS)) {
522 66
            $amqpMessage = $this->wait_content();
523 33
        }
524
525 180
        return $amqpMessage;
526
    }
527
528
    /**
529
     * @param callable $handler
530
     * @param array $arguments
531
     */
532 6
    protected function dispatch_to_handler($handler, array $arguments)
533
    {
534 6
        if (is_callable($handler)) {
535 6
            call_user_func_array($handler, $arguments);
536 3
        }
537 6
    }
538
}
539