1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace PhpAmqpLib\Wire; |
4
|
|
|
|
5
|
|
|
use PhpAmqpLib\Exception\AMQPDataReadException; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPInvalidArgumentException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPIOWaitException; |
8
|
|
|
use PhpAmqpLib\Exception\AMQPNoDataException; |
9
|
|
|
use PhpAmqpLib\Exception\AMQPOutOfBoundsException; |
10
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
11
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
12
|
|
|
use PhpAmqpLib\Wire\IO\AbstractIO; |
13
|
|
|
use phpseclib\Math\BigInteger; |
14
|
|
|
|
15
|
|
|
/**
 * Reads AMQP wire-format values either from an in-memory string buffer or
 * from an IO stream.
 *
 * When an AbstractIO instance is supplied, every read pulls its bytes from
 * that object (after waiting for it to become readable). Otherwise bytes are
 * consumed from the front of the string given to the constructor or reuse().
 *
 * TODO : split this class: AMQPStreamReader and a AMQPBufferReader
 */
class AMQPReader extends AbstractClient
{
    // NOTE(review): these look like the encoded width, in bytes, of each
    // AMQP primitive -- confirm against the callers that use them.
    const BIT = 1;
    const OCTET = 1;
    const SHORTSTR = 1;
    const SHORT = 2;
    const LONG = 4;
    const SIGNED_LONG = 4;
    const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
    const LONGLONG = 8;
    const TIMESTAMP = 8;

    /** @var string Bytes of the string buffer that have not been consumed yet */
    protected $str = '';

    /** @var int Number of bytes remaining in $str */
    protected $str_length = 0;

    /** @var int Number of bytes consumed since construction or the last reuse() */
    protected $offset = 0;

    /** @var int Number of bits still available in $bits */
    protected $bitcount = 0;

    /**
     * Seconds to wait for incoming data: null polls and returns immediately,
     * a value greater than zero is a real timeout, anything else waits
     * indefinitely.
     *
     * @var int|float|null
     */
    protected $timeout;

    /** @var int Octet currently being consumed bit by bit by read_bit() */
    protected $bits = 0;

    /** @var null|\PhpAmqpLib\Wire\IO\AbstractIO */
    protected $io;

    /**
     * @param string|null $str Buffer to read from; non-string values are ignored
     * @param AbstractIO|null $io Stream to read from; when set it takes precedence over $str
     * @param int|float|null $timeout See the $timeout property for the meaning of null / 0 / positive values
     */
    public function __construct($str, AbstractIO $io = null, $timeout = 0)
    {
        if (is_string($str)) {
            $this->str = $str;
            // mb_strlen() with the ASCII encoding counts bytes, not characters
            $this->str_length = mb_strlen($this->str, 'ASCII');
        }

        $this->io = $io;
        $this->timeout = $timeout;
    }

    /**
     * Resets the object so that it reads from the given string.
     *
     * Used to avoid creating a new AMQPReader instance every time
     * when we can just pass a string and reset the object state.
     * NOTE: since we are working with strings we don't need to pass an AbstractIO
     * or a timeout.
     *
     * @param string $str
     */
    public function reuse($str)
    {
        $this->str = $str;
        $this->str_length = mb_strlen($this->str, 'ASCII');
        $this->offset = 0;
        $this->resetCounters();
    }

    /**
     * Closes the underlying IO object, if there is one.
     */
    public function close()
    {
        if ($this->io) {
            $this->io->close();
        }
    }

    /**
     * Reads $n raw bytes, discarding any partially consumed bit field first.
     *
     * @param int $n
     * @return string
     */
    public function read($n)
    {
        $this->resetCounters();

        return $this->rawread($n);
    }

    /**
     * Waits until some data is retrieved from the socket.
     *
     * AMQPTimeoutException can be raised if the timeout is set
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when timeout is set and no data received
     * @throws \PhpAmqpLib\Exception\AMQPNoDataException when no data is ready to read from IO
     */
    protected function wait()
    {
        $timeout = $this->getTimeout();

        if (null === $timeout) {
            // timeout=null just poll state and return instantly
            $sec = 0;
            $usec = 0;
        } elseif ($timeout > 0) {
            list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($timeout);
        } else {
            // wait indefinitely for data if timeout=0
            $sec = null;
            $usec = 0;
        }

        $result = $this->io->select($sec, $usec);

        if ($result === 0) {
            if ($timeout > 0) {
                throw new AMQPTimeoutException(sprintf(
                    'The connection timed out after %s sec while awaiting incoming data',
                    $timeout
                ));
            }

            throw new AMQPNoDataException('No data is ready to read');
        }
    }

