AbstractChannel::wait()   B
last analyzed

Complexity

Conditions 9
Paths 15

Size

Total Lines 48
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 9.5599

Importance

Changes 0
Metric Value
cc 9
eloc 28
nc 15
nop 3
dl 0
loc 48
ccs 17
cts 21
cp 0.8095
crap 9.5599
rs 8.0555
c 0
b 0
f 0
1
<?php
2
3
namespace PhpAmqpLib\Channel;
4
5
use PhpAmqpLib\Connection\AbstractConnection;
6
use PhpAmqpLib\Exception\AMQPChannelClosedException;
7
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
8
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
9
use PhpAmqpLib\Exception\AMQPNoDataException;
10
use PhpAmqpLib\Exception\AMQPNotImplementedException;
11
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
12
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
13
use PhpAmqpLib\Helper\DebugHelper;
14
use PhpAmqpLib\Helper\Protocol\MethodMap080;
15
use PhpAmqpLib\Helper\Protocol\MethodMap091;
16
use PhpAmqpLib\Helper\Protocol\Protocol080;
17
use PhpAmqpLib\Helper\Protocol\Protocol091;
18
use PhpAmqpLib\Helper\Protocol\Wait080;
19
use PhpAmqpLib\Helper\Protocol\Wait091;
20
use PhpAmqpLib\Message\AMQPMessage;
21
use PhpAmqpLib\Wire;
22
use PhpAmqpLib\Wire\AMQPReader;
23
24
abstract class AbstractChannel
25
{
26
    /**
     * Alias of Wire\Constants080::VERSION.
     *
     * @deprecated
     */
    const PROTOCOL_080 = Wire\Constants080::VERSION;

    /**
     * Alias of Wire\Constants091::VERSION.
     *
     * @deprecated
     */
    const PROTOCOL_091 = Wire\Constants091::VERSION;

    /**
     * Low-level queue of frames; next_frame() drains it before asking the connection.
     *
     * @var \SplQueue|Frame[]
     */
    protected $frame_queue;

    /**
     * High-level queue of methods that wait() received but was not asked to dispatch.
     * Each entry is a list of [method signature, arguments, AMQPMessage|null].
     *
     * @var array
     */
    protected $method_queue = [];

    /** @var bool */
    protected $auto_decode = false;

    /** @var Wire\Constants Protocol constants matching the negotiated version */
    protected $constants;

    /** @var \PhpAmqpLib\Helper\DebugHelper */
    protected $debug;

    /** @var null|AbstractConnection Owning connection, or null when detached */
    protected $connection;

    /** @var string Value returned by getProtocolVersion() at construction time */
    protected $protocolVersion;

    /** @var int Body size limit applied by createMessage(); unset until setBodySizeLimit() is called */
    protected $maxBodySize;

    /** @var Protocol080|Protocol091 */
    protected $protocolWriter;

    /** @var Wait080|Wait091 */
    protected $waitHelper;

    /** @var MethodMap080|MethodMap091 Maps method signatures to handler method names */
    protected $methodMap;

    /** @var int|null */
    protected $channel_id;

    /** @var Wire\AMQPBufferReader Reusable reader for content-header properties */
    protected $msg_property_reader;

    /** @var Wire\AMQPBufferReader Reusable reader for method arguments passed to dispatch() */
    protected $dispatch_reader;
83
84
    /**
     * Binds the channel to its connection and builds the protocol-specific helpers.
     *
     * The channel registers itself in $connection->channels under the given id,
     * then selects writer / wait helper / method map / constants for the protocol
     * version reported by getProtocolVersion().
     *
     * @param AbstractConnection $connection
     * @param int $channel_id
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function __construct(AbstractConnection $connection, $channel_id)
    {
        $this->connection = $connection;
        $this->channel_id = (int)$channel_id;
        $connection->channels[$channel_id] = $this;

        $this->msg_property_reader = new Wire\AMQPBufferReader('');
        $this->dispatch_reader = new Wire\AMQPBufferReader('');

        $version = self::getProtocolVersion();
        $this->protocolVersion = $version;

        if ($version === Wire\Constants091::VERSION) {
            $constantClass = Wire\Constants091::class;
            $this->protocolWriter = new Protocol091();
            $this->waitHelper = new Wait091();
            $this->methodMap = new MethodMap091();
        } elseif ($version === Wire\Constants080::VERSION) {
            $constantClass = Wire\Constants080::class;
            $this->protocolWriter = new Protocol080();
            $this->waitHelper = new Wait080();
            $this->methodMap = new MethodMap080();
        } else {
            // Defensive: getProtocolVersion() already rejects unknown versions.
            throw new AMQPNotImplementedException(sprintf(
                'Protocol: %s not implemented.',
                $this->protocolVersion
            ));
        }

        $this->constants = new $constantClass();
        $this->debug = new DebugHelper($this->constants);
        $this->frame_queue = new \SplQueue();
    }
122 50
123 50
    /**
     * Resolves the AMQP protocol version to use.
     *
     * Reads the AMQP_PROTOCOL constant when it is defined and falls back to
     * Wire\Constants091::VERSION otherwise.
     *
     * @return string
     * @throws AMQPOutOfRangeException when the resolved version is not one of the supported ones
     */
    public static function getProtocolVersion()
    {
        if (defined('AMQP_PROTOCOL')) {
            $version = AMQP_PROTOCOL;
        } else {
            $version = Wire\Constants091::VERSION;
        }

        // Validate here so an unknown protocol is caught as early as possible;
        // this method may also be called from outside the class.
        $supported = [Wire\Constants080::VERSION, Wire\Constants091::VERSION];
        if (in_array($version, $supported, true)) {
            return $version;
        }

        throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $version));
    }
