AMQPReader::getTimeout()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
namespace PhpAmqpLib\Wire;
3
4
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
5
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Exception\AMQPTimeoutException;
8
use PhpAmqpLib\Helper\MiscHelper;
9
use PhpAmqpLib\Wire\IO\AbstractIO;
10
11
/**
12
 * This class can read from a string or from a stream
13
 *
14
 * TODO: split this class into an AMQPStreamReader and an AMQPBufferReader
15
 */
16
class AMQPReader extends AbstractClient
17
{
18
    const BIT = 1;
19
    const OCTET = 1;
20
    const SHORTSTR = 1;
21
    const SHORT = 2;
22
    const LONG = 4;
23
    const SIGNED_LONG = 4;
24
    const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
25
    const LONGLONG = 8;
26
    const TIMESTAMP = 8;
27
28
    /** @var string */
29
    protected $str;
30
31
    /** @var int */
32
    protected $str_length;
33
34
    /** @var int */
35
    protected $offset;
36
37
    /** @var int */
38
    protected $bitcount;
39
40
    /** @var bool */
41
    protected $is64bits;
42
43
    /** @var int */
44
    protected $timeout;
45
46
    /** @var int */
47
    protected $bits;
48
49
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
50
    protected $io;
51
52
    /**
     * Builds a reader over an in-memory string and, optionally, an IO object.
     *
     * When an IO object is supplied, rawread() pulls bytes from it; otherwise
     * bytes are consumed from the front of $str.
     *
     * @param string $str Buffer to read from when no IO object is given
     * @param AbstractIO $io Optional IO object to read from
     * @param int $timeout Timeout in seconds used by wait(); 0 disables waiting
     */
    public function __construct($str, AbstractIO $io = null, $timeout = 0)
    {
        parent::__construct();

        $this->io = $io;
        $this->timeout = $timeout;

        $this->str = $str;
        $this->str_length = mb_strlen($str, 'ASCII');

        // Nothing consumed yet and no partially read bit field.
        $this->offset = 0;
        $this->bits = 0;
        $this->bitcount = 0;
    }
68
69
    /**
     * Re-initialises this reader with a new string buffer.
     *
     * Lets callers recycle one AMQPReader instead of constructing a new one
     * per payload. Only the string-related state (buffer, length, offset and
     * bit accumulator) is reset; the IO object and timeout are left untouched.
     *
     * @param string $str New buffer to read from
     */
    public function reuse($str)
    {
        $this->str = $str;
        $this->str_length = mb_strlen($str, 'ASCII');

        $this->offset = 0;
        $this->bits = 0;
        $this->bitcount = 0;
    }
86
87
    /**
     * Closes the underlying IO object, if this reader has one.
     */
    public function close()
    {
        // String-only readers have nothing to close.
        if (!$this->io) {
            return;
        }

        $this->io->close();
    }
96
97
    /**
     * Reads $n raw bytes, discarding any partially consumed bit field first.
     *
     * @param int $n Number of bytes to read
     * @return string
     */
    public function read($n)
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $data = $this->rawread($n);

        return $data;
    }
107
108
    /**
     * Blocks until the IO object reports incoming data or the timeout elapses.
     *
     * Returns immediately when the configured timeout is 0.
     *
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when select() returns false
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when select() returns 0
     */
    protected function wait()
    {
        $waitingDisabled = $this->getTimeout() == 0;
        if ($waitingDisabled) {
            return null;
        }

        // select() takes whole seconds and microseconds as separate arguments.
        $parts = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
        list($seconds, $microseconds) = $parts;
        $selected = $this->io->select($seconds, $microseconds);

        if ($selected === false) {
            throw new AMQPRuntimeException('A network error occured while awaiting for incoming data');
        }

        if ($selected === 0) {
            $message = sprintf(
                'The connection timed out after %s sec while awaiting incoming data',
                $this->getTimeout()
            );

            throw new AMQPTimeoutException($message);
        }
    }