    /**
     * Reads $n bytes from the IO object when one is set, otherwise from the
     * string buffer, and advances the offset by $n.
     *
     * Unlike read(), this does not reset the packed-bit counters.
     *
     * @param int $n
     * @return string
     * @throws \RuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPDataReadException when the string buffer holds fewer than $n bytes
     * @throws \PhpAmqpLib\Exception\AMQPNoDataException
     */
    protected function rawread($n)
    {
        if ($this->io) {
            $res = '';
            while (true) {
                $this->wait();
                try {
                    $res = $this->io->read($n);
                    break;
                } catch (AMQPTimeoutException $e) {
                    // A read timeout is only an error when a timeout was
                    // actually requested; otherwise wait and try again.
                    if ($this->getTimeout() > 0) {
                        throw $e;
                    }
                }
            }
            $this->offset += $n;

            return $res;
        }

        if ($this->str_length < $n) {
            throw new AMQPDataReadException(sprintf(
                'Error reading data. Requested %s bytes while string buffer has only %s',
                $n,
                $this->str_length
            ));
        }

        // Take the first $n bytes and keep only the remainder in the buffer
        $res = mb_substr($this->str, 0, $n, 'ASCII');
        $this->str = mb_substr($this->str, $n, null, 'ASCII');
        $this->str_length -= $n;
        $this->offset += $n;

        return $res;
    }

    /**
     * Reads a single bit.
     *
     * Bits are taken from one octet, least significant bit first; a new octet
     * is fetched only when no unread bits remain.
     *
     * @return bool
     */
    public function read_bit()
    {
        if (empty($this->bitcount)) {
            $this->bits = ord($this->rawread(1));
            $this->bitcount = 8;
        }

        $result = ($this->bits & 1) === 1;
        $this->bits >>= 1;
        $this->bitcount--;

        return $result;
    }

    /**
     * Reads an unsigned 8 bit integer.
     *
     * @return int
     */
    public function read_octet()
    {
        $this->resetCounters();
        list(, $res) = unpack('C', $this->rawread(1));

        return $res;
    }

    /**
     * Reads a signed 8 bit integer.
     *
     * @return int
     */
    public function read_signed_octet()
    {
        $this->resetCounters();
        list(, $res) = unpack('c', $this->rawread(1));

        return $res;
    }

    /**
     * Reads an unsigned 16 bit integer in big-endian byte order.
     *
     * @return int
     */
    public function read_short()
    {
        $this->resetCounters();
        list(, $res) = unpack('n', $this->rawread(2));

        return $res;
    }

    /**
     * Reads a signed 16 bit integer.
     *
     * unpack('s') uses machine byte order, so the bytes are first passed
     * through correctEndianness() (defined outside this class body).
     *
     * @return int
     */
    public function read_signed_short()
    {
        $this->resetCounters();
        list(, $res) = unpack('s', $this->correctEndianness($this->rawread(2)));

        return $res;
    }

    /**
     * Reads 32 bit integer in big-endian byte order.
     *
     * On 64 bit systems it will return always unsigned int
     * value in 0..2^32 range.
     *
     * On 32 bit systems it will return signed int value in
     * -2^31...+2^31 range.
     *
     * Use with caution!
     * @return int
     */
    public function read_php_int()
    {
        // Like every other public reader, drop any half-consumed bit field
        $this->resetCounters();
        list(, $res) = unpack('N', $this->rawread(4));

        if (self::PLATFORM_64BIT) {
            return (int) sprintf('%u', $res);
        }

        return $res;
    }

    /**
     * Reads an unsigned 32 bit integer in big-endian byte order.
     *
     * On 32 bit platforms PHP cannot represent the upper half of the range as
     * an int, so values flagged by getLongMSB() are returned as a decimal string.
     *
     * @return int|string
     */
    public function read_long()
    {
        $this->resetCounters();
        list(, $res) = unpack('N', $this->rawread(4));

        if (!self::PLATFORM_64BIT && $this->getLongMSB($res)) {
            return sprintf('%u', $res);
        }

        return $res;
    }

    /**
     * Reads a signed 32 bit integer.
     *
     * @return int
     */
    private function readSignedLong()
    {
        $this->resetCounters();
        list(, $res) = unpack('l', $this->correctEndianness($this->rawread(4)));

        return $res;
    }

