Completed
Push — master ( d75eab...48de7d )
by
unknown
24s
created

AbstractChannel::validate_frame_payload()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.0625

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 3
cts 4
cp 0.75
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 2.0625
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Connection\AbstractConnection;
5
use PhpAmqpLib\Exception\AMQPChannelClosedException;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
8
use PhpAmqpLib\Exception\AMQPNoDataException;
9
use PhpAmqpLib\Exception\AMQPNotImplementedException;
10
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
11
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
12
use PhpAmqpLib\Helper\DebugHelper;
13
use PhpAmqpLib\Helper\Protocol\MethodMap080;
14
use PhpAmqpLib\Helper\Protocol\MethodMap091;
15
use PhpAmqpLib\Helper\Protocol\Protocol080;
16
use PhpAmqpLib\Helper\Protocol\Protocol091;
17
use PhpAmqpLib\Helper\Protocol\Wait080;
18
use PhpAmqpLib\Helper\Protocol\Wait091;
19
use PhpAmqpLib\Message\AMQPMessage;
20
use PhpAmqpLib\Wire\AMQPReader;
21
22
abstract class AbstractChannel
23
{
24
    const PROTOCOL_080 = '0.8';
25
    const PROTOCOL_091 = '0.9.1';
26
27
    public static $PROTOCOL_CONSTANTS_CLASS;
28
29
    /** @var array */
30
    protected $frame_queue;
31
32
    /** @var array */
33
    protected $method_queue;
34
35
    /** @var bool */
36
    protected $auto_decode;
37
38
    /** @var string */
39
    protected $amqp_protocol_header;
40
41
    /** @var \PhpAmqpLib\Helper\DebugHelper */
42
    protected $debug;
43
44
    /** @var \PhpAmqpLib\Connection\AbstractConnection */
45
    protected $connection;
46
47
    /** @var string */
48
    protected $protocolVersion;
49
50
    /** @var int */
51
    protected $maxBodySize;
52
53
    /** @var \PhpAmqpLib\Helper\Protocol\Protocol080|\PhpAmqpLib\Helper\Protocol\Protocol091 */
54
    protected $protocolWriter;
55
56
    /** @var \PhpAmqpLib\Helper\Protocol\Wait080|\PhpAmqpLib\Helper\Protocol\Wait091 */
57
    protected $waitHelper;
58
59
    /** @var \PhpAmqpLib\Helper\Protocol\MethodMap080|\PhpAmqpLib\Helper\Protocol\MethodMap091 */
60
    protected $methodMap;
61
62
    /** @var string */
63
    protected $channel_id;
64
65
    /** @var \PhpAmqpLib\Wire\AMQPReader */
66
    protected $msg_property_reader;
67
68
    /** @var \PhpAmqpLib\Wire\AMQPReader */
69
    protected $wait_content_reader;
70
71
    /** @var \PhpAmqpLib\Wire\AMQPReader */
72
    protected $dispatch_reader;
73
74
    /**
75
     * @param AbstractConnection $connection
76
     * @param string $channel_id
77
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
78
     */
79 180
    public function __construct(AbstractConnection $connection, $channel_id)
    {
        // Bind the channel to its connection and register it there under its id.
        $this->connection = $connection;
        $this->channel_id = $channel_id;
        $connection->channels[$channel_id] = $this;

        // Two queues are kept: raw frames (lower level) and decoded methods (higher level).
        $this->frame_queue = array();
        $this->method_queue = array();
        $this->auto_decode = false;

        // The readers are allocated once here and recycled later through reuse().
        $this->msg_property_reader = new AMQPReader(null);
        $this->wait_content_reader = new AMQPReader(null);
        $this->dispatch_reader = new AMQPReader(null);

        $this->protocolVersion = self::getProtocolVersion();

        // Select the protocol-specific helpers and the constants class name.
        if ($this->protocolVersion === self::PROTOCOL_091) {
            self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091';
            $this->protocolWriter = new Protocol091();
            $this->waitHelper = new Wait091();
            $this->methodMap = new MethodMap091();
        } elseif ($this->protocolVersion === self::PROTOCOL_080) {
            self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080';
            $this->protocolWriter = new Protocol080();
            $this->waitHelper = new Wait080();
            $this->methodMap = new MethodMap080();
        } else {
            throw new AMQPNotImplementedException(sprintf(
                'Protocol: %s not implemented.',
                $this->protocolVersion
            ));
        }

        // Both protocol versions derive the debug helper and the protocol header the same way.
        $constantsClass = self::$PROTOCOL_CONSTANTS_CLASS;
        $this->debug = new DebugHelper($constantsClass);
        $this->amqp_protocol_header = $constantsClass::$AMQP_PROTOCOL_HEADER;
    }
119
120
    /**
121
     * @return string
122
     * @throws AMQPOutOfRangeException
123
     */
124 180
    public static function getProtocolVersion()
    {
        // Default to 0.9.1 unless the AMQP_PROTOCOL constant has been defined.
        $version = self::PROTOCOL_091;
        if (defined('AMQP_PROTOCOL')) {
            $version = AMQP_PROTOCOL;
        }

        // Reject unknown versions as early as possible, since this method may be called from the outside.
        $supported = array(self::PROTOCOL_080, self::PROTOCOL_091);
        if (in_array($version, $supported, true)) {
            return $version;
        }

        throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $version));
    }
