AbstractChannel::wait()   B
last analyzed

Complexity

Conditions 9
Paths 15

Size

Total Lines 48
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 9.4259

Importance

Changes 0
Metric Value
cc 9
eloc 28
nc 15
nop 3
dl 0
loc 48
ccs 19
cts 23
cp 0.8261
crap 9.4259
rs 8.0555
c 0
b 0
f 0
1
<?php
2
3
namespace PhpAmqpLib\Channel;
4
5
use PhpAmqpLib\Connection\AbstractConnection;
6
use PhpAmqpLib\Exception\AMQPChannelClosedException;
7
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
8
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
9
use PhpAmqpLib\Exception\AMQPNoDataException;
10
use PhpAmqpLib\Exception\AMQPNotImplementedException;
11
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
12
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
13
use PhpAmqpLib\Helper\DebugHelper;
14
use PhpAmqpLib\Helper\Protocol\MethodMap080;
15
use PhpAmqpLib\Helper\Protocol\MethodMap091;
16
use PhpAmqpLib\Helper\Protocol\Protocol080;
17
use PhpAmqpLib\Helper\Protocol\Protocol091;
18
use PhpAmqpLib\Helper\Protocol\Wait080;
19
use PhpAmqpLib\Helper\Protocol\Wait091;
20
use PhpAmqpLib\Message\AMQPMessage;
21
use PhpAmqpLib\Wire;
22
use PhpAmqpLib\Wire\AMQPReader;
23
24
abstract class AbstractChannel
25
{
26
    /**
27
     * @deprecated
28
     */
29
    const PROTOCOL_080 = Wire\Constants080::VERSION;
30
31
    /**
32
     * @deprecated
33
     */
34
    const PROTOCOL_091 = Wire\Constants091::VERSION;
35
36
    /**
37
     * Lower level queue for frames
38
     * @var \SplQueue|Frame[]
39
     */
40
    protected $frame_queue;
41
42
    /**
43
     * Higher level queue for methods
44
     * @var array
45
     */
46
    protected $method_queue = array();
47
48
    /** @var bool */
49
    protected $auto_decode = false;
50
51
    /** @var Wire\Constants */
52
    protected $constants;
53
54
    /** @var \PhpAmqpLib\Helper\DebugHelper */
55
    protected $debug;
56
57
    /** @var null|AbstractConnection */
58
    protected $connection;
59
60
    /**
61
     * @var string
62
     * @deprecated
63
     */
64
    protected $protocolVersion;
65
66
    /** @var int */
67
    protected $maxBodySize;
68
69
    /** @var Protocol080|Protocol091 */
70
    protected $protocolWriter;
71
72
    /** @var Wait080|Wait091 */
73
    protected $waitHelper;
74
75
    /** @var MethodMap080|MethodMap091 */
76
    protected $methodMap;
77
78
    /** @var int|null */
79
    protected $channel_id;
80
81
    /** @var Wire\AMQPBufferReader */
82
    protected $msg_property_reader;
83
84
    /** @var Wire\AMQPBufferReader */
85
    protected $dispatch_reader;
86
87
    /**
     * Binds the channel to its connection and builds the protocol-specific helpers.
     *
     * The channel registers itself in the connection's channel map, creates the
     * two reusable buffer readers, and then selects the protocol writer, wait
     * helper, method map and constants class that match the protocol version
     * reported by getProtocolVersion().
     *
     * @param AbstractConnection $connection
     * @param int $channel_id
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws AMQPOutOfRangeException when getProtocolVersion() rejects the configured version
     * @throws AMQPNotImplementedException when no helpers exist for the resolved version
     */
    public function __construct(AbstractConnection $connection, $channel_id)
    {
        $this->connection = $connection;
        $this->channel_id = (int)$channel_id;
        // Register under the normalized id so the map key always matches getChannelId().
        $connection->channels[$this->channel_id] = $this;

        $this->msg_property_reader = new Wire\AMQPBufferReader('');
        $this->dispatch_reader = new Wire\AMQPBufferReader('');

        $this->protocolVersion = self::getProtocolVersion();
        switch ($this->protocolVersion) {
            case Wire\Constants091::VERSION:
                $constantClass = Wire\Constants091::class;
                $this->protocolWriter = new Protocol091();
                $this->waitHelper = new Wait091();
                $this->methodMap = new MethodMap091();
                break;
            case Wire\Constants080::VERSION:
                $constantClass = Wire\Constants080::class;
                $this->protocolWriter = new Protocol080();
                $this->waitHelper = new Wait080();
                $this->methodMap = new MethodMap080();
                break;
            default:
                throw new AMQPNotImplementedException(sprintf(
                    'Protocol: %s not implemented.',
                    $this->protocolVersion
                ));
        }

        $this->constants = new $constantClass();
        $this->debug = new DebugHelper($this->constants);
        $this->frame_queue = new \SplQueue();
    }
125
126
    /**
     * Resolves the AMQP protocol version to use.
     *
     * The AMQP_PROTOCOL constant wins when it is defined; otherwise the 0.9.1
     * version constant is used. The result is checked against the two
     * supported versions before it is returned.
     *
     * @return string
     * @throws AMQPOutOfRangeException when the resolved version is not a supported one
     * @deprecated
     */
    public static function getProtocolVersion()
    {
        $supported = array(Wire\Constants080::VERSION, Wire\Constants091::VERSION);

        $version = Wire\Constants091::VERSION;
        if (defined('AMQP_PROTOCOL')) {
            $version = AMQP_PROTOCOL;
        }

        // Reject an unknown protocol right here, as this method may be called from the outside.
        if (in_array($version, $supported, true)) {
            return $version;
        }

        throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $version));
    }