    /**
     * Reads an unsigned 64 bit integer.
     *
     * Even on 64 bit systems PHP integers are signed.
     * Values that do not fit into a PHP int are returned as a decimal string.
     *
     * @return int|string
     */
    public function read_longlong()
    {
        $this->resetCounters();
        $bytes = $this->rawread(8);

        if (self::PLATFORM_64BIT) {
            // we can "unpack" if MSB bit is 0 (at most 63 bit integer), fallback to BigInteger otherwise
            if (!$this->getMSB($bytes)) {
                $res = unpack('J', $bytes);

                return $res[1];
            }
        } else {
            // on 32-bit systems we can "unpack" up to 31 bits integer
            list(, $hi, $lo) = unpack('N2', $bytes);
            // zero is included so that it is returned as int, as on 64-bit
            if ($hi === 0 && $lo >= 0) {
                return $lo;
            }
        }

        $var = new BigInteger($bytes, 256);

        return $var->toString();
    }

    /**
     * Reads a signed 64 bit integer.
     *
     * On 32 bit platforms values outside the PHP int range are returned as a
     * decimal string.
     *
     * @return int|string
     */
    public function read_signed_longlong()
    {
        $this->resetCounters();
        $bytes = $this->rawread(8);

        if (self::PLATFORM_64BIT) {
            $res = unpack('q', $this->correctEndianness($bytes));

            return $res[1];
        }

        // on 32-bit systems we can "unpack" up to 31 bits integer
        list(, $hi, $lo) = unpack('N2', $bytes);
        if ($hi === 0 && $lo >= 0) {
            // non-negative and not larger than 2^31-1
            return $lo;
        }
        // negative and more than -2^31
        if ($hi === -1 && $this->getLongMSB($lo)) {
            return $lo;
        }

        // -256 makes BigInteger treat the bytes as two's complement
        $var = new BigInteger($bytes, -256);

        return $var->toString();
    }

    /**
     * Reads a short string: one length octet followed by that many bytes
     * (so at most 255 bytes). The bytes are returned as-is, without decoding.
     *
     * @return string
     */
    public function read_shortstr()
    {
        $this->resetCounters();
        list(, $slen) = unpack('C', $this->rawread(1));

        return $this->rawread($slen);
    }

    /**
     * Read a string that's up to 2**32 bytes, the encoding
     * isn't specified in the AMQP spec, so just return it as
     * a plain PHP string.
     *
     * @return string
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the length does not fit into a PHP int
     */
    public function read_longstr()
    {
        $this->resetCounters();
        $slen = $this->read_php_int();

        if ($slen < 0) {
            throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
        }

        return $this->rawread($slen);
    }

    /**
     * Read an AMQP timestamp, which is a 64-bit integer representing
     * seconds since the Unix epoch in 1-second resolution.
     *
     * @return int|string
     */
    public function read_timestamp()
    {
        return $this->read_longlong();
    }

    /**
     * Read an AMQP table, and return as a PHP array. keys are strings,
     * values are (type,value) tuples.
     *
     * @param bool $returnObject Whether to return AMQPTable instance instead of plain array
     * @return array|AMQPTable
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the table length does not fit into a PHP int
     */
    public function read_table($returnObject = false)
    {
        $this->resetCounters();
        $tlen = $this->read_php_int();

        if ($tlen < 0) {
            throw new AMQPOutOfBoundsException('Table is longer than supported');
        }

        // The whole table payload is parsed by a separate string-backed reader
        $table_data = new AMQPReader($this->rawread($tlen), null);
        $result = $returnObject ? new AMQPTable() : array();

        while ($table_data->tell() < $tlen) {
            $name = $table_data->read_shortstr();
            $ftypeSym = $table_data->rawread(1);
            $ftype = AMQPAbstractCollection::getDataTypeForSymbol($ftypeSym);
            $val = $table_data->read_value($ftype, $returnObject);

            if ($returnObject) {
                $result->set($name, $val, $ftype);
            } else {
                $result[$name] = array($ftypeSym, $val);
            }
        }

        return $result;
    }

    /**
     * Reads an AMQP table and returns it as an AMQPTable instance.
     *
     * @return array|AMQPTable
     */
    public function read_table_object()
    {
        return $this->read_table(true);
    }

