Completed
Push — master ( d5673e...8c6650 )
by
unknown
21:50
created

AbstractChannel::hasPendingMethods()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 0
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Connection\AbstractConnection;
5
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
6
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Helper\DebugHelper;
9
use PhpAmqpLib\Helper\Protocol\MethodMap080;
10
use PhpAmqpLib\Helper\Protocol\MethodMap091;
11
use PhpAmqpLib\Helper\Protocol\Protocol080;
12
use PhpAmqpLib\Helper\Protocol\Protocol091;
13
use PhpAmqpLib\Helper\Protocol\Wait080;
14
use PhpAmqpLib\Helper\Protocol\Wait091;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use PhpAmqpLib\Wire\AMQPReader;
17
18
abstract class AbstractChannel
19
{
20
    const PROTOCOL_080 = '0.8';
21
    const PROTOCOL_091 = '0.9.1';
22
23
    public static $PROTOCOL_CONSTANTS_CLASS;
24
25
    /** @var array */
26
    protected $frame_queue;
27
28
    /** @var array */
29
    protected $method_queue;
30
31
    /** @var bool */
32
    protected $auto_decode;
33
34
    /** @var string */
35
    protected $amqp_protocol_header;
36
37
    /** @var \PhpAmqpLib\Helper\DebugHelper */
38
    protected $debug;
39
40
    /** @var \PhpAmqpLib\Connection\AbstractConnection */
41
    protected $connection;
42
43
    /** @var string */
44
    protected $protocolVersion;
45
46
    /** @var int */
47
    protected $maxBodySize;
48
49
    /** @var \PhpAmqpLib\Helper\Protocol\Protocol080|\PhpAmqpLib\Helper\Protocol\Protocol091 */
50
    protected $protocolWriter;
51
52
    /** @var \PhpAmqpLib\Helper\Protocol\Wait080|\PhpAmqpLib\Helper\Protocol\Wait091 */
53
    protected $waitHelper;
54
55
    /** @var \PhpAmqpLib\Helper\Protocol\MethodMap080|\PhpAmqpLib\Helper\Protocol\MethodMap091 */
56
    protected $methodMap;
57
58
    /** @var string */
59
    protected $channel_id;
60
61
    /** @var \PhpAmqpLib\Wire\AMQPReader */
62
    protected $msg_property_reader;
63
64
    /** @var \PhpAmqpLib\Wire\AMQPReader */
65
    protected $wait_content_reader;
66
67
    /** @var \PhpAmqpLib\Wire\AMQPReader */
68
    protected $dispatch_reader;
69
70
    /**
     * Binds the channel to its connection and wires up the protocol-specific helpers.
     *
     * The channel registers itself in $connection->channels under $channel_id, starts
     * with empty frame and method queues, and picks the constants class, protocol
     * writer, wait helper and method map that match getProtocolVersion().
     *
     * @param AbstractConnection $connection
     * @param string $channel_id
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when no implementation exists for the protocol version
     * @throws \PhpAmqpLib\Exception\AMQPOutOfRangeException when getProtocolVersion() rejects the configured version
     */
    public function __construct(AbstractConnection $connection, $channel_id)
    {
        $this->connection = $connection;
        $this->channel_id = $channel_id;
        $connection->channels[$channel_id] = $this;
        $this->frame_queue = array(); // Lower level queue for frames
        $this->method_queue = array(); // Higher level queue for methods
        $this->auto_decode = false;

        $this->msg_property_reader = new AMQPReader(null);
        $this->wait_content_reader = new AMQPReader(null);
        $this->dispatch_reader = new AMQPReader(null);

        $this->protocolVersion = self::getProtocolVersion();

        // Only the version-specific pieces live in the switch; the wiring that is
        // identical for every version follows it, so it is written exactly once.
        switch ($this->protocolVersion) {
            case self::PROTOCOL_091:
                self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091';
                $this->protocolWriter = new Protocol091();
                $this->waitHelper = new Wait091();
                $this->methodMap = new MethodMap091();
                break;
            case self::PROTOCOL_080:
                self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080';
                $this->protocolWriter = new Protocol080();
                $this->waitHelper = new Wait080();
                $this->methodMap = new MethodMap080();
                break;
            default:
                throw new AMQPRuntimeException(sprintf(
                    'Protocol: %s not implemented.',
                    $this->protocolVersion
                ));
        }

        $c = self::$PROTOCOL_CONSTANTS_CLASS;
        $this->debug = new DebugHelper($c);
        $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
    }
115
116
    /**
     * Resolves the AMQP protocol version in effect.
     *
     * The AMQP_PROTOCOL constant wins when it is defined; otherwise the version
     * defaults to self::PROTOCOL_091.
     *
     * @return string
     * @throws AMQPOutOfRangeException when the resolved version is neither PROTOCOL_080 nor PROTOCOL_091
     */
    public static function getProtocolVersion()
    {
        $supported = array(self::PROTOCOL_080, self::PROTOCOL_091);

        $version = self::PROTOCOL_091;
        if (defined('AMQP_PROTOCOL')) {
            $version = AMQP_PROTOCOL;
        }

        // Validate eagerly: this method is public and may be called from the outside.
        if (in_array($version, $supported, true)) {
            return $version;
        }

        throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $version));
    }
