AbstractChannel::validate_body_frame()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Connection\AbstractConnection;
5
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
6
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Helper\DebugHelper;
9
use PhpAmqpLib\Helper\MiscHelper;
10
use PhpAmqpLib\Helper\Protocol\MethodMap080;
11
use PhpAmqpLib\Helper\Protocol\MethodMap091;
12
use PhpAmqpLib\Helper\Protocol\Protocol080;
13
use PhpAmqpLib\Helper\Protocol\Protocol091;
14
use PhpAmqpLib\Helper\Protocol\Wait080;
15
use PhpAmqpLib\Helper\Protocol\Wait091;
16
use PhpAmqpLib\Message\AMQPMessage;
17
use PhpAmqpLib\Wire\AMQPReader;
18
19
abstract class AbstractChannel
20
{
21
    const PROTOCOL_080 = '0.8';
22
    const PROTOCOL_091 = '0.9.1';
23
24
    public static $PROTOCOL_CONSTANTS_CLASS;
25
26
    /** @var array */
27
    protected $frame_queue;
28
29
    /** @var array */
30
    protected $method_queue;
31
32
    /** @var bool */
33
    protected $auto_decode;
34
35
    /** @var string */
36
    protected $amqp_protocol_header;
37
38
    /** @var \PhpAmqpLib\Helper\DebugHelper */
39
    protected $debug;
40
41
    /** @var \PhpAmqpLib\Connection\AbstractConnection */
42
    protected $connection;
43
44
    /** @var string */
45
    protected $protocolVersion;
46
47
    /** @var int */
48
    protected $maxBodySize;
49
50
    /** @var \PhpAmqpLib\Helper\Protocol\Protocol080|\PhpAmqpLib\Helper\Protocol\Protocol091 */
51
    protected $protocolWriter;
52
53
    /** @var \PhpAmqpLib\Helper\Protocol\Wait080|\PhpAmqpLib\Helper\Protocol\Wait091 */
54
    protected $waitHelper;
55
56
    /** @var \PhpAmqpLib\Helper\Protocol\MethodMap080|\PhpAmqpLib\Helper\Protocol\MethodMap091 */
57
    protected $methodMap;
58
59
    /** @var string */
60
    protected $channel_id;
61
62
    /** @var \PhpAmqpLib\Wire\AMQPReader */
63
    protected $msg_property_reader;
64
65
    /** @var \PhpAmqpLib\Wire\AMQPReader */
66
    protected $wait_content_reader;
67
68
    /** @var \PhpAmqpLib\Wire\AMQPReader */
69
    protected $dispatch_reader;
70
71
    /**
     * Binds the channel to its connection and sets up protocol-specific helpers.
     *
     * The channel registers itself in the connection's channel map under
     * $channel_id, starts with empty frame/method queues, and allocates the
     * reusable readers used while parsing incoming frames.
     *
     * @param AbstractConnection $connection Connection this channel belongs to
     * @param string $channel_id Key under which the channel is registered on the connection
     * @throws \PhpAmqpLib\Exception\AMQPOutOfRangeException when getProtocolVersion() rejects the configured protocol
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when no helpers exist for the protocol version
     */
    public function __construct(AbstractConnection $connection, $channel_id)
    {
        $this->connection = $connection;
        $this->channel_id = $channel_id;
        $connection->channels[$channel_id] = $this;
        $this->frame_queue = array(); // Lower level queue for frames
        $this->method_queue = array(); // Higher level queue for methods
        $this->auto_decode = false;

        // Readers are allocated once and re-targeted with reuse() for every frame.
        $this->msg_property_reader = new AMQPReader(null);
        $this->wait_content_reader = new AMQPReader(null);
        $this->dispatch_reader = new AMQPReader(null);

        $this->protocolVersion = self::getProtocolVersion();
        switch ($this->protocolVersion) {
            case self::PROTOCOL_091:
                self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091';
                $c = self::$PROTOCOL_CONSTANTS_CLASS;
                $this->debug = new DebugHelper($c);
                $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
                $this->protocolWriter = new Protocol091();
                $this->waitHelper = new Wait091();
                $this->methodMap = new MethodMap091();
                break;
            case self::PROTOCOL_080:
                self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080';
                $c = self::$PROTOCOL_CONSTANTS_CLASS;
                $this->debug = new DebugHelper($c);
                $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
                $this->protocolWriter = new Protocol080();
                $this->waitHelper = new Wait080();
                $this->methodMap = new MethodMap080();
                break;
            default:
                // Defensive: getProtocolVersion() already rejects unknown versions.
                // No break needed after the throw, it would be unreachable.
                throw new AMQPRuntimeException(sprintf(
                    'Protocol: %s not implemented.',
                    $this->protocolVersion
                ));
        }
    }
117
118
    /**
     * Resolves the AMQP protocol version to use.
     *
     * Uses the AMQP_PROTOCOL constant when it is defined, otherwise falls
     * back to PROTOCOL_091. The value is validated here so that an unknown
     * protocol is caught as early as possible; this method may also be
     * called from outside the class.
     *
     * @return string One of self::PROTOCOL_080 or self::PROTOCOL_091
     * @throws AMQPOutOfRangeException when the configured version is not supported
     */
    public static function getProtocolVersion()
    {
        $supported = array(self::PROTOCOL_080, self::PROTOCOL_091);

        $protocol = self::PROTOCOL_091;
        if (defined('AMQP_PROTOCOL')) {
            $protocol = AMQP_PROTOCOL;
        }

        if (in_array($protocol, $supported, true)) {
            return $protocol;
        }

        throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $protocol));
    }
