1
|
|
|
<?php |
2
|
|
|
namespace PhpAmqpLib\Channel; |
3
|
|
|
|
4
|
|
|
use PhpAmqpLib\Connection\AbstractConnection; |
5
|
|
|
use PhpAmqpLib\Exception\AMQPOutOfBoundsException; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPOutOfRangeException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPRuntimeException; |
8
|
|
|
use PhpAmqpLib\Helper\DebugHelper; |
9
|
|
|
use PhpAmqpLib\Helper\Protocol\MethodMap080; |
10
|
|
|
use PhpAmqpLib\Helper\Protocol\MethodMap091; |
11
|
|
|
use PhpAmqpLib\Helper\Protocol\Protocol080; |
12
|
|
|
use PhpAmqpLib\Helper\Protocol\Protocol091; |
13
|
|
|
use PhpAmqpLib\Helper\Protocol\Wait080; |
14
|
|
|
use PhpAmqpLib\Helper\Protocol\Wait091; |
15
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
16
|
|
|
use PhpAmqpLib\Wire\AMQPReader; |
17
|
|
|
|
18
|
|
|
abstract class AbstractChannel |
19
|
|
|
{ |
20
|
|
|
const PROTOCOL_080 = '0.8'; |
21
|
|
|
const PROTOCOL_091 = '0.9.1'; |
22
|
|
|
|
23
|
|
|
public static $PROTOCOL_CONSTANTS_CLASS; |
24
|
|
|
|
25
|
|
|
/** @var array */ |
26
|
|
|
protected $frame_queue; |
27
|
|
|
|
28
|
|
|
/** @var array */ |
29
|
|
|
protected $method_queue; |
30
|
|
|
|
31
|
|
|
/** @var bool */ |
32
|
|
|
protected $auto_decode; |
33
|
|
|
|
34
|
|
|
/** @var string */ |
35
|
|
|
protected $amqp_protocol_header; |
36
|
|
|
|
37
|
|
|
/** @var \PhpAmqpLib\Helper\DebugHelper */ |
38
|
|
|
protected $debug; |
39
|
|
|
|
40
|
|
|
/** @var \PhpAmqpLib\Connection\AbstractConnection */ |
41
|
|
|
protected $connection; |
42
|
|
|
|
43
|
|
|
/** @var string */ |
44
|
|
|
protected $protocolVersion; |
45
|
|
|
|
46
|
|
|
/** @var int */ |
47
|
|
|
protected $maxBodySize; |
48
|
|
|
|
49
|
|
|
/** @var \PhpAmqpLib\Helper\Protocol\Protocol080|\PhpAmqpLib\Helper\Protocol\Protocol091 */ |
50
|
|
|
protected $protocolWriter; |
51
|
|
|
|
52
|
|
|
/** @var \PhpAmqpLib\Helper\Protocol\Wait080|\PhpAmqpLib\Helper\Protocol\Wait091 */ |
53
|
|
|
protected $waitHelper; |
54
|
|
|
|
55
|
|
|
/** @var \PhpAmqpLib\Helper\Protocol\MethodMap080|\PhpAmqpLib\Helper\Protocol\MethodMap091 */ |
56
|
|
|
protected $methodMap; |
57
|
|
|
|
58
|
|
|
/** @var string */ |
59
|
|
|
protected $channel_id; |
60
|
|
|
|
61
|
|
|
/** @var \PhpAmqpLib\Wire\AMQPReader */ |
62
|
|
|
protected $msg_property_reader; |
63
|
|
|
|
64
|
|
|
/** @var \PhpAmqpLib\Wire\AMQPReader */ |
65
|
|
|
protected $wait_content_reader; |
66
|
|
|
|
67
|
|
|
/** @var \PhpAmqpLib\Wire\AMQPReader */ |
68
|
|
|
protected $dispatch_reader; |
69
|
|
|
|
70
|
|
|
/** |
71
|
|
|
* @param AbstractConnection $connection |
72
|
|
|
* @param string $channel_id |
73
|
|
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
74
|
|
|
*/ |
75
|
84 |
|
public function __construct(AbstractConnection $connection, $channel_id) |
76
|
|
|
{ |
77
|
84 |
|
$this->connection = $connection; |
78
|
84 |
|
$this->channel_id = $channel_id; |
79
|
84 |
|
$connection->channels[$channel_id] = $this; |
80
|
84 |
|
$this->frame_queue = array(); // Lower level queue for frames |
81
|
84 |
|
$this->method_queue = array(); // Higher level queue for methods |
82
|
84 |
|
$this->auto_decode = false; |
83
|
|
|
|
84
|
84 |
|
$this->msg_property_reader = new AMQPReader(null); |
85
|
84 |
|
$this->wait_content_reader = new AMQPReader(null); |
86
|
84 |
|
$this->dispatch_reader = new AMQPReader(null); |
87
|
|
|
|
88
|
84 |
|
$this->protocolVersion = self::getProtocolVersion(); |
89
|
84 |
|
switch ($this->protocolVersion) { |
90
|
84 |
View Code Duplication |
case self::PROTOCOL_091: |
|
|
|
|
91
|
84 |
|
self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; |
92
|
84 |
|
$c = self::$PROTOCOL_CONSTANTS_CLASS; |
93
|
84 |
|
$this->debug = new DebugHelper($c); |
94
|
84 |
|
$this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER; |
95
|
84 |
|
$this->protocolWriter = new Protocol091(); |
96
|
84 |
|
$this->waitHelper = new Wait091(); |
97
|
84 |
|
$this->methodMap = new MethodMap091(); |
98
|
84 |
|
break; |
99
|
|
View Code Duplication |
case self::PROTOCOL_080: |
|
|
|
|
100
|
|
|
self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080'; |
101
|
|
|
$c = self::$PROTOCOL_CONSTANTS_CLASS; |
102
|
|
|
$this->debug = new DebugHelper($c); |
103
|
|
|
$this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER; |
104
|
|
|
$this->protocolWriter = new Protocol080(); |
105
|
|
|
$this->waitHelper = new Wait080(); |
106
|
|
|
$this->methodMap = new MethodMap080(); |
107
|
|
|
break; |
108
|
|
|
default: |
109
|
|
|
throw new AMQPRuntimeException(sprintf( |
110
|
|
|
'Protocol: %s not implemented.', |
111
|
|
|
$this->protocolVersion |
112
|
|
|
)); |
113
|
42 |
|
} |
114
|
84 |
|
} |
115
|
|
|
|
116
|
|
|
/** |
117
|
|
|
* @return string |
118
|
|
|
* @throws AMQPOutOfRangeException |
119
|
|
|
*/ |
120
|
84 |
|
public static function getProtocolVersion() |
121
|
|
|
{ |
122
|
84 |
|
$protocol = defined('AMQP_PROTOCOL') ? AMQP_PROTOCOL : self::PROTOCOL_091; |
123
|
|
|
//adding check here to catch unknown protocol ASAP, as this method may be called from the outside |
124
|
84 |
|
if (!in_array($protocol, array(self::PROTOCOL_080, self::PROTOCOL_091), TRUE)) { |
125
|
|
|
throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $protocol)); |
126
|
|
|
} |
127
|
|
|
|
128
|
84 |
|
return $protocol; |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
/** |
132
|
|
|
* @return string |
133
|
|
|
*/ |
134
|
|
|
public function getChannelId() |
135
|
|
|
{ |
136
|
|
|
return $this->channel_id; |
137
|
|
|
} |
138
|
|
|
|
139
|
|
|
/** |
140
|
|
|
* @param int $max_bytes Max message body size for this channel |
141
|
|
|
* @return $this |
142
|
|
|
*/ |
143
|
|
|
public function setBodySizeLimit($max_bytes) |
144
|
|
|
{ |
145
|
|
|
$max_bytes = (int) $max_bytes; |
146
|
|
|
|
147
|
|
|
if ($max_bytes > 0) { |
148
|
|
|
$this->maxBodySize = $max_bytes; |
149
|
|
|
} |
150
|
|
|
|
151
|
|
|
return $this; |
152
|
|
|
} |
153
|
|
|
|
154
|
|
|
/** |
155
|
|
|
* @return AbstractConnection |
156
|
|
|
*/ |
157
|
|
|
public function getConnection() |
158
|
|
|
{ |
159
|
|
|
return $this->connection; |
160
|
|
|
} |
161
|
|
|
|
162
|
|
|
/** |
163
|
|
|
* @return array |
164
|
|
|
*/ |
165
|
|
|
public function getMethodQueue() |
166
|
|
|
{ |
167
|
|
|
return $this->method_queue; |
168
|
|
|
} |
169
|
|
|
|
170
|
|
|
/** |
171
|
|
|
* @return bool |
172
|
|
|
*/ |
173
|
|
|
public function hasPendingMethods() |
174
|
|
|
{ |
175
|
|
|
return !empty($this->method_queue); |
176
|
|
|
} |
177
|
84 |
|
|
178
|
|
|
/** |
179
|
84 |
|
* @param string $method_sig |
180
|
|
|
* @param string $args |
181
|
|
|
* @param AMQPMessage|null $amqpMessage |
182
|
|
|
* @return mixed |
183
|
|
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
184
|
|
|
*/ |
185
|
|
|
public function dispatch($method_sig, $args, $amqpMessage) |
186
|
84 |
|
{ |
187
|
|
|
if (!$this->methodMap->valid_method($method_sig)) { |
188
|
84 |
|
throw new AMQPRuntimeException(sprintf( |
189
|
|
|
'Unknown AMQP method "%s"', |
190
|
|
|
$method_sig |
191
|
|
|
)); |
192
|
|
|
} |
193
|
|
|
|
194
|
|
|
$amqp_method = $this->methodMap->get_method($method_sig); |
195
|
|
|
|
196
|
84 |
|
if (!method_exists($this, $amqp_method)) { |
197
|
|
|
throw new AMQPRuntimeException(sprintf( |
198
|
84 |
|
'Method: "%s" not implemented by class: %s', |
199
|
84 |
|
$amqp_method, |
200
|
|
|
get_class($this) |
201
|
|
|
)); |
202
|
66 |
|
} |
203
|
|
|
|
204
|
|
|
$this->dispatch_reader->reuse($args); |
205
|
|
|
|
206
|
|
|
if ($amqpMessage == null) { |
207
|
|
|
return call_user_func(array($this, $amqp_method), $this->dispatch_reader); |
208
|
|
|
} |
209
|
84 |
|
|
210
|
|
|
return call_user_func(array($this, $amqp_method), $this->dispatch_reader, $amqpMessage); |
211
|
84 |
|
} |
212
|
|
|
|
213
|
84 |
|
/** |
214
|
6 |
|
* @param int $timeout |
215
|
|
|
* @return array|mixed |
216
|
|
|
*/ |
217
|
84 |
|
public function next_frame($timeout = 0) |
218
|
|
|
{ |
219
|
|
|
$this->debug->debug_msg('waiting for a new frame'); |
220
|
|
|
|
221
|
|
|
if (!empty($this->frame_queue)) { |
222
|
|
|
return array_shift($this->frame_queue); |
223
|
|
|
} |
224
|
84 |
|
|
225
|
|
|
return $this->connection->wait_channel($this->channel_id, $timeout); |
|
|
|
|
226
|
84 |
|
} |
227
|
6 |
|
|
228
|
|
|
/** |
229
|
|
|
* @param array $method_sig |
230
|
84 |
|
* @param \PhpAmqpLib\Wire\AMQPWriter|string $args |
231
|
84 |
|
*/ |
232
|
|
|
protected function send_method_frame($method_sig, $args = '') |
233
|
|
|
{ |
234
|
|
|
if ($this->connection === null) { |
235
|
|
|
throw new AMQPRuntimeException('Channel connection is closed.'); |
236
|
|
|
} |
237
|
|
|
|
238
|
|
|
$this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args); |
|
|
|
|
239
|
|
|
} |
240
|
|
|
|
241
|
72 |
|
/** |
242
|
|
|
* This is here for performance reasons to batch calls to fwrite from basic.publish |
243
|
72 |
|
* |
244
|
|
|
* @param array $method_sig |
245
|
|
|
* @param \PhpAmqpLib\Wire\AMQPWriter|string $args |
246
|
|
|
* @param \PhpAmqpLib\Wire\AMQPWriter $pkt |
247
|
|
|
* @return \PhpAmqpLib\Wire\AMQPWriter |
248
|
|
|
*/ |
249
|
|
|
protected function prepare_method_frame($method_sig, $args = '', $pkt = null) |
250
|
66 |
|
{ |
251
|
|
|
return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt); |
|
|
|
|
252
|
66 |
|
} |
253
|
|
|
|
254
|
66 |
|
/** |
255
|
|
|
* @return AMQPMessage |
256
|
66 |
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
257
|
|
|
*/ |
258
|
66 |
|
public function wait_content() |
259
|
66 |
|
{ |
260
|
|
|
list($frame_type, $payload) = $this->next_frame(); |
261
|
|
|
|
262
|
66 |
|
$this->validate_header_frame($frame_type); |
263
|
|
|
|
264
|
66 |
|
$this->wait_content_reader->reuse(mb_substr($payload, 0, 12, 'ASCII')); |
265
|
66 |
|
|
266
|
66 |
|
$class_id = $this->wait_content_reader->read_short(); |
|
|
|
|
267
|
33 |
|
$weight = $this->wait_content_reader->read_short(); |
|
|
|
|
268
|
|
|
|
269
|
|
|
//hack to avoid creating new instances of AMQPReader; |
270
|
|
|
$this->msg_property_reader->reuse(mb_substr($payload, 12, mb_strlen($payload, 'ASCII') - 12, 'ASCII')); |
271
|
|
|
|
272
|
|
|
return $this->createMessage( |
273
|
|
|
$this->msg_property_reader, |
274
|
|
|
$this->wait_content_reader |
275
|
66 |
|
); |
276
|
|
|
} |
277
|
66 |
|
|
278
|
66 |
|
/** |
279
|
|
|
* @param AMQPReader $propertyReader |
280
|
66 |
|
* @param AMQPReader $contentReader |
281
|
|
|
* @return \PhpAmqpLib\Message\AMQPMessage |
282
|
66 |
|
*/ |
283
|
66 |
|
protected function createMessage($propertyReader, $contentReader) |
284
|
|
|
{ |
285
|
66 |
|
$bodyChunks = array(); |
286
|
60 |
|
$bodyReceivedBytes = 0; |
287
|
|
|
|
288
|
60 |
|
$message = new AMQPMessage(); |
289
|
60 |
|
$message |
290
|
|
|
->load_properties($propertyReader) |
291
|
60 |
|
->setBodySize($contentReader->read_longlong()); |
292
|
|
|
|
293
|
|
|
while (bccomp($message->getBodySize(), $bodyReceivedBytes, 0) == 1) { |
294
|
|
|
list($frame_type, $payload) = $this->next_frame(); |
295
|
|
|
|
296
|
60 |
|
$this->validate_body_frame($frame_type); |
297
|
30 |
|
$bodyReceivedBytes = bcadd($bodyReceivedBytes, mb_strlen($payload, 'ASCII'), 0); |
298
|
|
|
|
299
|
66 |
|
if (is_int($this->maxBodySize) && $bodyReceivedBytes > $this->maxBodySize ) { |
300
|
|
|
$message->setIsTruncated(true); |
301
|
66 |
|
continue; |
302
|
|
|
} |
303
|
|
|
|
304
|
|
|
$bodyChunks[] = $payload; |
305
|
|
|
} |
306
|
|
|
|
307
|
|
|
$message->setBody(implode('', $bodyChunks)); |
308
|
|
|
|
309
|
|
|
return $message; |
310
|
|
|
} |
311
|
|
|
|
312
|
|
|
/** |
313
|
|
|
* Wait for some expected AMQP methods and dispatch to them. |
314
|
|
|
* Unexpected methods are queued up for later calls to this PHP |
315
|
|
|
* method. |
316
|
84 |
|
* |
317
|
|
|
* @param array $allowed_methods |
318
|
84 |
|
* @param bool $non_blocking |
319
|
|
|
* @param int $timeout |
320
|
84 |
|
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException |
321
|
84 |
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
322
|
|
|
* @return mixed |
323
|
|
|
*/ |
324
|
|
|
public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0) |
325
|
|
|
{ |
326
|
84 |
|
$this->debug->debug_allowed_methods($allowed_methods); |
|
|
|
|
327
|
84 |
|
|
328
|
|
|
$deferred = $this->process_deferred_methods($allowed_methods); |
|
|
|
|
329
|
84 |
|
if ($deferred['dispatch'] === true) { |
330
|
84 |
|
return $this->dispatch_deferred_method($deferred['queued_method']); |
331
|
|
|
} |
332
|
84 |
|
|
333
|
84 |
|
// No deferred methods? wait for new ones |
334
|
|
|
while (true) { |
335
|
84 |
|
list($frame_type, $payload) = $this->next_frame($timeout); |
336
|
|
|
|
337
|
84 |
|
$this->validate_method_frame($frame_type); |
338
|
|
|
$this->validate_frame_payload($payload); |
339
|
84 |
|
|
340
|
84 |
|
$method_sig = $this->build_method_signature($payload); |
341
|
|
|
$args = $this->extract_args($payload); |
342
|
|
|
|
343
|
|
|
$this->debug->debug_method_signature('> %s', $method_sig); |
344
|
|
|
|
345
|
|
|
$amqpMessage = $this->maybe_wait_for_content($method_sig); |
346
|
|
|
|
347
|
|
|
if ($this->should_dispatch_method($allowed_methods, $method_sig)) { |
348
|
|
|
return $this->dispatch($method_sig, $args, $amqpMessage); |
349
|
|
|
} |
350
|
|
|
|
351
|
|
|
// Wasn't what we were looking for? save it for later |
352
|
|
|
$this->debug->debug_method_signature('Queueing for later: %s', $method_sig); |
353
|
|
|
$this->method_queue[] = array($method_sig, $args, $amqpMessage); |
354
|
|
|
|
355
|
|
|
if ($non_blocking) { |
356
|
|
|
break; |
357
|
84 |
|
} |
358
|
|
|
} |
359
|
84 |
|
} |
360
|
84 |
|
|
361
|
|
|
/** |
362
|
84 |
|
* @param array $allowed_methods |
363
|
|
|
* @return array |
364
|
|
|
*/ |
365
|
|
|
protected function process_deferred_methods($allowed_methods) |
366
|
|
|
{ |
367
|
|
|
$dispatch = false; |
368
|
|
|
$queued_method = array(); |
369
|
|
|
|
370
|
|
|
foreach ($this->method_queue as $qk => $qm) { |
371
|
|
|
$this->debug->debug_msg('checking queue method ' . $qk); |
372
|
|
|
|
373
|
42 |
|
$method_sig = $qm[0]; |
374
|
|
|
|
375
|
84 |
|
if ($allowed_methods == null || in_array($method_sig, $allowed_methods)) { |
376
|
|
|
unset($this->method_queue[$qk]); |
377
|
|
|
$dispatch = true; |
378
|
|
|
$queued_method = $qm; |
379
|
|
|
break; |
380
|
|
|
} |
381
|
|
|
} |
382
|
|
|
|
383
|
|
|
return array('dispatch' => $dispatch, 'queued_method' => $queued_method); |
384
|
|
|
} |
385
|
|
|
|
386
|
|
|
/** |
387
|
|
|
* @param array $queued_method |
388
|
|
|
* @return mixed |
389
|
|
|
*/ |
390
|
|
|
protected function dispatch_deferred_method($queued_method) |
391
|
|
|
{ |
392
|
|
|
$this->debug->debug_method_signature('Executing queued method: %s', $queued_method[0]); |
393
|
84 |
|
|
394
|
|
|
return $this->dispatch($queued_method[0], $queued_method[1], $queued_method[2]); |
395
|
84 |
|
} |
396
|
84 |
|
|
397
|
|
|
/** |
398
|
|
|
* @param int $frame_type |
399
|
|
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
400
|
|
|
*/ |
401
|
|
|
protected function validate_method_frame($frame_type) |
402
|
66 |
|
{ |
403
|
|
|
$this->validate_frame($frame_type, 1, 'AMQP method'); |
404
|
66 |
|
} |
405
|
66 |
|
|
406
|
|
|
/** |
407
|
|
|
* @param int $frame_type |
408
|
|
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
409
|
|
|
*/ |
410
|
|
|
protected function validate_header_frame($frame_type) |
411
|
60 |
|
{ |
412
|
|
|
$this->validate_frame($frame_type, 2, 'AMQP Content header'); |
413
|
60 |
|
} |
414
|
60 |
|
|
415
|
|
|
/** |
416
|
|
|
* @param int $frame_type |
417
|
|
|
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
418
|
|
|
*/ |
419
|
|
|
protected function validate_body_frame($frame_type) |
420
|
|
|
{ |
421
|
84 |
|
$this->validate_frame($frame_type, 3, 'AMQP Content body'); |
422
|
|
|
} |
423
|
84 |
|
|
424
|
|
|
/** |
425
|
|
|
* @param int $frameType |
426
|
|
|
* @param int $expectedType |
427
|
|
|
* @param string $expectedMessage |
428
|
|
|
*/ |
429
|
|
|
protected function validate_frame($frameType, $expectedType, $expectedMessage) |
430
|
|
|
{ |
431
|
|
|
if ($frameType != $expectedType) { |
432
|
84 |
|
$protocolClass = self::$PROTOCOL_CONSTANTS_CLASS; |
433
|
|
|
throw new AMQPRuntimeException(sprintf( |
434
|
|
|
'Expecting %s, received frame type %s (%s)', |
435
|
|
|
$expectedMessage, |
436
|
|
|
$frameType, |
437
|
|
|
$protocolClass::$FRAME_TYPES[$frameType] |
438
|
84 |
|
)); |
439
|
|
|
} |
440
|
84 |
|
} |
441
|
|
|
|
442
|
|
|
/** |
443
|
84 |
|
* @param string $payload |
444
|
|
|
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException |
445
|
|
|
*/ |
446
|
|
|
protected function validate_frame_payload($payload) |
447
|
|
|
{ |
448
|
|
|
if (mb_strlen($payload, 'ASCII') < 4) { |
449
|
84 |
|
throw new AMQPOutOfBoundsException('Method frame too short'); |
450
|
|
|
} |
451
|
84 |
|
} |
452
|
|
|
|
453
|
84 |
|
/** |
454
|
|
|
* @param string $payload |
455
|
|
|
* @return string |
456
|
|
|
*/ |
457
|
|
|
protected function build_method_signature($payload) |
458
|
|
|
{ |
459
|
|
|
$method_sig_array = unpack('n2', mb_substr($payload, 0, 4, 'ASCII')); |
460
|
84 |
|
|
461
|
|
|
return sprintf('%s,%s', $method_sig_array[1], $method_sig_array[2]); |
462
|
84 |
|
} |
463
|
|
|
|
464
|
|
|
/** |
465
|
|
|
* @param string $payload |
466
|
|
|
* @return string |
467
|
|
|
*/ |
468
|
|
|
protected function extract_args($payload) |
469
|
|
|
{ |
470
|
84 |
|
return mb_substr($payload, 4, mb_strlen($payload, 'ASCII') - 4, 'ASCII'); |
471
|
|
|
} |
472
|
84 |
|
|
473
|
|
|
/** |
474
|
42 |
|
* @param array|null $allowed_methods |
475
|
84 |
|
* @param string $method_sig |
476
|
84 |
|
* @return bool |
477
|
|
|
*/ |
478
|
|
|
protected function should_dispatch_method($allowed_methods, $method_sig) |
479
|
|
|
{ |
480
|
|
|
$protocolClass = self::$PROTOCOL_CONSTANTS_CLASS; |
481
|
|
|
|
482
|
|
|
return $allowed_methods == null |
483
|
84 |
|
|| in_array($method_sig, $allowed_methods) |
484
|
|
|
|| in_array($method_sig, $protocolClass::$CLOSE_METHODS); |
485
|
84 |
|
} |
486
|
84 |
|
|
487
|
|
|
/** |
488
|
84 |
|
* @param string $method_sig |
489
|
66 |
|
* @return AMQPMessage|null |
490
|
33 |
|
*/ |
491
|
|
|
protected function maybe_wait_for_content($method_sig) |
492
|
84 |
|
{ |
493
|
|
|
$protocolClass = self::$PROTOCOL_CONSTANTS_CLASS; |
494
|
|
|
$amqpMessage = null; |
495
|
|
|
|
496
|
|
|
if (in_array($method_sig, $protocolClass::$CONTENT_METHODS)) { |
497
|
|
|
$amqpMessage = $this->wait_content(); |
498
|
|
|
} |
499
|
6 |
|
|
500
|
|
|
return $amqpMessage; |
501
|
6 |
|
} |
502
|
6 |
|
|
503
|
3 |
|
/** |
504
|
6 |
|
* @param callable $handler |
505
|
|
|
* @param array $arguments |
506
|
|
|
*/ |
507
|
|
|
protected function dispatch_to_handler($handler, array $arguments) |
508
|
|
|
{ |
509
|
|
|
if (is_callable($handler)) { |
510
|
|
|
call_user_func_array($handler, $arguments); |
511
|
|
|
} |
512
|
|
|
} |
513
|
|
|
} |
514
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.