130
131
    /**
     * Returns the identifier this channel was constructed with.
     *
     * @return string
     */
    public function getChannelId()
    {
        $id = $this->channel_id;

        return $id;
    }
138
139
    /**
     * Sets the maximum message body size accepted on this channel.
     *
     * The value is cast to int. Zero and negative values are ignored, which
     * leaves any previously configured limit untouched.
     *
     * @param int $max_bytes Max message body size for this channel
     * @return $this
     */
    public function setBodySizeLimit($max_bytes)
    {
        $limit = (int) $max_bytes;

        if ($limit <= 0) {
            return $this;
        }

        $this->maxBodySize = $limit;

        return $this;
    }
153
154
    /**
     * Returns the connection this channel is bound to.
     *
     * @return AbstractConnection
     */
    public function getConnection()
    {
        $connection = $this->connection;

        return $connection;
    }
161
162
    /**
     * Returns the queue of methods received but not yet dispatched.
     *
     * Each entry is appended by wait() as array($method_sig, $args, $amqpMessage).
     *
     * @return array
     */
    public function getMethodQueue()
    {
        $queue = $this->method_queue;

        return $queue;
    }
169
170
    /**
     * Tells whether any received methods are still waiting to be dispatched.
     *
     * @return bool true when the method queue is non-empty
     */
    public function hasPendingMethods()
    {
        return empty($this->method_queue) ? false : true;
    }
177 84
178
    /**
     * Routes an AMQP method to the handler method of this object.
     *
     * The signature is resolved to a handler name through the method map. The
     * handler receives the shared dispatch reader loaded with $args and, when a
     * message was supplied, the message as a second argument.
     *
     * @param string $method_sig
     * @param string $args
     * @param AMQPMessage|null $amqpMessage
     * @return mixed whatever the handler returns
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the signature is unknown or the handler is missing
     */
    public function dispatch($method_sig, $args, $amqpMessage)
    {
        $methods = $this->methodMap;

        if (!$methods->valid_method($method_sig)) {
            throw new AMQPRuntimeException(sprintf('Unknown AMQP method "%s"', $method_sig));
        }

        $handlerName = $methods->get_method($method_sig);

        if (!method_exists($this, $handlerName)) {
            throw new AMQPRuntimeException(sprintf(
                'Method: "%s" not implemented by class: %s',
                $handlerName,
                get_class($this)
            ));
        }

        $this->dispatch_reader->reuse($args);

        // The message is appended only when one was supplied (loose null check).
        $handlerArguments = array($this->dispatch_reader);
        if ($amqpMessage != null) {
            $handlerArguments[] = $amqpMessage;
        }

        return call_user_func_array(array($this, $handlerName), $handlerArguments);
    }