136
137
    /**
     * Reads $n bytes without touching the bit accumulator.
     *
     * With an IO object the bytes come from it (after wait()); otherwise they
     * are cut from the front of the string buffer. In both cases the offset
     * advances by $n.
     *
     * @param int $n Number of bytes to read
     * @return string
     * @throws \RuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the string buffer holds fewer than $n bytes
     */
    protected function rawread($n)
    {
        if ($this->io) {
            $this->wait();
            $chunk = $this->io->read($n);
            $this->offset += $n;

            return $chunk;
        }

        if ($n > $this->str_length) {
            throw new AMQPRuntimeException(sprintf(
                'Error reading data. Requested %s bytes while string buffer has only %s',
                $n,
                $this->str_length
            ));
        }

        // Split the buffer: the head is returned, the tail becomes the new buffer.
        $chunk = mb_substr($this->str, 0, $n, 'ASCII');
        $remaining = mb_strlen($this->str, 'ASCII') - $n;
        $this->str = mb_substr($this->str, $n, $remaining, 'ASCII');

        $this->offset += $n;
        $this->str_length -= $n;

        return $chunk;
    }
166
167
    /**
     * Reads a single bit, least significant bit first.
     *
     * A fresh octet is fetched whenever the accumulator is empty; following
     * calls consume the remaining bits of that octet.
     *
     * @return bool
     */
    public function read_bit()
    {
        if (!$this->bitcount) {
            $octet = $this->rawread(1);
            $this->bits = ord($octet);
            $this->bitcount = 8;
        }

        $isSet = ($this->bits & 1) == 1;

        // Drop the bit that was just consumed.
        $this->bitcount--;
        $this->bits = $this->bits >> 1;

        return $isSet;
    }
183
184
    /**
     * Reads one byte as an unsigned integer (unpack format 'C').
     *
     * @return mixed
     */
    public function read_octet()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $raw = $this->rawread(1);
        list(, $octet) = unpack('C', $raw);

        return $octet;
    }
194
195
    /**
     * Reads one byte as a signed integer (unpack format 'c').
     *
     * @return mixed
     */
    public function read_signed_octet()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $raw = $this->rawread(1);
        list(, $octet) = unpack('c', $raw);

        return $octet;
    }
205
206
    /**
     * Reads two bytes as an unsigned big-endian integer (unpack format 'n').
     *
     * @return mixed
     */
    public function read_short()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $raw = $this->rawread(2);
        list(, $short) = unpack('n', $raw);

        return $short;
    }
216
217
    /**
     * Reads two bytes as a signed integer.
     *
     * The bytes are passed through correctEndianness() before being unpacked
     * with the machine-byte-order format 's'.
     *
     * @return mixed
     */
    public function read_signed_short()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $raw = $this->correctEndianness($this->rawread(2));
        list(, $short) = unpack('s', $raw);

        return $short;
    }
227
228
    /**
     * Reads 32 bit integer in big-endian byte order.
     *
     * On 64 bit systems it will always return an unsigned int
     * value in 0..2^32 range.
     *
     * On 32 bit systems it will return a signed int value in
     * -2^31...+2^31 range.
     *
     * Use with caution!
     *
     * @return int
     */
    public function read_php_int()
    {
        // Like every other non-bit reader, end any bit field in progress so a
        // later read_bit() starts from a fresh octet instead of stale bits.
        $this->bitcount = $this->bits = 0;

        list(, $res) = unpack('N', $this->rawread(4));

        if ($this->is64bits) {
            return (int) sprintf('%u', $res);
        }

        return $res;
    }
249
250
    /**
     * Reads an unsigned 32 bit big-endian integer.
     *
     * On 32 bit platforms a value with the most significant bit set cannot be
     * held in a PHP int, so it is returned as a decimal string; in every other
     * case the unpacked integer is returned as is.
     *
     * @return int|string
     */
    public function read_long()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        list(, $value) = unpack('N', $this->rawread(4));

        if ($this->is64bits || !self::getLongMSB($value)) {
            return $value;
        }

        return sprintf('%u', $value);
    }
263
264
    /**
     * Reads four bytes as a signed integer.
     *
     * The bytes are passed through correctEndianness() before being unpacked
     * with the machine-byte-order format 'l'.
     *
     * @return integer
     */
    private function read_signed_long()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $raw = $this->correctEndianness($this->rawread(4));
        list(, $long) = unpack('l', $raw);

        return $long;
    }