132
133
    /**
     * Returns the identifier this channel was constructed with.
     *
     * @return string
     */
    public function getChannelId()
    {
        return $this->channel_id;
    }
140
141
    /**
     * Sets the maximum message body size for this channel.
     *
     * The value is cast to int; values that are not greater than zero are
     * ignored and leave any previously configured limit untouched.
     *
     * @param int $max_bytes Max message body size for this channel
     * @return $this
     */
    public function setBodySizeLimit($max_bytes)
    {
        $limit = (int) $max_bytes;

        if ($limit > 0) {
            $this->maxBodySize = $limit;
        }

        return $this;
    }
155
156
    /**
     * Returns the connection this channel was constructed with.
     *
     * @return AbstractConnection
     */
    public function getConnection()
    {
        return $this->connection;
    }
163
164
    /**
     * Returns the methods that were received but queued for later dispatch.
     *
     * Each entry is an array of (method signature, arguments, content), as
     * stored by wait().
     *
     * @return array
     */
    public function getMethodQueue()
    {
        return $this->method_queue;
    }
171
172
    /**
     * Invokes the handler method mapped to an AMQP method signature.
     *
     * The signature is looked up in the protocol method map, the raw
     * arguments are loaded into the shared dispatch reader, and the handler
     * is called with that reader plus the content when there is any.
     *
     * @param string $method_sig Method signature, e.g. as produced by build_method_signature()
     * @param string $args Raw, still-encoded method arguments
     * @param mixed $content Content that accompanied the method, or null
     * @return mixed Whatever the handler returns
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the signature is unknown
     *         or this class does not implement the mapped handler
     */
    public function dispatch($method_sig, $args, $content)
    {
        $map = $this->methodMap;

        if (!$map->valid_method($method_sig)) {
            $message = sprintf('Unknown AMQP method "%s"', $method_sig);
            throw new AMQPRuntimeException($message);
        }

        $handler = $map->get_method($method_sig);

        if (!method_exists($this, $handler)) {
            $message = sprintf('Method: "%s" not implemented by class: %s', $handler, get_class($this));
            throw new AMQPRuntimeException($message);
        }

        $this->dispatch_reader->reuse($args);

        // The content argument is only passed along when there is content.
        $call_args = array($this->dispatch_reader);
        if ($content != null) {
            $call_args[] = $content;
        }

        return call_user_func_array(array($this, $handler), $call_args);
    }
206
207
    /**
     * Returns the next frame for this channel.
     *
     * Frames already sitting in the local frame queue are served first;
     * only when the queue is empty is the connection asked to wait for a
     * new frame on this channel.
     *
     * @param int $timeout Passed through to the connection's wait_channel()
     * @return array|mixed Callers in this class destructure the result as (frame type, payload)
     */
    public function next_frame($timeout = 0)
    {
        $this->debug->debug_msg('waiting for a new frame');

        if (empty($this->frame_queue)) {
            return $this->connection->wait_channel($this->channel_id, $timeout);
        }

        return array_shift($this->frame_queue);
    }