137
138 50
    /**
     * Returns the id this channel was created with (cast to int by the constructor).
     *
     * @return int|null
     */
    public function getChannelId()
    {
        return $this->channel_id;
    }
145
146
    /**
     * Sets the maximum message body size for this channel.
     *
     * The value is cast to int; zero or negative values are ignored and leave
     * any previously configured limit untouched.
     *
     * @param int $max_bytes Max message body size for this channel
     * @return $this
     */
    public function setBodySizeLimit($max_bytes)
    {
        $limit = (int) $max_bytes;

        if ($limit > 0) {
            $this->maxBodySize = $limit;
        }

        return $this;
    }
160
161
    /**
     * Returns the connection this channel is attached to, if any.
     *
     * @return AbstractConnection|null
     */
    public function getConnection()
    {
        return $this->connection;
    }
168
169 8
    /**
     * Returns the methods that were received but deferred for a later wait() call.
     *
     * @return array
     */
    public function getMethodQueue()
    {
        return $this->method_queue;
    }
176
177
    /**
     * Tells whether at least one deferred method is waiting in the method queue.
     *
     * @return bool
     */
    public function hasPendingMethods()
    {
        // An empty queue casts to false, anything else to true.
        return (bool) $this->method_queue;
    }
184
185
    /**
     * Invokes the handler method mapped to an AMQP method signature.
     *
     * The signature is resolved through the method map to the name of a method
     * on this object. The raw arguments are loaded into the shared dispatch
     * reader, which is passed to the handler, followed by the message when one
     * is supplied.
     *
     * @param string $method_sig
     * @param string $args
     * @param AMQPMessage|null $amqpMessage
     * @return mixed Whatever the handler returns
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function dispatch($method_sig, $args, $amqpMessage)
    {
        $map = $this->methodMap;

        if (!$map->valid_method($method_sig)) {
            throw new AMQPNotImplementedException(sprintf(
                'Unknown AMQP method "%s"',
                $method_sig
            ));
        }

        $handlerName = $map->get_method($method_sig);

        if (!method_exists($this, $handlerName)) {
            throw new AMQPNotImplementedException(sprintf(
                'Method: "%s" not implemented by class: %s',
                $handlerName,
                get_class($this)
            ));
        }

        $this->dispatch_reader->reset($args);
        $handler = [$this, $handlerName];

        // Handlers without content only receive the argument reader.
        return $amqpMessage === null
            ? call_user_func($handler, $this->dispatch_reader)
            : call_user_func($handler, $this->dispatch_reader, $amqpMessage);
    }
219
220 12
    /**
     * Returns the next frame for this channel.
     *
     * Frames already sitting in the local frame queue are served first; only
     * when the queue is empty is the connection asked to wait for a frame on
     * this channel.
     *
     * @param int|float|null $timeout Passed through to the connection's wait_channel()
     * @return Frame
     * @throws AMQPChannelClosedException when no frame is queued and the channel has no connection
     */
    protected function next_frame($timeout = 0): Frame
    {
        $this->debug->debug_msg('waiting for a new frame');

        if (!$this->frame_queue->isEmpty()) {
            return $this->frame_queue->dequeue();
        }

        // $connection is nullable; fail with the same exception send_method_frame()
        // uses instead of a fatal "call to a member function on null".
        if ($this->connection === null) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        return $this->connection->wait_channel($this->channel_id, $timeout);
    }
234
235 47
    /**
     * Sends a method frame for this channel through the connection.
     *
     * @param array $method_sig
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
     * @throws AMQPChannelClosedException when the channel has no connection
     */
    protected function send_method_frame($method_sig, $args = '')
    {
        $connection = $this->connection;

        if ($connection === null) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        $connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
    }