274
275
    /**
     * Reads an unsigned 64 bit big-endian integer.
     *
     * Even on 64 bit systems PHP integers are signed. Since an unsigned
     * value is needed here it is returned as a decimal string.
     *
     * @return string
     */
    public function read_longlong()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        list(, $hi, $lo) = unpack('N2', $this->rawread(8));
        $msb = self::getLongMSB($hi);

        if (!$this->is64bits) {
            // On 32 bit a half with its top bit set unpacks as a negative int;
            // re-render it as its unsigned decimal string.
            if ($msb) {
                $hi = sprintf('%u', $hi);
            }
            if (self::getLongMSB($lo)) {
                $lo = sprintf('%u', $lo);
            }
        }

        // A native shift is only safe when the result stays non-negative.
        if ($this->is64bits && !$msb) {
            $upper = $hi << 32;
        } else {
            $upper = bcmul($hi, '4294967296', 0);
        }

        return bcadd($upper, $lo, 0);
    }
300
301
    /**
     * Reads a signed 64 bit big-endian integer and returns it as a decimal string.
     *
     * @return string
     */
    public function read_signed_longlong()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        list(, $hi, $lo) = unpack('N2', $this->rawread(8));

        if (!$this->is64bits) {
            // The low half is always unsigned; fix it up if it unpacked negative.
            $low = self::getLongMSB($lo) ? sprintf('%u', $lo) : $lo;

            return bcadd(bcmul($hi, '4294967296', 0), $low, 0);
        }

        return bcadd($hi << 32, $lo, 0);
    }
316
317
    /**
     * Tells whether bit 31 (mask 0x80000000) of the given integer is set.
     *
     * @param int $longInt
     * @return bool
     */
    private static function getLongMSB($longInt)
    {
        $masked = $longInt & 0x80000000;

        return (bool) $masked;
    }
325
326
    /**
     * Reads a short string: a one-byte length followed by that many bytes
     * (so at most 255). The bytes are returned as a plain PHP string without
     * any decoding.
     *
     * @return string
     */
    public function read_shortstr()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $lengthByte = $this->rawread(1);
        list(, $length) = unpack('C', $lengthByte);

        return $this->rawread($length);
    }
337
338
    /**
     * Reads a long string: a 32 bit length followed by that many bytes.
     *
     * The encoding isn't specified in the AMQP spec, so the bytes are
     * returned as a plain PHP string.
     *
     * @return string
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the length reads as negative
     */
    public function read_longstr()
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $length = $this->read_php_int();
        if ($length < 0) {
            throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
        }

        return $this->rawread($length);
    }
354
355
    /**
     * Reads an AMQP timestamp, which is a 64-bit integer representing
     * seconds since the Unix epoch in 1-second resolution.
     *
     * @return string Same representation as read_longlong()
     */
    public function read_timestamp()
    {
        $timestamp = $this->read_longlong();

        return $timestamp;
    }
363
364
    /**
     * Reads an AMQP table.
     *
     * As a plain array, keys are field names and values are
     * array(type symbol, value) tuples. As an object, each field is stored
     * through AMQPTable::set() with its resolved data type.
     *
     * @param bool $returnObject Whether to return AMQPTable instance instead of plain array
     * @return array|AMQPTable
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the table length reads as negative
     */
    public function read_table($returnObject = false)
    {
        $this->bits = 0;
        $this->bitcount = 0;

        $tlen = $this->read_php_int();
        if ($tlen < 0) {
            throw new AMQPOutOfBoundsException('Table is longer than supported');
        }

        // Parse the table body with a dedicated string reader whose offset starts at 0.
        $table_data = new AMQPReader($this->rawread($tlen), null);
        $result = $returnObject ? new AMQPTable() : array();

        while ($table_data->tell() < $tlen) {
            $name = $table_data->read_shortstr();
            $ftypeSym = $table_data->rawread(1);
            $ftype = AMQPAbstractCollection::getDataTypeForSymbol($ftypeSym);
            $val = $table_data->read_value($ftype, $returnObject);

            if ($returnObject) {
                $result->set($name, $val, $ftype);
            } else {
                $result[$name] = array($ftypeSym, $val);
            }
        }

        return $result;
    }
392
393
    /**
     * Shortcut for read_table(true).
     *
     * @return array|AMQPTable
     */
    public function read_table_object()
    {
        $table = $this->read_table(true);

        return $table;
    }