221
222
    /**
     * Sends a method frame on this channel through the owning connection.
     *
     * @param mixed $method_sig Method signature, forwarded to the connection unchanged
     * @param string $args Encoded method arguments
     */
    protected function send_method_frame($method_sig, $args = '')
    {
        $connection = $this->connection;
        $connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
    }
230
231
    /**
     * Prepares a method frame for this channel without sending it.
     *
     * This is here for performance reasons to batch calls to fwrite from basic.publish
     *
     * @param mixed $method_sig Method signature, forwarded to the connection unchanged
     * @param string $args Encoded method arguments
     * @param mixed $pkt Forwarded to the connection; presumably an existing
     *        AMQPWriter to append to, or null (TODO confirm against AbstractConnection)
     * @return null|\PhpAmqpLib\Wire\AMQPWriter
     */
    protected function prepare_method_frame($method_sig, $args = '', $pkt = null)
    {
        $connection = $this->connection;

        return $connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt);
    }
243
244
    /**
     * Reads a content header frame and assembles the message that follows it.
     *
     * The first 12 bytes of the header payload are loaded into the content
     * reader (two shorts followed by the value createMessage() reads with
     * read_longlong()); the remainder is loaded into the property reader.
     *
     * @return AMQPMessage
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the next frame is not a content header frame
     */
    public function wait_content()
    {
        list($frame_type, $payload) = $this->next_frame();

        $this->validate_header_frame($frame_type);

        $this->wait_content_reader->reuse(mb_substr($payload, 0, 12, 'ASCII'));

        // The two leading shorts (class id and weight) are not needed, but the
        // reads are kept: createMessage() goes on to read the body size from
        // this same reader.
        $this->wait_content_reader->read_short();
        $this->wait_content_reader->read_short();

        //hack to avoid creating new instances of AMQPReader;
        $this->msg_property_reader->reuse(mb_substr($payload, 12, mb_strlen($payload, 'ASCII') - 12, 'ASCII'));

        return $this->createMessage(
            $this->msg_property_reader,
            $this->wait_content_reader
        );
    }
268
269
    /**
     * Builds an AMQPMessage from a content header and the body frames that follow.
     *
     * Properties are loaded from $propertyReader and the announced body size
     * is read from $contentReader. Body frames are then consumed until the
     * announced size has been received. Once the received byte count exceeds
     * the configured body size limit, the message is flagged as truncated and
     * the remaining frames are still consumed but their payload is discarded.
     *
     * @param AMQPReader $propertyReader Reader positioned on the message properties
     * @param AMQPReader $contentReader Reader positioned on the body size
     * @return \PhpAmqpLib\Message\AMQPMessage
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when a frame that is not a content body frame arrives
     */
    protected function createMessage($propertyReader, $contentReader)
    {
        $bodyChunks = array();
        $bodyReceivedBytes = 0;

        $message = new AMQPMessage();
        $message
            ->load_properties($propertyReader)
            ->setBodySize($contentReader->read_longlong());

        // bc* functions are used for the size bookkeeping, as in the original code.
        while (bccomp($message->getBodySize(), $bodyReceivedBytes, 0) == 1) {
            list($frame_type, $payload) = $this->next_frame();

            $this->validate_body_frame($frame_type);
            $bodyReceivedBytes = bcadd($bodyReceivedBytes, mb_strlen($payload, 'ASCII'), 0);

            if (is_int($this->maxBodySize) && $bodyReceivedBytes > $this->maxBodySize) {
                // Keep draining frames so the channel stays in sync, but drop the payload.
                $message->setIsTruncated(true);
                continue;
            }

            $bodyChunks[] = $payload;
        }

        $message->setBody(implode('', $bodyChunks));

        $messageEncoding = $message->getContentEncoding();

        if ($this->auto_decode && !empty($messageEncoding)) {
            try {
                $body = $message->getBody();

                // The body set above is a plain string, which has no decode()
                // method; calling it unconditionally is a fatal error that the
                // catch below cannot intercept. Only decode when the body
                // really offers a callable decode().
                if (is_object($body) && is_callable(array($body, 'decode'))) {
                    $message->setBody($body->decode($messageEncoding));
                } else {
                    $this->debug->debug_msg('Skipping body decoding: message body does not provide decode()');
                }
            } catch (\Exception $e) {
                $this->debug->debug_msg('Ignoring body decoding exception: ' . $e->getMessage());
            }
        }

        return $message;
    }