247
248 47
    /**
     * Builds a method frame for this channel without sending it.
     *
     * This is here for performance reasons to batch calls to fwrite from basic.publish
     *
     * @param array $method_sig
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
     * @param \PhpAmqpLib\Wire\AMQPWriter $pkt
     * @return \PhpAmqpLib\Wire\AMQPWriter
     * @throws AMQPChannelClosedException when the channel has no connection
     */
    protected function prepare_method_frame($method_sig, $args = '', $pkt = null)
    {
        // Same guard as send_method_frame(): $connection is nullable, so report a
        // closed channel rather than crashing on a null method call.
        if ($this->connection === null) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt);
    }
260
261 14
    /**
     * Reads a content header frame followed by its body frames and returns the message.
     *
     * @return AMQPMessage
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws AMQPInvalidFrameException when the next frame is not a header frame
     */
    public function wait_content(): AMQPMessage
    {
        $frame = $this->next_frame();
        $this->validate_frame($frame, Frame::TYPE_HEADER);

        // Static analysis reports getPayload() as nullable; normalise to a string so
        // mb_substr() never receives null (deprecated since PHP 8.1).
        $payload = $frame->getPayload() ?? '';

        // skip class-id and weight (4 bytes) and start from size, everything else is properties
        // @link https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf 4.2.6.1 The Content Header
        $this->msg_property_reader->reset(mb_substr($payload, 4, null, 'ASCII'));
        $size = $this->msg_property_reader->read_longlong();

        return $this->createMessage(
            $this->msg_property_reader,
            $size
        );
    }
281
282 12
    protected function createMessage(AMQPReader $propertyReader, int $bodySize): AMQPMessage
    {
        // Builds a message from the header properties, then keeps pulling body
        // frames until the announced body size has been received.
        $message = new AMQPMessage();
        $message
            ->load_properties($propertyReader)
            ->setBodySize($bodySize);

        $chunks = [];
        $received = 0;

        while ($received < $bodySize) {
            $bodyFrame = $this->next_frame();
            // @link https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf 4.2.6.2 The Content Body
            $this->validate_frame($bodyFrame, Frame::TYPE_BODY);
            $received += $bodyFrame->getSize();

            $overLimit = is_int($this->maxBodySize) && $received > $this->maxBodySize;
            if ($overLimit) {
                // The frame that crosses the limit (and every later one) is dropped
                // whole; it is still counted so the loop consumes the full body.
                $message->setIsTruncated(true);
            } else {
                $chunks[] = $bodyFrame->getPayload();
            }
        }

        $message->setBody(implode('', $chunks));

        return $message;
    }
309
310
    /**
     * Wait for some expected AMQP methods and dispatch to them.
     * Unexpected methods are queued up for later calls to this PHP
     * method.
     *
     * A previously queued method that matches is dispatched first. Otherwise
     * frames are read until a dispatchable method arrives. Null is returned when
     * no data is available, or after a single non-matching method in
     * non-blocking mode.
     *
     * @param array|null $allowed_methods
     * @param bool $non_blocking
     * @param int|float|null $timeout
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
     * @throws AMQPOutOfBoundsException
     */
    public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0)
    {
        $this->debug->debug_allowed_methods($allowed_methods);

        $pending = $this->process_deferred_methods($allowed_methods);
        if ($pending['dispatch'] === true) {
            return $this->dispatch_deferred_method($pending['queued_method']);
        }

        // timeouts must be deactivated for non-blocking actions
        // NOTE(review): this only triggers for a strict boolean true, while the loop
        // condition below uses a truthiness check - confirm that is intended.
        if ($non_blocking === true) {
            $timeout = null;
        }

        // Nothing deferred matched: read new frames from the wire.
        do {
            try {
                $frame = $this->next_frame($timeout);
            } catch (AMQPNoDataException $e) {
                // no data ready for non-blocking actions - stop and exit
                return null;
            } catch (AMQPConnectionClosedException $exception) {
                if ($this instanceof AMQPChannel) {
                    $this->do_close();
                }
                throw $exception;
            }

            $this->validate_method_frame($frame);
            $this->validate_frame_payload($frame);

            $method = $this->parseMethod($frame->getPayload());
            $signature = $method->getSignature();

            $this->debug->debug_method_signature('> %s', $signature);

            $amqpMessage = $this->maybe_wait_for_content($signature);

            if ($this->should_dispatch_method($allowed_methods, $signature)) {
                return $this->dispatch($signature, $method->getArguments(), $amqpMessage);
            }

            // Wasn't what we were looking for? save it for later
            $this->debug->debug_method_signature('Queueing for later: %s', $signature);
            $this->method_queue[] = [$signature, $method->getArguments(), $amqpMessage];
        } while (!$non_blocking);

        return null;
    }