212
213 84
    /**
     * Returns the next frame for this channel.
     *
     * Frames already buffered in the channel's frame queue are served first;
     * only when the queue is empty is the connection asked for a frame.
     *
     * @param int $timeout passed through to the connection when it has to wait
     * @return array|mixed
     */
    public function next_frame($timeout = 0)
    {
        $this->debug->debug_msg('waiting for a new frame');

        if (empty($this->frame_queue)) {
            // NOTE(review): static analysis reports wait_channel() as protected on
            // AbstractConnection, i.e. not callable from this context — confirm its visibility.
            return $this->connection->wait_channel($this->channel_id, $timeout);
        }

        return array_shift($this->frame_queue);
    }
227 6
228
    /**
     * Sends a method frame for this channel through the connection.
     *
     * @param array $method_sig
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the channel no longer has a connection
     */
    protected function send_method_frame($method_sig, $args = '')
    {
        $connection = $this->connection;

        if (null === $connection) {
            throw new AMQPRuntimeException('Channel connection is closed.');
        }

        // NOTE(review): static analysis reports send_channel_method_frame() as protected on
        // AbstractConnection, i.e. not callable from this context — confirm its visibility.
        $connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
    }
240
241 72
    /**
     * Builds a method frame for this channel without sending it.
     *
     * This is here for performance reasons to batch calls to fwrite from basic.publish
     *
     * @param array $method_sig
     * @param \PhpAmqpLib\Wire\AMQPWriter|string $args
     * @param \PhpAmqpLib\Wire\AMQPWriter $pkt
     * @return \PhpAmqpLib\Wire\AMQPWriter
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the channel no longer has a connection
     */
    protected function prepare_method_frame($method_sig, $args = '', $pkt = null)
    {
        // Same guard as send_method_frame(): fail with a catchable exception
        // instead of a fatal "call to a member function on null".
        if ($this->connection === null) {
            throw new AMQPRuntimeException('Channel connection is closed.');
        }

        // NOTE(review): static analysis reports prepare_channel_method_frame() as protected on
        // AbstractConnection, i.e. not callable from this context — confirm its visibility.
        return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt);
    }
253
254 66
    /**
     * Reads a content header frame and the body frames that follow it.
     *
     * The first 12 bytes of the header payload go to the content reader; the
     * remainder goes to the property reader. Both readers are then handed to
     * createMessage(), which collects the body.
     *
     * @return AMQPMessage
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the next frame is not a content header
     */
    public function wait_content()
    {
        list($frame_type, $payload) = $this->next_frame();

        $this->validate_header_frame($frame_type);

        $this->wait_content_reader->reuse(mb_substr($payload, 0, 12, 'ASCII'));

        // Class id and weight are not used, but both shorts must still be consumed
        // so that createMessage() reads the body size from the right position.
        $this->wait_content_reader->read_short();
        $this->wait_content_reader->read_short();

        //hack to avoid creating new instances of AMQPReader;
        $this->msg_property_reader->reuse(mb_substr($payload, 12, mb_strlen($payload, 'ASCII') - 12, 'ASCII'));

        return $this->createMessage(
            $this->msg_property_reader,
            $this->wait_content_reader
        );
    }
277 66
278 66
    /**
     * Builds a message from its properties and the body frames that follow.
     *
     * Body frames are read until the received byte count reaches the body size
     * announced by the content reader. Once the running total exceeds the channel's
     * body size limit (when one is set), further chunks are dropped and the
     * message is flagged as truncated.
     *
     * @param AMQPReader $propertyReader
     * @param AMQPReader $contentReader
     * @return \PhpAmqpLib\Message\AMQPMessage
     */
    protected function createMessage($propertyReader, $contentReader)
    {
        $chunks = array();
        $received = 0;

        $message = new AMQPMessage();
        $message
            ->load_properties($propertyReader)
            ->setBodySize($contentReader->read_longlong());

        // bc* functions keep the byte counters exact for very large body sizes.
        while (bccomp($message->getBodySize(), $received, 0) == 1) {
            list($frame_type, $payload) = $this->next_frame();

            $this->validate_body_frame($frame_type);
            $received = bcadd($received, mb_strlen($payload, 'ASCII'), 0);

            $overLimit = is_int($this->maxBodySize) && $received > $this->maxBodySize;

            if ($overLimit) {
                // Keep draining frames, but stop accumulating their payload.
                $message->setIsTruncated(true);
            } else {
                $chunks[] = $payload;
            }
        }

        $message->setBody(implode('', $chunks));

        return $message;
    }