134
135
    /**
136
     * @return string
137
     */
138
    public function getChannelId()
    {
        // The id this channel was given at construction time.
        $id = $this->channel_id;

        return $id;
    }
142
143
    /**
144
     * @param int $max_bytes Max message body size for this channel
145
     * @return $this
146
     */
147
    public function setBodySizeLimit($max_bytes)
    {
        $limit = (int) $max_bytes;

        // Zero and negative values are ignored; whatever limit was set before stays in effect.
        if ($limit <= 0) {
            return $this;
        }

        $this->maxBodySize = $limit;

        return $this;
    }
157
158
    /**
159
     * @return AbstractConnection
160
     */
161 48
    public function getConnection()
    {
        // The connection this channel was constructed with.
        $connection = $this->connection;

        return $connection;
    }
165
166
    /**
167
     * @return array
168
     */
169
    public function getMethodQueue()
    {
        // Methods that wait() received but did not dispatch are stored here.
        $queue = $this->method_queue;

        return $queue;
    }
173
174
    /**
175
     * @return bool
176
     */
177
    public function hasPendingMethods()
    {
        // True as soon as at least one method is waiting in the method queue.
        return false === empty($this->method_queue);
    }
181
182
    /**
183
     * @param string $method_sig
184
     * @param string $args
185
     * @param AMQPMessage|null $amqpMessage
186
     * @return mixed
187
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
188
     */
189 180
    public function dispatch($method_sig, $args, $amqpMessage)
    {
        $map = $this->methodMap;

        // The signature has to be known to the protocol's method map...
        if (!$map->valid_method($method_sig)) {
            throw new AMQPNotImplementedException(sprintf(
                'Unknown AMQP method "%s"',
                $method_sig
            ));
        }

        // ...and the handler name it maps to has to exist on this channel object.
        $handler = $map->get_method($method_sig);
        if (!method_exists($this, $handler)) {
            throw new AMQPNotImplementedException(sprintf(
                'Method: "%s" not implemented by class: %s',
                $handler,
                get_class($this)
            ));
        }

        // The shared dispatch reader is pointed at the raw arguments of this call.
        $this->dispatch_reader->reuse($args);
        $callback = array($this, $handler);

        // The message is only forwarded as a second argument when one was supplied (loose null check).
        return $amqpMessage == null
            ? call_user_func($callback, $this->dispatch_reader)
            : call_user_func($callback, $this->dispatch_reader, $amqpMessage);
    }