    /**
     * Reads the array in the next value.
     *
     * A plain array result holds only the values; the field types are kept
     * only when an AMQPArray is requested.
     *
     * @param bool $returnObject Whether to return AMQPArray instance instead of plain array
     * @return array|AMQPArray
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the array length does not fit into a PHP int
     */
    public function read_array($returnObject = false)
    {
        $this->resetCounters();

        // Determine array length and its end position
        $arrayLength = $this->read_php_int();

        // Same guard as read_longstr() / read_table(): a negative length would
        // otherwise yield an empty result and leave the payload unread.
        if ($arrayLength < 0) {
            throw new AMQPOutOfBoundsException('Array is longer than supported');
        }

        $endOffset = $this->offset + $arrayLength;

        $result = $returnObject ? new AMQPArray() : array();

        // Read values until we reach the end of the array
        while ($this->offset < $endOffset) {
            $fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
            $fieldValue = $this->read_value($fieldType, $returnObject);

            if ($returnObject) {
                $result->push($fieldValue, $fieldType);
            } else {
                $result[] = $fieldValue;
            }
        }

        return $result;
    }

    /**
     * Reads the array in the next value and returns it as an AMQPArray instance.
     *
     * @return array|AMQPArray
     */
    public function read_array_object()
    {
        return $this->read_array(true);
    }

    /**
     * Reads the next value as the provided field type.
     *
     * @param int $fieldType One of AMQPAbstractCollection::T_* constants
     * @param bool $collectionsAsObjects Whether nested arrays and tables are returned
     *                                   as AMQPArray / AMQPTable instead of plain arrays
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPDataReadException
     * @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException when the field type is not supported
     */
    public function read_value($fieldType, $collectionsAsObjects = false)
    {
        $this->resetCounters();

        switch ($fieldType) {
            case AMQPAbstractCollection::T_INT_SHORTSHORT:
                //according to AMQP091 spec, 'b' is not bit, it is short-short-int, also valid for rabbit/qpid
                $val = $this->read_signed_octet();
                break;
            case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
            case AMQPAbstractCollection::T_BOOL:
                // booleans are returned as the raw octet, not cast to bool
                $val = $this->read_octet();
                break;
            case AMQPAbstractCollection::T_INT_SHORT:
                $val = $this->read_signed_short();
                break;
            case AMQPAbstractCollection::T_INT_SHORT_U:
                $val = $this->read_short();
                break;
            case AMQPAbstractCollection::T_INT_LONG:
                $val = $this->readSignedLong();
                break;
            case AMQPAbstractCollection::T_INT_LONG_U:
                $val = $this->read_long();
                break;
            case AMQPAbstractCollection::T_INT_LONGLONG:
                $val = $this->read_signed_longlong();
                break;
            case AMQPAbstractCollection::T_INT_LONGLONG_U:
                $val = $this->read_longlong();
                break;
            case AMQPAbstractCollection::T_DECIMAL:
                // one octet followed by a signed long, handed to AMQPDecimal in that order swapped
                $e = $this->read_octet();
                $n = $this->readSignedLong();
                $val = new AMQPDecimal($n, $e);
                break;
            case AMQPAbstractCollection::T_TIMESTAMP:
                $val = $this->read_timestamp();
                break;
            case AMQPAbstractCollection::T_STRING_SHORT:
                $val = $this->read_shortstr();
                break;
            case AMQPAbstractCollection::T_STRING_LONG:
            case AMQPAbstractCollection::T_BYTES:
                $val = $this->read_longstr();
                break;
            case AMQPAbstractCollection::T_ARRAY:
                $val = $this->read_array($collectionsAsObjects);
                break;
            case AMQPAbstractCollection::T_TABLE:
                $val = $this->read_table($collectionsAsObjects);
                break;
            case AMQPAbstractCollection::T_VOID:
                $val = null;
                break;
            default:
                throw new AMQPInvalidArgumentException(sprintf(
                    'Unsupported type "%s"',
                    $fieldType
                ));
        }

        return $val;
    }

    /**
     * Returns the number of bytes consumed so far.
     *
     * @return int
     */
    protected function tell()
    {
        return $this->offset;
    }

    /**
     * Sets the timeout (second)
     *
     * @param int|float|null $timeout
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
    }

    /**
     * @return int|float|null
     */
    public function getTimeout()
    {
        return $this->timeout;
    }

    /**
     * Discards any partially consumed bit field so that the next read_bit()
     * starts with a fresh octet.
     */
    private function resetCounters()
    {
        $this->bitcount = $this->bits = 0;
    }
}
574
|
|
|
|