400
401
    /**
     * Reads the array in the next value.
     *
     * The array is encoded as a 32 bit byte length followed by a sequence of
     * (type symbol, value) pairs; values are read until that many bytes have
     * been consumed.
     *
     * @param bool $returnObject Whether to return AMQPArray instance instead of plain array
     * @return array|AMQPArray
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException when the array length reads as negative
     */
    public function read_array($returnObject = false)
    {
        $this->bitcount = $this->bits = 0;

        // Determine array length and its end position
        $arrayLength = $this->read_php_int();

        // Same guard as read_table()/read_longstr(): a negative length means the
        // size does not fit a PHP int here. Without it the loop below would be
        // skipped and the array bytes left unread in the stream.
        if ($arrayLength < 0) {
            throw new AMQPOutOfBoundsException('Array is longer than supported');
        }

        $endOffset = $this->offset + $arrayLength;

        $result = $returnObject ? new AMQPArray() : array();

        // Read values until we reach the end of the array
        while ($this->offset < $endOffset) {
            $fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
            $fieldValue = $this->read_value($fieldType, $returnObject);

            if ($returnObject) {
                $result->push($fieldValue, $fieldType);
            } else {
                $result[] = $fieldValue;
            }
        }

        return $result;
    }
426
427
    /**
     * Shortcut for read_array(true).
     *
     * @return array|AMQPArray
     */
    public function read_array_object()
    {
        $array = $this->read_array(true);

        return $array;
    }
434
435
    /**
     * Reads the next value as the provided field type.
     *
     * @param int $fieldType One of AMQPAbstractCollection::T_* constants
     * @param bool $collectionsAsObjects Whether nested arrays/tables are returned as
     *                                   AMQPArray/AMQPTable instances instead of plain arrays
     * @return mixed
     * @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException when the field type is not supported
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function read_value($fieldType, $collectionsAsObjects = false)
    {
        $this->bitcount = $this->bits = 0;

        switch ($fieldType) {
            case AMQPAbstractCollection::T_INT_SHORTSHORT:
                // According to the AMQP 0-9-1 spec 'b' is not a bit but a
                // short-short-int; this is also valid for rabbit/qpid.
                return $this->read_signed_octet();
            case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
                return $this->read_octet();
            case AMQPAbstractCollection::T_INT_SHORT:
                return $this->read_signed_short();
            case AMQPAbstractCollection::T_INT_SHORT_U:
                return $this->read_short();
            case AMQPAbstractCollection::T_INT_LONG:
                return $this->read_signed_long();
            case AMQPAbstractCollection::T_INT_LONG_U:
                return $this->read_long();
            case AMQPAbstractCollection::T_INT_LONGLONG:
                return $this->read_signed_longlong();
            case AMQPAbstractCollection::T_INT_LONGLONG_U:
                return $this->read_longlong();
            case AMQPAbstractCollection::T_DECIMAL:
                // Wire order: scale octet first, then the signed 32 bit value.
                $e = $this->read_octet();
                $n = $this->read_signed_long();

                return new AMQPDecimal($n, $e);
            case AMQPAbstractCollection::T_TIMESTAMP:
                return $this->read_timestamp();
            case AMQPAbstractCollection::T_BOOL:
                // Returned as the raw octet value, not cast to bool.
                return $this->read_octet();
            case AMQPAbstractCollection::T_STRING_SHORT:
                return $this->read_shortstr();
            case AMQPAbstractCollection::T_STRING_LONG:
                return $this->read_longstr();
            case AMQPAbstractCollection::T_ARRAY:
                return $this->read_array($collectionsAsObjects);
            case AMQPAbstractCollection::T_TABLE:
                return $this->read_table($collectionsAsObjects);
            case AMQPAbstractCollection::T_VOID:
                return null;
            default:
                throw new AMQPInvalidArgumentException(sprintf(
                    'Unsupported type "%s"',
                    $fieldType
                ));
        }
    }
510
511
    /**
     * Returns the number of bytes consumed so far (the current read offset).
     *
     * @return int
     */
    protected function tell()
    {
        $position = $this->offset;

        return $position;
    }
518
519
    /**
     * Sets the timeout, in seconds, that wait() applies before a stream read.
     *
     * @param int $timeout Timeout in seconds; 0 disables waiting
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
    }
528
529
    /**
     * Returns the currently configured timeout in seconds.
     *
     * @return int
     */
    public function getTimeout()
    {
        $timeout = $this->timeout;

        return $timeout;
    }
536
}
537