216
217
    /**
218
     * @param int|float|null $timeout
219
     * @return array|mixed
220
     */
221 180
    public function next_frame($timeout = 0)
    {
        $this->debug->debug_msg('waiting for a new frame');

        // Nothing buffered locally: ask the connection for the next frame on this channel.
        if (empty($this->frame_queue)) {
            return $this->connection->wait_channel($this->channel_id, $timeout);
        }

        // Otherwise hand out the oldest buffered frame first.
        return array_shift($this->frame_queue);
    }
231
232
    /**
233
     * @param array $method_sig
234
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
235
     */
236 180
    protected function send_method_frame($method_sig, $args = '')
    {
        $connection = $this->connection;

        // Without a connection there is nothing to send the frame through.
        if (null === $connection) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        $connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
    }
244
245
    /**
246
     * This is here for performance reasons to batch calls to fwrite from basic.publish
247
     *
248
     * @param array $method_sig
249
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
250
     * @param \PhpAmqpLib\Wire\AMQPWriter $pkt
251
     * @return \PhpAmqpLib\Wire\AMQPWriter
252
     */
253 66
    protected function prepare_method_frame($method_sig, $args = '', $pkt = null)
    {
        // Same guard as send_method_frame(): once the connection reference is null,
        // fail with a catchable channel-closed exception instead of a fatal
        // "call to a member function on null".
        if ($this->connection === null) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        // Delegates to the connection, passing this channel's id along with the caller's arguments.
        return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt);
    }
257
258
    /**
259
     * @return AMQPMessage
260
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
261
     */
262 66
    public function wait_content()
    {
        list($frame_type, $payload) = $this->next_frame();

        // The frame following a content method must be a content header frame.
        $this->validate_header_frame($frame_type);

        // The first 12 bytes of the payload go to the content reader.
        $this->wait_content_reader->reuse(mb_substr($payload, 0, 12, 'ASCII'));

        // Two short fields (formerly stored as $class_id and $weight) are not used here.
        // They are still read so that the reader is advanced before createMessage()
        // calls read_longlong() on it for the body size.
        $this->wait_content_reader->read_short();
        $this->wait_content_reader->read_short();

        // Hack to avoid creating new instances of AMQPReader: everything after
        // byte 12 is handed to the reusable property reader.
        $this->msg_property_reader->reuse(mb_substr($payload, 12, mb_strlen($payload, 'ASCII') - 12, 'ASCII'));

        return $this->createMessage(
            $this->msg_property_reader,
            $this->wait_content_reader
        );
    }
281
282
    /**
283
     * @param AMQPReader $propertyReader
284
     * @param AMQPReader $contentReader
285
     * @return \PhpAmqpLib\Message\AMQPMessage
286
     */
287 66
    protected function createMessage($propertyReader, $contentReader)
    {
        $message = new AMQPMessage();
        $message
            ->load_properties($propertyReader)
            ->setBodySize($contentReader->read_longlong());

        $body = '';
        $receivedBytes = 0;

        // Keep pulling body frames while the announced body size exceeds what has arrived.
        while (1 === bccomp($message->getBodySize(), $receivedBytes, 0)) {
            list($frame_type, $payload) = $this->next_frame();
            $this->validate_body_frame($frame_type);

            $receivedBytes = bcadd($receivedBytes, mb_strlen($payload, 'ASCII'), 0);

            // Once the configured limit is exceeded, chunks are counted but no longer
            // stored, and the message is flagged as truncated.
            if (is_int($this->maxBodySize) && $receivedBytes > $this->maxBodySize) {
                $message->setIsTruncated(true);
            } else {
                $body .= $payload;
            }
        }

        $message->setBody($body);

        return $message;
    }
315
316
    /**
317
     * Wait for some expected AMQP methods and dispatch to them.
318
     * Unexpected methods are queued up for later calls to this PHP
319
     * method.
320
     *
321
     * @param array $allowed_methods
322
     * @param bool $non_blocking
323
     * @param int|float|null $timeout
324
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
325
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
326
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
327
     * @throws \ErrorException
328
     * @return mixed
329
     */