141
142
    /**
     * Returns the id this channel was created with.
     *
     * @return int|null
     */
    public function getChannelId()
    {
        return $this->channel_id;
    }
149
150
    /**
     * Sets the maximum message body size for this channel.
     *
     * The argument is cast to int; values that are not greater than zero are
     * ignored and leave the current limit unchanged.
     *
     * @param int $max_bytes Max message body size for this channel
     * @return $this
     */
    public function setBodySizeLimit($max_bytes)
    {
        $limit = (int) $max_bytes;

        if ($limit <= 0) {
            return $this;
        }

        $this->maxBodySize = $limit;

        return $this;
    }
164
165
    /**
     * Returns the connection this channel is attached to, if any.
     *
     * @return AbstractConnection|null
     */
    public function getConnection()
    {
        return $this->connection;
    }
172
173
    /**
     * Returns the methods that were received but queued for later dispatch.
     *
     * @return array
     */
    public function getMethodQueue()
    {
        return $this->method_queue;
    }
180
181
    /**
     * Tells whether the method queue currently holds any entries.
     *
     * @return bool
     */
    public function hasPendingMethods()
    {
        // A non-empty array casts to true, an empty one to false.
        return (bool) $this->method_queue;
    }
188
189
    /**
     * Invokes the handler method mapped to an AMQP method signature.
     *
     * The signature is first validated and translated through the method map.
     * The arguments are loaded into the shared dispatch reader, which is passed
     * to the handler; the message is appended as a second argument only when
     * one was supplied.
     *
     * @param string $method_sig
     * @param string $args
     * @param AMQPMessage|null $amqpMessage
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function dispatch($method_sig, $args, $amqpMessage)
    {
        if (!$this->methodMap->valid_method($method_sig)) {
            throw new AMQPNotImplementedException(sprintf('Unknown AMQP method "%s"', $method_sig));
        }

        $handlerName = $this->methodMap->get_method($method_sig);
        if (!method_exists($this, $handlerName)) {
            throw new AMQPNotImplementedException(
                sprintf('Method: "%s" not implemented by class: %s', $handlerName, get_class($this))
            );
        }

        $this->dispatch_reader->reset($args);

        $handlerArguments = array($this->dispatch_reader);
        if ($amqpMessage !== null) {
            $handlerArguments[] = $amqpMessage;
        }

        return call_user_func_array(array($this, $handlerName), $handlerArguments);
    }
223
224
    /**
     * Returns the next frame for this channel.
     *
     * Frames already buffered in the local frame queue are served first; only
     * when that queue is empty is the connection asked to wait for a frame.
     *
     * @param int|float|null $timeout passed through to the connection's wait_channel()
     * @return Frame
     * @throws AMQPChannelClosedException when the queue is empty and the channel has no connection
     */
    protected function next_frame($timeout = 0): Frame
    {
        $this->debug->debug_msg('waiting for a new frame');

        if (!$this->frame_queue->isEmpty()) {
            return $this->frame_queue->dequeue();
        }

        // The connection property is nullable; fail with the same exception
        // send_method_frame() uses instead of calling a method on null.
        if ($this->connection === null) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        return $this->connection->wait_channel($this->channel_id, $timeout);
    }
238
239
    /**
     * Sends a method frame for this channel through the connection.
     *
     * @param array $method_sig
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
     * @throws AMQPChannelClosedException when the channel has no connection
     */
    protected function send_method_frame($method_sig, $args = '')
    {
        $connection = $this->connection;
        if (null === $connection) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        $connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
    }