311
312
    /**
     * Wait for some expected AMQP methods and dispatch to them.
     *
     * A previously queued method that qualifies is dispatched first. Otherwise
     * frames are read until an allowed method arrives; unexpected methods are
     * queued for later calls to this method. In non-blocking mode a single frame
     * is read and, if it was not dispatched, null is returned after queueing it.
     *
     * @param array|null $allowed_methods null lets any method through
     * @param bool $non_blocking
     * @param int $timeout
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @return mixed
     */
    public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0)
    {
        // NOTE(review): $allowed_methods may be null here; static analysis suggests
        // DebugHelper::debug_allowed_methods() is declared for arrays only — confirm it tolerates null.
        $this->debug->debug_allowed_methods($allowed_methods);

        $pending = $this->process_deferred_methods($allowed_methods);
        if (true === $pending['dispatch']) {
            return $this->dispatch_deferred_method($pending['queued_method']);
        }

        // No deferred methods?  wait for new ones
        do {
            list($frame_type, $payload) = $this->next_frame($timeout);

            $this->validate_method_frame($frame_type);
            $this->validate_frame_payload($payload);

            $signature = $this->build_method_signature($payload);
            $arguments = $this->extract_args($payload);

            $this->debug->debug_method_signature('> %s', $signature);

            // Content is fetched before the dispatch decision so it travels with the method either way.
            $amqpMessage = $this->maybe_wait_for_content($signature);

            if ($this->should_dispatch_method($allowed_methods, $signature)) {
                return $this->dispatch($signature, $arguments, $amqpMessage);
            }

            // Wasn't what we were looking for? save it for later
            $this->debug->debug_method_signature('Queueing for later: %s', $signature);
            $this->method_queue[] = array($signature, $arguments, $amqpMessage);
        } while (!$non_blocking);

        return null;
    }
360 84
361
    /**
     * Looks for the first queued method that may be dispatched now.
     *
     * A queued method qualifies when no allowed list was given or when its
     * signature is in the list. The matching entry is removed from the queue.
     *
     * @param array|null $allowed_methods
     * @return array with keys 'dispatch' (bool) and 'queued_method' (the entry, or an empty array)
     */
    protected function process_deferred_methods($allowed_methods)
    {
        foreach ($this->method_queue as $key => $entry) {
            $this->debug->debug_msg('checking queue method ' . $key);

            $signature = $entry[0];
            $isAllowed = $allowed_methods == null || in_array($signature, $allowed_methods);

            if ($isAllowed) {
                unset($this->method_queue[$key]);

                return array('dispatch' => true, 'queued_method' => $entry);
            }
        }

        return array('dispatch' => false, 'queued_method' => array());
    }
385
386
    /**
     * Dispatches a method that was taken from the method queue.
     *
     * @param array $queued_method entry of the form array($method_sig, $args, $amqpMessage)
     * @return mixed whatever dispatch() returns
     */
    protected function dispatch_deferred_method($queued_method)
    {
        $signature = $queued_method[0];

        $this->debug->debug_method_signature('Executing queued method: %s', $signature);

        $arguments = $queued_method[1];
        $amqpMessage = $queued_method[2];

        return $this->dispatch($signature, $arguments, $amqpMessage);
    }
396 84
397
    /**
     * Ensures the frame is a method frame (frame type 1).
     *
     * @param int $frame_type
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the frame type differs
     */
    protected function validate_method_frame($frame_type)
    {
        $expectedType = 1;

        $this->validate_frame($frame_type, $expectedType, 'AMQP method');
    }