330 180
    public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0)
    {
        $this->debug->debug_allowed_methods($allowed_methods);

        // A previously queued method that matches takes precedence over reading new frames.
        $pending = $this->process_deferred_methods($allowed_methods);
        if (true === $pending['dispatch']) {
            return $this->dispatch_deferred_method($pending['queued_method']);
        }

        // Timeouts must be deactivated for non-blocking actions.
        if ($non_blocking === true) {
            $timeout = null;
        }

        // No deferred method matched: read frames until one can be dispatched.
        do {
            try {
                list($frame_type, $payload) = $this->next_frame($timeout);
            } catch (AMQPNoDataException $e) {
                // No data ready for non-blocking actions: stop and exit.
                break;
            } catch (AMQPConnectionClosedException $exception) {
                // Only AMQPChannel instances are closed here before the exception is passed on.
                if ($this instanceof AMQPChannel) {
                    $this->do_close();
                }

                throw $exception;
            }

            $this->validate_method_frame($frame_type);
            $this->validate_frame_payload($payload);

            $method_sig = $this->build_method_signature($payload);
            $args = $this->extract_args($payload);

            $this->debug->debug_method_signature('> %s', $method_sig);

            // Content-carrying methods are followed by their message frames.
            $amqpMessage = $this->maybe_wait_for_content($method_sig);

            if ($this->should_dispatch_method($allowed_methods, $method_sig)) {
                return $this->dispatch($method_sig, $args, $amqpMessage);
            }

            // Wasn't what we were looking for? Save it for a later call.
            $this->debug->debug_method_signature('Queueing for later: %s', $method_sig);
            $this->method_queue[] = array($method_sig, $args, $amqpMessage);

            // A non-blocking wait gives up after queueing a single unexpected method.
        } while (!$non_blocking);

        // Reached only when the loop ended without dispatching anything.
        return null;
    }
381
382
    /**
383
     * @param array $allowed_methods
384
     * @return array
385
     */
386 180
    protected function process_deferred_methods($allowed_methods)
    {
        foreach ($this->method_queue as $index => $entry) {
            $this->debug->debug_msg('checking queue method ' . $index);

            $signature = $entry[0];

            // The first queued method that passes the filter is removed from the queue
            // and reported back; the loose null check lets an empty filter match anything.
            if ($allowed_methods == null || in_array($signature, $allowed_methods)) {
                unset($this->method_queue[$index]);

                return array('dispatch' => true, 'queued_method' => $entry);
            }
        }

        // Nothing in the queue matched.
        return array('dispatch' => false, 'queued_method' => array());
    }
406
407
    /**
408
     * @param array $queued_method
409
     * @return mixed
410
     */
411
    protected function dispatch_deferred_method($queued_method)
    {
        // Queue entries are stored by wait() as array(signature, args, message).
        $signature = $queued_method[0];

        $this->debug->debug_method_signature('Executing queued method: %s', $signature);

        return $this->dispatch($signature, $queued_method[1], $queued_method[2]);
    }
417
418
    /**
419
     * @param int $frame_type
420
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
421
     */
422 180
    protected function validate_method_frame($frame_type)
    {
        // Method frames are checked against frame type 1.
        $expectedType = 1;

        $this->validate_frame($frame_type, $expectedType, 'AMQP method');
    }
426
427
    /**
428
     * @param int $frame_type
429
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
430
     */
431 66
    protected function validate_header_frame($frame_type)
    {
        // Content header frames are checked against frame type 2.
        $expectedType = 2;

        $this->validate_frame($frame_type, $expectedType, 'AMQP Content header');
    }
435
436
    /**
437
     * @param int $frame_type
438
     * @throws \PhpAmqpLib\Exception\AMQPInvalidFrameException
439
     */