314
315
    /**
     * Wait for some expected AMQP methods and dispatch to them.
     *
     * A previously queued method that matches is dispatched first. Otherwise
     * frames are read until a dispatchable method arrives; methods that are
     * not dispatchable are queued for later calls to this PHP method.
     *
     * @param array $allowed_methods Signatures to wait for; null accepts any method
     * @param bool $non_blocking When true, return after the first received method
     *        even if it was only queued
     * @param int $timeout Passed through to next_frame()
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when a method frame payload is too short
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when an unexpected frame type arrives
     *         or the method cannot be dispatched
     * @return mixed Result of the dispatched handler, or null when a non-blocking
     *         call only queued the received method
     */
    public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0)
    {
        $this->debug->debug_allowed_methods($allowed_methods);

        $pending = $this->process_deferred_methods($allowed_methods);
        if (true === $pending['dispatch']) {
            return $this->dispatch_deferred_method($pending['queued_method']);
        }

        // Nothing suitable was deferred: read new frames from the wire.
        do {
            list($type, $frame) = $this->next_frame($timeout);

            $this->validate_method_frame($type);
            $this->validate_frame_payload($frame);

            $signature = $this->build_method_signature($frame);
            $arguments = $this->extract_args($frame);

            $this->debug->debug_method_signature('> %s', $signature);

            $body = $this->maybe_wait_for_content($signature);

            if ($this->should_dispatch_method($allowed_methods, $signature)) {
                return $this->dispatch($signature, $arguments, $body);
            }

            // Not what the caller is waiting for: keep it for a later wait().
            $this->debug->debug_method_signature('Queueing for later: %s', $signature);
            $this->method_queue[] = array($signature, $arguments, $body);
        } while (!$non_blocking);

        return null;
    }
363
364 60
    /**
     * Looks for a queued method that may be dispatched now.
     *
     * The first queued entry whose signature is allowed is removed from the
     * queue and returned.
     *
     * @param array|null $allowed_methods Signatures to accept; null accepts any queued method
     * @return array Array with keys 'dispatch' (bool) and 'queued_method'
     *         (the queued entry, or an empty array when nothing matched)
     */
    protected function process_deferred_methods($allowed_methods)
    {
        foreach ($this->method_queue as $position => $entry) {
            $this->debug->debug_msg('checking queue method ' . $position);

            $signature = $entry[0];

            if ($allowed_methods != null && !in_array($signature, $allowed_methods)) {
                continue;
            }

            unset($this->method_queue[$position]);

            return array('dispatch' => true, 'queued_method' => $entry);
        }

        return array('dispatch' => false, 'queued_method' => array());
    }
384
385
    /**
     * Dispatches a method that was previously queued by wait().
     *
     * @param array $queued_method Queue entry: (method signature, arguments, content)
     * @return mixed Whatever dispatch() returns
     */
    protected function dispatch_deferred_method($queued_method)
    {
        $signature = $queued_method[0];

        $this->debug->debug_method_signature('Executing queued method: %s', $signature);

        $arguments = $queued_method[1];
        $body = $queued_method[2];

        return $this->dispatch($signature, $arguments, $body);
    }
391
392
    /**
393
     * @param $frame_type
394
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
395
     */
396 60
    protected function validate_method_frame($frame_type)
397
    {
398 60
        $this->validate_frame($frame_type, 1, 'AMQP method');
399 60
    }
400
401 45
    /**
     * Ensures the given frame type is the one used for content header frames (2).
     *
     * @param mixed $frame_type Frame type as returned by next_frame()
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the frame type differs
     */
    protected function validate_header_frame($frame_type)
    {
        $header_frame_type = 2;

        $this->validate_frame($frame_type, $header_frame_type, 'AMQP Content header');
    }
405
406 40
    /**
     * Ensures the given frame type is the one used for content body frames (3).
     *
     * @param mixed $frame_type Frame type as returned by next_frame()
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the frame type differs
     */
    protected function validate_body_frame($frame_type)
    {
        $body_frame_type = 3;

        $this->validate_frame($frame_type, $body_frame_type, 'AMQP Content body');
    }