251
252
    /**
     * Builds a method frame for this channel without sending it.
     *
     * This is here for performance reasons to batch calls to fwrite from basic.publish
     *
     * @param array $method_sig
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
     * @param \PhpAmqpLib\Wire\AMQPWriter $pkt
     * @return \PhpAmqpLib\Wire\AMQPWriter
     * @throws AMQPChannelClosedException when the channel has no connection
     */
    protected function prepare_method_frame($method_sig, $args = '', $pkt = null)
    {
        // The connection property is nullable; fail with the same exception
        // send_method_frame() uses instead of calling a method on null.
        if ($this->connection === null) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt);
    }
264
265
    /**
     * Reads a content header frame and then assembles the message it announces.
     *
     * @return AMQPMessage
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws AMQPInvalidFrameException when the next frame is not a header frame
     */
    public function wait_content(): AMQPMessage
    {
        $headerFrame = $this->next_frame();
        $this->validate_frame($headerFrame, Frame::TYPE_HEADER);

        // The first 4 bytes (class-id and weight) are skipped; the remainder starts
        // with the body size, and everything after that is properties.
        // @link https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf 4.2.6.1 The Content Header
        $headerPayload = $headerFrame->getPayload();
        $this->msg_property_reader->reset(mb_substr($headerPayload, 4, null, 'ASCII'));

        $bodySize = $this->msg_property_reader->read_longlong();

        return $this->createMessage($this->msg_property_reader, $bodySize);
    }
285
286
    /**
     * Builds a message from its properties and the body frames that follow.
     *
     * Body frames are consumed until the accumulated frame sizes reach the
     * announced body size. Once the received byte count exceeds an integer
     * maxBodySize, the message is flagged as truncated and further payloads
     * are read but not appended.
     *
     * @param AMQPReader $propertyReader reader positioned at the message properties
     * @param int $bodySize body size announced by the content header
     * @return AMQPMessage
     * @throws AMQPInvalidFrameException when a frame in the sequence is not a body frame
     */
    protected function createMessage(AMQPReader $propertyReader, int $bodySize): AMQPMessage
    {
        $message = new AMQPMessage();
        $message
            ->load_properties($propertyReader)
            ->setBodySize($bodySize);

        $collected = '';
        $receivedBytes = 0;

        while ($receivedBytes < $bodySize) {
            $bodyFrame = $this->next_frame();
            // @link https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf 4.2.6.2 The Content Body
            $this->validate_frame($bodyFrame, Frame::TYPE_BODY);
            $receivedBytes += $bodyFrame->getSize();

            $withinLimit = !is_int($this->maxBodySize) || $receivedBytes <= $this->maxBodySize;
            if ($withinLimit) {
                $collected .= $bodyFrame->getPayload();
            } else {
                $message->setIsTruncated(true);
            }
        }

        $message->setBody($collected);

        return $message;
    }
313 11
314
    /**
     * Waits for one of the expected AMQP methods and dispatches it.
     *
     * A matching method already sitting in the method queue is dispatched
     * first. Otherwise frames are read one by one: a method that should be
     * dispatched is handed to dispatch() and its result returned, while any
     * other method is appended to the method queue for a later call.
     * In non-blocking mode the loop ends after the first queued method or as
     * soon as no data is available, and null is returned.
     *
     * @param array|null $allowed_methods
     * @param bool $non_blocking
     * @param int|float|null $timeout
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
     * @throws AMQPOutOfBoundsException
     */
    public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0)
    {
        $this->debug->debug_allowed_methods($allowed_methods);

        $pending = $this->process_deferred_methods($allowed_methods);
        if ($pending['dispatch'] === true) {
            return $this->dispatch_deferred_method($pending['queued_method']);
        }

        // timeouts must be deactivated for non-blocking actions
        if ($non_blocking === true) {
            $timeout = null;
        }

        // Nothing deferred matched, so read fresh frames.
        do {
            try {
                $frame = $this->next_frame($timeout);
            } catch (AMQPNoDataException $noData) {
                // no data ready for non-blocking actions - stop and exit
                return null;
            } catch (AMQPConnectionClosedException $closed) {
                if ($this instanceof AMQPChannel) {
                    $this->do_close();
                }
                throw $closed;
            }

            $this->validate_method_frame($frame);
            $this->validate_frame_payload($frame);

            $parsed = $this->parseMethod($frame->getPayload());
            $signature = $parsed->getSignature();

            $this->debug->debug_method_signature('> %s', $signature);

            $content = $this->maybe_wait_for_content($signature);

            if ($this->should_dispatch_method($allowed_methods, $signature)) {
                return $this->dispatch($signature, $parsed->getArguments(), $content);
            }

            // Not the method the caller asked for: keep it for a later call.
            $this->debug->debug_method_signature('Queueing for later: %s', $signature);
            $this->method_queue[] = array($signature, $parsed->getArguments(), $content);
        } while (!$non_blocking);

        return null;
    }