440 60
    protected function validate_body_frame($frame_type)
    {
        // Content body frames are checked against frame type 3.
        $expectedType = 3;

        $this->validate_frame($frame_type, $expectedType, 'AMQP Content body');
    }
444
445
    /**
446
     * @param int $frameType
447
     * @param int $expectedType
448
     * @param string $expectedMessage
449
     */
450 180
    protected function validate_frame($frameType, $expectedType, $expectedMessage)
    {
        // Loose comparison is kept so numerically equal values of different types still match.
        if ($frameType == $expectedType) {
            return;
        }

        $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;
        $frameTypes = $protocolClass::$FRAME_TYPES;

        // An unexpected frame may carry a type that is not listed in FRAME_TYPES.
        // Look the name up defensively so that building the error message cannot
        // itself raise an undefined-index notice and mask the real exception.
        $frameName = 'unknown';
        if ((is_int($frameType) || is_string($frameType)) && isset($frameTypes[$frameType])) {
            $frameName = $frameTypes[$frameType];
        }

        throw new AMQPInvalidFrameException(sprintf(
            'Expecting %s, received frame type %s (%s)',
            $expectedMessage,
            $frameType,
            $frameName
        ));
    }
462
463
    /**
464
     * @param string $payload
465
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
466
     */
467 180
    protected function validate_frame_payload($payload)
    {
        // At least 4 bytes are required: build_method_signature() unpacks the first four.
        $byteCount = mb_strlen($payload, 'ASCII');
        if ($byteCount >= 4) {
            return;
        }

        throw new AMQPOutOfBoundsException('Method frame too short');
    }
473
474
    /**
475
     * @param string $payload
476
     * @return string
477
     */
478 180
    protected function build_method_signature($payload)
    {
        // The first four bytes hold two unsigned big-endian 16-bit numbers.
        $header = mb_substr($payload, 0, 4, 'ASCII');
        $ids = unpack('n2', $header);

        // The signature is both numbers joined by a comma, e.g. "60,40".
        return $ids[1] . ',' . $ids[2];
    }
484
485
    /**
486
     * @param string $payload
487
     * @return string
488
     */
489 180
    protected function extract_args($payload)
    {
        // Everything after the 4-byte method signature is the argument data.
        $argsLength = mb_strlen($payload, 'ASCII') - 4;

        return mb_substr($payload, 4, $argsLength, 'ASCII');
    }
493
494
    /**
495
     * @param array|null $allowed_methods
496
     * @param string $method_sig
497
     * @return bool
498
     */
499 180
    protected function should_dispatch_method($allowed_methods, $method_sig)
    {
        // No filter (loose null check, so an empty array counts too): everything is dispatched.
        if ($allowed_methods == null) {
            return true;
        }

        // The caller explicitly asked for this method.
        if (in_array($method_sig, $allowed_methods)) {
            return true;
        }

        // Methods listed as close methods by the protocol constants are dispatched regardless of the filter.
        $constantsClass = self::$PROTOCOL_CONSTANTS_CLASS;

        return in_array($method_sig, $constantsClass::$CLOSE_METHODS);
    }
507
508
    /**
509
     * @param string $method_sig
510
     * @return AMQPMessage|null
511
     */
512 180
    protected function maybe_wait_for_content($method_sig)
    {
        $constantsClass = self::$PROTOCOL_CONSTANTS_CLASS;

        // Only methods listed as content methods are followed by a message to read.
        if (!in_array($method_sig, $constantsClass::$CONTENT_METHODS)) {
            return null;
        }

        return $this->wait_content();
    }
523
524
    /**
525
     * @param callable $handler
526
     * @param array $arguments
527
     */
528 6
    protected function dispatch_to_handler($handler, array $arguments)
    {
        // Handlers that are not callable are silently skipped.
        if (!is_callable($handler)) {
            return;
        }

        call_user_func_array($handler, $arguments);
    }
534
}
535