375
376
    /**
     * Looks for the first queued method that may be dispatched now.
     *
     * A match is removed from the method queue. With $allowed_methods === null
     * the first queued entry always matches.
     *
     * @param array|null $allowed_methods
     * @return array ['dispatch' => bool, 'queued_method' => array] - the entry is empty when nothing matched
     */
    protected function process_deferred_methods($allowed_methods)
    {
        foreach ($this->method_queue as $position => $entry) {
            $this->debug->debug_msg('checking queue method ' . $position);

            $isAllowed = $allowed_methods === null || in_array($entry[0], $allowed_methods, true);
            if (!$isAllowed) {
                continue;
            }

            unset($this->method_queue[$position]);

            return ['dispatch' => true, 'queued_method' => $entry];
        }

        return ['dispatch' => false, 'queued_method' => []];
    }
400
401
    /**
     * Dispatches a method that was taken from the method queue.
     *
     * @param array $queued_method List of [method signature, arguments, AMQPMessage|null]
     * @return mixed Result of dispatch()
     */
    protected function dispatch_deferred_method($queued_method)
    {
        [$signature, $arguments, $message] = $queued_method;

        $this->debug->debug_method_signature('Executing queued method: %s', $signature);

        return $this->dispatch($signature, $arguments, $message);
    }
411
412
    /**
     * Ensures the given frame is a method frame.
     *
     * @param Frame $frame
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException when the frame has another type
     */
    protected function validate_method_frame(Frame $frame): void
    {
        $this->validate_frame($frame, Frame::TYPE_METHOD);
    }
420
421
    /**
     * Ensures the frame type is exactly the expected one.
     *
     * @param Frame $frame
     * @param int $expectedType
     * @throws AMQPInvalidFrameException when the types differ
     */
    protected function validate_frame(Frame $frame, int $expectedType): void
    {
        $actualType = $frame->getType();

        if ($actualType === $expectedType) {
            return;
        }

        throw new AMQPInvalidFrameException(sprintf(
            'Expecting %u, received frame type %s (%s)',
            $expectedType,
            $actualType,
            $this->constants->getFrameType($actualType)
        ));
    }
437
438 12
    /**
     * Checks that a method frame payload is long enough and matches the frame size.
     *
     * @param Frame $frame
     * @throws AMQPOutOfBoundsException when the payload is shorter than 4 bytes
     * @throws AMQPInvalidFrameException when the payload length differs from the frame's declared size
     */
    protected function validate_frame_payload(Frame $frame): void
    {
        // Static analysis reports getPayload() as nullable; treat null as an empty
        // payload so mb_strlen() never receives null (deprecated since PHP 8.1).
        // An empty payload is still rejected below as too short.
        $payload = $frame->getPayload() ?? '';

        // The 'ASCII' encoding makes mb_strlen() count bytes rather than characters.
        $payloadSize = mb_strlen($payload, 'ASCII');

        // parseMethod() unpacks two 16-bit integers first, hence the 4-byte minimum.
        if ($payloadSize < 4) {
            throw new AMQPOutOfBoundsException('Method frame too short');
        }

        if ($payloadSize !== $frame->getSize()) {
            throw new AMQPInvalidFrameException('Frame size does not match payload');
        }
    }
454
455 47
    protected function parseMethod(string $payload): Method
    {
        // 'n2method' reads two unsigned 16-bit big-endian integers into the keys
        // 'method1' and 'method2' (presumably class id and method id - confirm
        // against Method's constructor); 'a*args' captures the rest as a string.
        $fields = unpack('n2method/a*args', $payload);

        return new Method($fields['method1'], $fields['method2'], $fields['args']);
    }
461
462
    /**
     * Decides whether a freshly received method must be dispatched immediately.
     *
     * That is the case when no filter was given, when the signature is in the
     * filter, or when the constants class reports it as a close method.
     *
     * @param array|null $allowed_methods
     * @param string $method_sig
     * @return bool
     */
    protected function should_dispatch_method($allowed_methods, $method_sig)
    {
        if ($allowed_methods === null) {
            return true;
        }

        if (in_array($method_sig, $allowed_methods, true)) {
            return true;
        }

        return (bool) $this->constants->isCloseMethod($method_sig);
    }
473 47
474
    /**
     * Reads the message content when the given method carries content.
     *
     * @param string $method_sig
     * @return AMQPMessage|null The message for content methods, null otherwise
     */
    protected function maybe_wait_for_content($method_sig)
    {
        if (!$this->constants->isContentMethod($method_sig)) {
            return null;
        }

        return $this->wait_content();
    }
487
488
    /**
     * Calls the handler with the given arguments; non-callable handlers are silently ignored.
     *
     * @param callable $handler
     * @param array $arguments
     */
    protected function dispatch_to_handler($handler, array $arguments = [])
    {
        if (!is_callable($handler)) {
            return;
        }

        call_user_func_array($handler, $arguments);
    }
498
}
499