Completed
Push — master ( dafd10...eb0376 )
by Ramūnas
12:40 queued 12:38
created

AMQPReader::readSignedLong()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 7
ccs 4
cts 4
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
namespace PhpAmqpLib\Wire;
4
5
use PhpAmqpLib\Exception\AMQPDataReadException;
6
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
7
use PhpAmqpLib\Exception\AMQPIOWaitException;
8
use PhpAmqpLib\Exception\AMQPNoDataException;
9
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
10
use PhpAmqpLib\Exception\AMQPTimeoutException;
11
use PhpAmqpLib\Helper\MiscHelper;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use phpseclib\Math\BigInteger;
14
15
/**
16
 * This class can read from a string or from a stream
17
 *
18
 * TODO : split this class: AMQPStreamReader and a AMQPBufferReader
19
 */
20
class AMQPReader extends AbstractClient
21
{
22
    const BIT = 1;
23
    const OCTET = 1;
24
    const SHORTSTR = 1;
25
    const SHORT = 2;
26
    const LONG = 4;
27
    const SIGNED_LONG = 4;
28
    const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
29
    const LONGLONG = 8;
30
    const TIMESTAMP = 8;
31
32
    /** @var string */
33
    protected $str = '';
34
35
    /** @var int */
36
    protected $str_length = 0;
37
38
    /** @var int */
39
    protected $offset = 0;
40
41
    /** @var int */
42
    protected $bitcount = 0;
43
44
    /** @var int|float|null */
45
    protected $timeout;
46
47
    /** @var int */
48
    protected $bits = 0;
49
50
    /** @var null|\PhpAmqpLib\Wire\IO\AbstractIO */
51
    protected $io;
52
53
    /**
     * Builds a reader over an in-memory string and/or an IO object.
     *
     * When $str is a string it becomes the internal buffer and its byte
     * length is cached; any other value leaves the buffer empty.
     *
     * @param string|null $str Initial buffer contents, or null when reading from $io
     * @param AbstractIO $io Optional IO object; when set, rawread() reads from it
     * @param int|float $timeout Timeout in seconds, interpreted by wait()
     */
    public function __construct($str, AbstractIO $io = null, $timeout = 0)
    {
        $this->io = $io;
        $this->timeout = $timeout;

        if (!is_string($str)) {
            return;
        }

        $this->str = $str;
        // 'ASCII' makes mb_strlen count bytes rather than multi-byte characters.
        $this->str_length = mb_strlen($str, 'ASCII');
    }
67
68
    /**
     * Re-initialises this reader with a new string buffer.
     *
     * Lets callers recycle one AMQPReader instead of allocating a new one:
     * the buffer, its cached length, the read offset and the bit counters
     * are all reset. The IO object and timeout are left untouched.
     *
     * @param string $str New buffer contents
     */
    public function reuse($str)
    {
        $this->resetCounters();
        $this->offset = 0;
        $this->str = $str;
        // 'ASCII' makes mb_strlen count bytes rather than multi-byte characters.
        $this->str_length = mb_strlen($str, 'ASCII');
    }
85
86
    /**
     * Closes the underlying IO object, if one was injected.
     *
     * Does nothing for readers that only wrap a string buffer.
     */
    public function close()
    {
        if (!$this->io) {
            return;
        }

        $this->io->close();
    }
95
96
    /**
     * Reads $n raw bytes after discarding any partially consumed bit octet.
     *
     * @param int $n Number of bytes to read
     * @return string The bytes read
     */
    public function read($n)
    {
        $this->resetCounters();
        $bytes = $this->rawread($n);

        return $bytes;
    }
106
107
    /**
     * Blocks until the IO object reports readable data.
     *
     * The configured timeout selects the waiting mode:
     *  - null: poll once and return immediately (0 sec / 0 usec);
     *  - > 0:  wait up to that many seconds;
     *  - else: wait indefinitely (null seconds passed to select).
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when a positive timeout elapsed with no data
     * @throws \PhpAmqpLib\Exception\AMQPNoDataException when select reports no data and no positive timeout is set
     */
    protected function wait()
    {
        $timeout = $this->getTimeout();
        $usec = 0;

        if (null === $timeout) {
            // Non-blocking poll.
            $sec = 0;
        } elseif ($timeout > 0) {
            list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
        } else {
            // A timeout of 0 means "block until data arrives".
            $sec = null;
        }

        $ready = $this->io->select($sec, $usec);

        if ($ready !== 0) {
            return;
        }

        if ($timeout > 0) {
            throw new AMQPTimeoutException(sprintf(
                'The connection timed out after %s sec while awaiting incoming data',
                $timeout
            ));
        }

        throw new AMQPNoDataException('No data is ready to read');
    }
143
144
    /**
     * Reads exactly $n bytes from the IO object or, failing that, from the
     * internal string buffer, and advances the read offset by $n.
     *
     * With an IO object, each attempt first waits for data; a timeout raised
     * by the read itself is retried unless a positive timeout is configured.
     * Without one, the bytes are cut off the front of the string buffer.
     *
     * @param int $n Number of bytes to read
     * @return string The bytes read
     * @throws \RuntimeException presumably propagated from the IO layer (not visible here)
     * @throws \PhpAmqpLib\Exception\AMQPDataReadException when the string buffer holds fewer than $n bytes
     * @throws \PhpAmqpLib\Exception\AMQPNoDataException when waiting finds no data to read
     */
    protected function rawread($n)
    {
        if ($this->io) {
            $data = '';
            do {
                $this->wait();
                try {
                    $data = $this->io->read($n);
                    $retry = false;
                } catch (AMQPTimeoutException $timeoutException) {
                    if ($this->getTimeout() > 0) {
                        throw $timeoutException;
                    }
                    // No positive timeout configured: keep waiting for data.
                    $retry = true;
                }
            } while ($retry);

            $this->offset += $n;

            return $data;
        }

        if ($n > $this->str_length) {
            throw new AMQPDataReadException(sprintf(
                'Error reading data. Requested %s bytes while string buffer has only %s',
                $n,
                $this->str_length
            ));
        }

        // 'ASCII' keeps mb_substr byte-oriented; the consumed prefix is dropped from the buffer.
        $head = mb_substr($this->str, 0, $n, 'ASCII');
        $tail = mb_substr($this->str, $n, null, 'ASCII');

        $this->str = $tail;
        $this->str_length -= $n;
        $this->offset += $n;

        return $head;
    }
185
186
    /**
     * Reads a single bit, least significant bit first.
     *
     * A fresh octet is fetched whenever the bit buffer is exhausted; up to
     * eight consecutive calls are served from the same octet.
     *
     * @return bool True when the bit is set
     */
    public function read_bit()
    {
        if (!$this->bitcount) {
            $this->bits = ord($this->rawread(1));
            $this->bitcount = 8;
        }

        $isSet = ($this->bits & 1) === 1;

        // Consume the bit that was just inspected.
        $this->bits = $this->bits >> 1;
        $this->bitcount -= 1;

        return $isSet;
    }
202
203
    /**
     * Reads one byte as an unsigned integer.
     *
     * @return int Value in the range 0..255
     */
    public function read_octet()
    {
        $this->resetCounters();
        $byte = $this->rawread(1);
        list(, $value) = unpack('C', $byte);

        return $value;
    }
213
214
    /**
     * Reads one byte as a signed integer.
     *
     * @return int Value in the range -128..127
     */
    public function read_signed_octet()
    {
        $this->resetCounters();
        $byte = $this->rawread(1);
        list(, $value) = unpack('c', $byte);

        return $value;
    }
224
225
    /**
     * Reads two bytes as an unsigned big-endian 16-bit integer.
     *
     * @return int
     */
    public function read_short()
    {
        $this->resetCounters();
        $bytes = $this->rawread(2);
        list(, $value) = unpack('n', $bytes);

        return $value;
    }
235
236
    /**
     * Reads two bytes as a signed 16-bit integer.
     *
     * The 's' unpack code uses machine byte order, so the bytes are first
     * passed through correctEndianness() (inherited; presumably reorders
     * them to match the host - confirm in AbstractClient).
     *
     * @return int
     */
    public function read_signed_short()
    {
        $this->resetCounters();
        $bytes = $this->correctEndianness($this->rawread(2));
        list(, $value) = unpack('s', $bytes);

        return $value;
    }
246
247
    /**
     * Reads a 32-bit integer in big-endian byte order.
     *
     * On 64-bit platforms the result is always a non-negative int in the
     * 0..2^32-1 range.
     *
     * On 32-bit platforms the result is a signed int, so values with the
     * high bit set come back negative. Callers that treat the value as a
     * length must check for that.
     *
     * Use with caution!
     *
     * @return int
     */
    public function read_php_int()
    {
        // Discard any partially consumed bit octet, as every other non-bit
        // reader in this class does; otherwise a later read_bit() would
        // continue from stale bits when this method is called directly.
        $this->resetCounters();

        list(, $res) = unpack('N', $this->rawread(4));

        if (self::PLATFORM_64BIT) {
            return (int) sprintf('%u', $res);
        }

        return $res;
    }
269
270
    /**
     * Reads four bytes as an unsigned big-endian 32-bit integer.
     *
     * A 32-bit PHP build cannot hold the upper half of the range in an int,
     * so there such values are returned as a decimal string instead.
     *
     * @return int|string
     */
    public function read_long()
    {
        $this->resetCounters();
        list(, $value) = unpack('N', $this->rawread(4));

        // getLongMSB() is inherited; presumably tests the high bit of the value.
        $overflowsInt = !self::PLATFORM_64BIT && $this->getLongMSB($value);

        return $overflowsInt ? sprintf('%u', $value) : $value;
    }
286
287
    /**
     * Reads four bytes as a signed 32-bit integer.
     *
     * The 'l' unpack code uses machine byte order, so the bytes are first
     * passed through correctEndianness() (inherited; presumably reorders
     * them to match the host - confirm in AbstractClient).
     *
     * @return int
     */
    private function readSignedLong()
    {
        $this->resetCounters();
        $bytes = $this->correctEndianness($this->rawread(4));
        list(, $value) = unpack('l', $bytes);

        return $value;
    }
297
298
    /**
     * Reads eight bytes as an unsigned big-endian 64-bit integer.
     *
     * PHP integers are signed even on 64-bit builds, so any value that does
     * not fit into a native int is returned as a decimal string instead.
     *
     * @return int|string
     */
    public function read_longlong()
    {
        $this->resetCounters();
        $raw = $this->rawread(8);

        if (!self::PLATFORM_64BIT) {
            // 32-bit build: only a value that fits the positive int range is returned natively.
            list(, $high, $low) = unpack('N2', $raw);
            if ($high === 0 && $low > 0) {
                return $low;
            }
        } elseif (!$this->getMSB($raw)) {
            // 64-bit build with the top bit clear: the value fits a native int.
            $unpacked = unpack('J', $raw);

            return $unpacked[1];
        }

        // Everything else goes through BigInteger and comes back as a string.
        $big = new BigInteger($raw, 256);

        return $big->toString();
    }
327
328
    /**
     * Reads eight bytes as a signed 64-bit integer.
     *
     * 64-bit builds always return a native int. 32-bit builds return a
     * native int only when the value fits, and a decimal string otherwise.
     *
     * @return int|string
     */
    public function read_signed_longlong()
    {
        $this->resetCounters();
        $raw = $this->rawread(8);

        if (self::PLATFORM_64BIT) {
            // 'q' uses machine byte order, hence the endianness correction.
            $unpacked = unpack('q', $this->correctEndianness($raw));

            return $unpacked[1];
        }

        list(, $high, $low) = unpack('N2', $raw);

        // Positive value that fits a 32-bit int.
        $fitsPositive = $high === 0 && $low > 0;
        // Negative value whose upper word is all ones and whose lower word is negative.
        $fitsNegative = $high === -1 && $this->getLongMSB($low);

        if ($fitsPositive || $fitsNegative) {
            return $low;
        }

        // Base -256 tells BigInteger to treat the bytes as two's complement.
        $big = new BigInteger($raw, -256);

        return $big->toString();
    }
356
357
    /**
     * Reads a short string: one length octet followed by that many bytes
     * (so at most 255 bytes).
     *
     * The bytes are returned as-is; no character decoding is performed.
     *
     * @return string
     */
    public function read_shortstr()
    {
        $this->resetCounters();
        $lengthByte = $this->rawread(1);
        list(, $length) = unpack('C', $lengthByte);

        return $this->rawread($length);
    }
369
370
    /**
     * Reads a long string: a 32-bit length followed by that many bytes.
     *
     * The AMQP spec does not define an encoding for long strings, so the
     * bytes are returned as a plain PHP string.
     *
     * @return string
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the length comes back negative
     */
    public function read_longstr()
    {
        $this->resetCounters();
        $length = $this->read_php_int();

        if ($length >= 0) {
            return $this->rawread($length);
        }

        throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
    }
387
388
    /**
     * Reads an AMQP timestamp: a 64-bit integer holding seconds since the
     * Unix epoch with 1-second resolution.
     *
     * Delegates to read_longlong(), so the same int-or-string rule applies.
     *
     * @return int|string
     */
    public function read_timestamp()
    {
        $seconds = $this->read_longlong();

        return $seconds;
    }
397
398
    /**
     * Reads an AMQP table.
     *
     * In plain-array mode the result maps each field name to a
     * (type symbol, value) pair. In object mode the fields are stored in an
     * AMQPTable via set(name, value, type).
     *
     * @param bool $returnObject Whether to return an AMQPTable instance instead of a plain array
     * @return array|AMQPTable
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the table length comes back negative
     */
    public function read_table($returnObject = false)
    {
        $this->resetCounters();
        $tableLength = $this->read_php_int();

        if ($tableLength < 0) {
            throw new AMQPOutOfBoundsException('Table is longer than supported');
        }

        // Parse the table payload through a dedicated reader so its own
        // offset tells us when the payload is exhausted.
        $tableReader = new AMQPReader($this->rawread($tableLength), null);
        $result = $returnObject ? new AMQPTable() : array();

        while ($tableReader->tell() < $tableLength) {
            $name = $tableReader->read_shortstr();
            $typeSymbol = $tableReader->rawread(1);
            $type = AMQPAbstractCollection::getDataTypeForSymbol($typeSymbol);
            $value = $tableReader->read_value($type, $returnObject);

            if ($returnObject) {
                $result->set($name, $value, $type);
            } else {
                $result[$name] = array($typeSymbol, $value);
            }
        }

        return $result;
    }
426
427
    /**
     * Reads an AMQP table in object mode (shortcut for read_table(true)).
     *
     * @return array|AMQPTable
     */
    public function read_table_object()
    {
        $asObject = true;

        return $this->read_table($asObject);
    }
434
435
    /**
     * Reads an AMQP array: a 32-bit payload length followed by a sequence of
     * (type symbol, value) entries.
     *
     * @param bool $returnObject Whether to return an AMQPArray instance instead of a plain array
     * @return array|AMQPArray
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the array length comes back negative
     */
    public function read_array($returnObject = false)
    {
        $this->resetCounters();

        // Determine array length and its end position
        $arrayLength = $this->read_php_int();

        // read_php_int() can return a negative value on 32-bit platforms.
        // Without this guard the loop below would be skipped and an empty
        // array returned while the payload is left unread. Reject it the
        // same way read_table() and read_longstr() do.
        if ($arrayLength < 0) {
            throw new AMQPOutOfBoundsException('Array is longer than supported');
        }

        $endOffset = $this->offset + $arrayLength;

        $result = $returnObject ? new AMQPArray() : array();

        // Read values until we reach the end of the array
        while ($this->offset < $endOffset) {
            $fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
            $fieldValue = $this->read_value($fieldType, $returnObject);

            if ($returnObject) {
                $result->push($fieldValue, $fieldType);
            } else {
                $result[] = $fieldValue;
            }
        }

        return $result;
    }
460
461
    /**
     * Reads an AMQP array in object mode (shortcut for read_array(true)).
     *
     * @return array|AMQPArray
     */
    public function read_array_object()
    {
        $asObject = true;

        return $this->read_array($asObject);
    }
468
469
    /**
     * Reads the next value, decoding it according to the given field type.
     *
     * @param int $fieldType One of AMQPAbstractCollection::T_* constants
     * @param bool $collectionsAsObjects Passed through to read_array()/read_table() to select object mode
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException when the field type is not supported
     * @throws \PhpAmqpLib\Exception\AMQPDataReadException when the string buffer runs out of data
     */
    public function read_value($fieldType, $collectionsAsObjects = false)
    {
        $this->resetCounters();

        switch ($fieldType) {
            case AMQPAbstractCollection::T_INT_SHORTSHORT:
                // According to the AMQP 0-9-1 spec 'b' is a short-short-int,
                // not a bit; this also holds for rabbit/qpid.
                return $this->read_signed_octet();

            case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
            case AMQPAbstractCollection::T_BOOL:
                return $this->read_octet();

            case AMQPAbstractCollection::T_INT_SHORT:
                return $this->read_signed_short();

            case AMQPAbstractCollection::T_INT_SHORT_U:
                return $this->read_short();

            case AMQPAbstractCollection::T_INT_LONG:
                return $this->readSignedLong();

            case AMQPAbstractCollection::T_INT_LONG_U:
                return $this->read_long();

            case AMQPAbstractCollection::T_INT_LONGLONG:
                return $this->read_signed_longlong();

            case AMQPAbstractCollection::T_INT_LONGLONG_U:
                return $this->read_longlong();

            case AMQPAbstractCollection::T_DECIMAL:
                // Wire order: one octet first, then a signed 32-bit integer.
                $exponent = $this->read_octet();
                $number = $this->readSignedLong();

                return new AMQPDecimal($number, $exponent);

            case AMQPAbstractCollection::T_TIMESTAMP:
                return $this->read_timestamp();

            case AMQPAbstractCollection::T_STRING_SHORT:
                return $this->read_shortstr();

            case AMQPAbstractCollection::T_STRING_LONG:
            case AMQPAbstractCollection::T_BYTES:
                return $this->read_longstr();

            case AMQPAbstractCollection::T_ARRAY:
                return $this->read_array($collectionsAsObjects);

            case AMQPAbstractCollection::T_TABLE:
                return $this->read_table($collectionsAsObjects);

            case AMQPAbstractCollection::T_VOID:
                return null;
        }

        throw new AMQPInvalidArgumentException(sprintf(
            'Unsupported type "%s"',
            $fieldType
        ));
    }
542
543
    /**
     * Reports how many bytes have been consumed so far.
     *
     * @return int Current read offset
     */
    protected function tell()
    {
        $position = $this->offset;

        return $position;
    }
550
551
    /**
     * Stores the timeout, in seconds, that wait() uses.
     *
     * In wait(), null polls once without blocking, a positive value bounds
     * the wait, and any other value waits indefinitely.
     *
     * @param int|float|null $timeout
     */
    public function setTimeout($timeout)
    {
        $seconds = $timeout;
        $this->timeout = $seconds;
    }
560
561
    /**
     * Returns the currently configured timeout in seconds.
     *
     * @return int|float|null
     */
    public function getTimeout()
    {
        $seconds = $this->timeout;

        return $seconds;
    }
568
569 676
    /**
     * Clears the bit-reading state so the next read_bit() fetches a fresh octet.
     */
    private function resetCounters()
    {
        $this->bits = 0;
        $this->bitcount = 0;
    }
573
}
574