379
380
    /**
     * Looks for the first queued method that may be dispatched now.
     *
     * A queued entry matches when no allow-list is given or its signature is
     * in the allow-list. The matching entry is removed from the queue.
     *
     * @param array|null $allowed_methods
     * @return array with keys 'dispatch' (bool) and 'queued_method' (the removed entry, or an empty array)
     */
    protected function process_deferred_methods($allowed_methods)
    {
        foreach ($this->method_queue as $index => $entry) {
            $this->debug->debug_msg('checking queue method ' . $index);

            $signature = $entry[0];
            $isAllowed = $allowed_methods === null || in_array($signature, $allowed_methods, true);
            if (!$isAllowed) {
                continue;
            }

            unset($this->method_queue[$index]);

            return array('dispatch' => true, 'queued_method' => $entry);
        }

        return array('dispatch' => false, 'queued_method' => array());
    }
404
405
    /**
     * Dispatches an entry previously taken from the method queue.
     *
     * @param array $queued_method entry holding signature, arguments and message at indexes 0, 1 and 2
     * @return mixed
     */
    protected function dispatch_deferred_method($queued_method)
    {
        $signature = $queued_method[0];
        $this->debug->debug_method_signature('Executing queued method: %s', $signature);

        $arguments = $queued_method[1];
        $message = $queued_method[2];

        return $this->dispatch($signature, $arguments, $message);
    }
415
416
    /**
     * Ensures the given frame is a method frame.
     *
     * @param Frame $frame
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
     */
    protected function validate_method_frame(Frame $frame): void
    {
        $expected = Frame::TYPE_METHOD;
        $this->validate_frame($frame, $expected);
    }
424
425
    /**
     * Ensures the frame has the expected type.
     *
     * @param Frame $frame
     * @param int $expectedType
     * @throws AMQPInvalidFrameException when the frame type differs from the expected one
     */
    protected function validate_frame(Frame $frame, int $expectedType): void
    {
        $actualType = $frame->getType();
        if ($actualType === $expectedType) {
            return;
        }

        $message = sprintf(
            'Expecting %u, received frame type %s (%s)',
            $expectedType,
            $actualType,
            $this->constants->getFrameType($actualType)
        );

        throw new AMQPInvalidFrameException($message);
    }
441
442
    /**
     * Checks that a method frame payload is long enough and matches the frame size.
     *
     * @param Frame $frame
     * @throws AMQPOutOfBoundsException when the payload is shorter than 4 bytes
     * @throws AMQPInvalidFrameException when the payload length differs from the frame size
     */
    protected function validate_frame_payload(Frame $frame): void
    {
        // The ASCII encoding makes mb_strlen count bytes rather than multibyte characters.
        $length = mb_strlen($frame->getPayload(), 'ASCII');

        if ($length < 4) {
            throw new AMQPOutOfBoundsException('Method frame too short');
        }

        if ($frame->getSize() !== $length) {
            throw new AMQPInvalidFrameException('Frame size does not match payload');
        }
    }
458
459
    /**
     * Splits a method frame payload into its two leading 16-bit values and the argument bytes.
     *
     * @param string $payload raw method frame payload
     * @return Method
     */
    protected function parseMethod(string $payload): Method
    {
        // 'n2method' yields the keys method1 and method2 (unsigned big-endian shorts);
        // 'a*args' captures the rest of the payload.
        $parts = unpack('n2method/a*args', $payload);

        $first = $parts['method1'];
        $second = $parts['method2'];

        return new Method($first, $second, $parts['args']);
    }
465
466
    /**
     * Decides whether a received method is dispatched right away.
     *
     * That is the case when no allow-list is given, when the signature is in
     * the allow-list, or when the constants class reports it as a close method.
     *
     * @param array|null $allowed_methods
     * @param string $method_sig
     * @return bool
     */
    protected function should_dispatch_method($allowed_methods, $method_sig)
    {
        if ($allowed_methods === null) {
            return true;
        }

        if (in_array($method_sig, $allowed_methods, true)) {
            return true;
        }

        return (bool) $this->constants->isCloseMethod($method_sig);
    }
477
478
    /**
     * Reads the message content when the method is one that carries content.
     *
     * @param string $method_sig
     * @return AMQPMessage|null the message for content methods, null otherwise
     */
    protected function maybe_wait_for_content($method_sig)
    {
        return $this->constants->isContentMethod($method_sig)
            ? $this->wait_content()
            : null;
    }
491
492
    /**
     * Calls the handler with the given arguments; a non-callable handler is silently skipped.
     *
     * @param callable $handler
     * @param array $arguments
     */
    protected function dispatch_to_handler($handler, array $arguments = [])
    {
        if (!is_callable($handler)) {
            return;
        }

        call_user_func_array($handler, $arguments);
    }
502
}
503