410
411 60
    /**
     * Throws when a received frame type is not the expected one.
     *
     * @param mixed $frame_type Frame type that was received
     * @param int $expected_type Frame type the caller requires
     * @param string $expected_msg Human-readable name of the expected frame, used in the error
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the types differ
     */
    protected function validate_frame($frame_type, $expected_type, $expected_msg)
    {
        if ($frame_type == $expected_type) {
            return;
        }

        $PROTOCOL_CONSTANTS_CLASS = self::$PROTOCOL_CONSTANTS_CLASS;
        $frame_types = $PROTOCOL_CONSTANTS_CLASS::$FRAME_TYPES;

        // A frame type missing from the protocol table must not raise an
        // undefined-index notice while the real error is being reported.
        $frame_name = 'unknown';
        if (is_scalar($frame_type) && isset($frame_types[$frame_type])) {
            $frame_name = $frame_types[$frame_type];
        }

        throw new AMQPRuntimeException(sprintf(
            'Expecting %s, received frame type %s (%s)',
            $expected_msg,
            $frame_type,
            $frame_name
        ));
    }
423
424
    /**
     * Ensures a method frame payload is long enough to hold the 4-byte method signature.
     *
     * @param string $payload Raw method frame payload
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the payload is shorter than 4 bytes
     */
    protected function validate_frame_payload($payload)
    {
        $length = mb_strlen($payload, 'ASCII');

        if ($length >= 4) {
            return;
        }

        throw new AMQPOutOfBoundsException('Method frame too short');
    }
434
435 60
    /**
     * Builds the "<first>,<second>" method signature from a method frame payload.
     *
     * The first four bytes of the payload are decoded as two unsigned
     * 16-bit big-endian integers and joined with a comma.
     *
     * @param string $payload Raw method frame payload
     * @return string Signature in the form used as key by the method map
     */
    protected function build_method_signature($payload)
    {
        $header = mb_substr($payload, 0, 4, 'ASCII');
        $ids = unpack('nfirst/nsecond', $header);

        return $ids['first'] . ',' . $ids['second'];
    }
441
442 60
    /**
     * Returns the part of a method frame payload that follows the 4-byte signature.
     *
     * @param string $payload Raw method frame payload
     * @return string Encoded method arguments
     */
    protected function extract_args($payload)
    {
        $total_bytes = mb_strlen($payload, 'ASCII');

        return mb_substr($payload, 4, $total_bytes - 4, 'ASCII');
    }
446
447 60
    /**
     * Decides whether a received method is dispatched right away.
     *
     * A method is dispatched when no filter was given, when it is in the
     * filter, or when it is one of the protocol's close methods.
     *
     * @param array|null $allowed_methods Signatures the caller is waiting for; null accepts all
     * @param string $method_sig Signature of the received method
     * @return bool
     */
    protected function should_dispatch_method($allowed_methods, $method_sig)
    {
        if ($allowed_methods == null) {
            return true;
        }

        if (in_array($method_sig, $allowed_methods)) {
            return true;
        }

        $constants = self::$PROTOCOL_CONSTANTS_CLASS;

        return in_array($method_sig, $constants::$CLOSE_METHODS);
    }
455
456 60
    /**
     * Reads the content that follows a method, if the method carries content.
     *
     * @param string $method_sig Signature of the received method
     * @return AMQPMessage|null The message for methods listed in the protocol's
     *         content methods, null for all others
     */
    protected function maybe_wait_for_content($method_sig)
    {
        $constants = self::$PROTOCOL_CONSTANTS_CLASS;

        if (!in_array($method_sig, $constants::$CONTENT_METHODS)) {
            return null;
        }

        return $this->wait_content();
    }
467
468
    /**
     * Calls a handler with the given arguments, silently skipping non-callables.
     *
     * @param mixed $handler Callable to invoke; anything not callable is ignored
     * @param array $arguments Arguments passed to the handler
     */
    protected function dispatch_to_handler($handler, array $arguments)
    {
        if (!is_callable($handler)) {
            return;
        }

        call_user_func_array($handler, $arguments);
    }
478
}
479