405 66
406
    /**
     * Ensures the frame is a content header frame (frame type 2).
     *
     * @param int $frame_type
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the frame type differs
     */
    protected function validate_header_frame($frame_type)
    {
        $expectedType = 2;

        $this->validate_frame($frame_type, $expectedType, 'AMQP Content header');
    }
414 60
415
    /**
     * Ensures the frame is a content body frame (frame type 3).
     *
     * @param int $frame_type
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the frame type differs
     */
    protected function validate_body_frame($frame_type)
    {
        $expectedType = 3;

        $this->validate_frame($frame_type, $expectedType, 'AMQP Content body');
    }
423 84
424
    /**
     * Checks a received frame type against the expected one.
     *
     * The exception message names the received frame type using the FRAME_TYPES
     * table of the active protocol constants class, or 'unknown' when the type
     * is not listed there.
     *
     * @param int $frameType
     * @param int $expectedType
     * @param string $expectedMessage
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the types differ
     */
    protected function validate_frame($frameType, $expectedType, $expectedMessage)
    {
        if ($frameType == $expectedType) {
            return;
        }

        $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;

        // An unrecognised frame type must not raise an undefined-index error
        // while the real error message is being built.
        $frameName = isset($protocolClass::$FRAME_TYPES[$frameType])
            ? $protocolClass::$FRAME_TYPES[$frameType]
            : 'unknown';

        throw new AMQPRuntimeException(sprintf(
            'Expecting %s, received frame type %s (%s)',
            $expectedMessage,
            $frameType,
            $frameName
        ));
    }
441
442
    /**
     * Rejects method frame payloads shorter than 4 bytes.
     *
     * @param string $payload
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the payload is too short
     */
    protected function validate_frame_payload($payload)
    {
        $length = mb_strlen($payload, 'ASCII');

        if ($length >= 4) {
            return;
        }

        throw new AMQPOutOfBoundsException('Method frame too short');
    }
452
453 84
    /**
     * Builds the "a,b" method signature from the first 4 bytes of a payload.
     *
     * The bytes are unpacked as two unsigned 16-bit big-endian integers.
     *
     * @param string $payload
     * @return string
     */
    protected function build_method_signature($payload)
    {
        $header = mb_substr($payload, 0, 4, 'ASCII');
        $parts = unpack('n2', $header);

        return $parts[1] . ',' . $parts[2];
    }
463
464
    /**
     * Returns the payload with its first 4 bytes stripped.
     *
     * @param string $payload
     * @return string
     */
    protected function extract_args($payload)
    {
        $argsLength = mb_strlen($payload, 'ASCII') - 4;

        return mb_substr($payload, 4, $argsLength, 'ASCII');
    }
472 84
473
    /**
     * Decides whether a received method should be dispatched right away.
     *
     * A method is dispatched when no allowed list was given, when it is in the
     * allowed list, or when it is one of the protocol's close methods.
     *
     * @param array|null $allowed_methods
     * @param string $method_sig
     * @return bool
     */
    protected function should_dispatch_method($allowed_methods, $method_sig)
    {
        if ($allowed_methods == null) {
            return true;
        }

        if (in_array($method_sig, $allowed_methods)) {
            return true;
        }

        $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;

        return in_array($method_sig, $protocolClass::$CLOSE_METHODS);
    }
486 84
487
    /**
     * Fetches the message content for methods that carry content.
     *
     * @param string $method_sig
     * @return AMQPMessage|null the message for a content method, null for any other method
     */
    protected function maybe_wait_for_content($method_sig)
    {
        $protocolClass = self::$PROTOCOL_CONSTANTS_CLASS;

        if (!in_array($method_sig, $protocolClass::$CONTENT_METHODS)) {
            return null;
        }

        return $this->wait_content();
    }
502 6
503 3
    /**
     * Invokes a handler with the given arguments, if it is callable.
     *
     * Non-callable handlers are silently skipped.
     *
     * @param callable $handler
     * @param array $arguments
     */
    protected function dispatch_to_handler($handler, array $arguments)
    {
        if (!is_callable($handler)) {
            return;
        }

        call_user_func_array($handler, $arguments);
    }
513
}
514