Receiver   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 85
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 42
dl 0
loc 85
rs 10
c 0
b 0
f 0
wmc 13

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
C inbound() 0 48 12
1
<?php
2
/**
3
 * Frame processor
4
 * User: moyo
5
 * Date: 26/02/2018
6
 * Time: 5:31 PM
7
 */
8
9
namespace Carno\NSQ\Protocol;
10
11
use Carno\NSQ\Connector\Nsqd;
12
use Carno\NSQ\Exception\ServerException;
13
use Carno\NSQ\Types\Consuming;
14
use Carno\Promise\Promised;
15
use Closure;
16
17
class Receiver
{
    // a frame size has been read; the frame body is still being received
    private const STA_RECV = 1;

    // no frame in progress; the next value read from the buffer is a frame size
    private const STA_CLEAR = 0;

    /**
     * Current parser state, one of the STA_* constants.
     *
     * @var int
     */
    private $state = self::STA_CLEAR;

    /**
     * Holds inbound data written by inbound() until frames are built from it.
     *
     * @var Buffer
     */
    private $buffer = null;

    /**
     * Size of the frame currently being received, as returned by Binary::int().
     *
     * @var int
     */
    private $sized = 0;

    /**
     * Receiver constructor. Starts with an empty buffer.
     */
    public function __construct()
    {
        $this->buffer = new Buffer;
    }

    /**
     * Appends received data to the internal buffer and handles every frame
     * that is complete, looping for as long as the buffer reports valid().
     *
     * If the buffer holds less than the announced frame size, the method
     * returns and keeps its state, so the next call resumes the same frame.
     *
     * @param string $recv data just received; written to the buffer as-is
     * @param Nsqd $nsqd connection used to answer heartbeats (nop), to close on
     *                   close-wait, and handed to the consumer with each message
     * @param Closure $waiter invoked once per complete frame; its return value is
     *                        used as the pending Promised for response/error frames
     * @param Consuming|null $consuming handler for message frames; may be omitted
     *                                  only if no message frame can arrive
     *
     * @throws \LogicException when a message frame arrives and no $consuming was given
     */
    public function inbound(string $recv, Nsqd $nsqd, Closure $waiter, ?Consuming $consuming = null) : void
    {
        $this->buffer->write($recv);

        do {
            if ($this->state === self::STA_CLEAR) {
                // start of a new frame: read its size first
                // NOTE(review): assumes the buffer already holds a complete size field — confirm Binary::int()
                $this->sized = Binary::int($this->buffer);
                $this->state = self::STA_RECV;
            }

            if ($this->buffer->size() < $this->sized) {
                // frame body incomplete: stay in STA_RECV and wait for the next inbound() call
                return;
            }

            /**
             * @var Promised $waiting
             */
            $waiting = $waiter();

            $this->state = self::STA_CLEAR;

            $frame = new Frame($this->sized, $this->buffer);

            // switch (1) picks the first case whose predicate loosely equals 1
            switch (1) {
                case $frame->isResponse():
                    switch (1) {
                        case $frame->isOK():
                            $waiting->resolve(true);
                            break;
                        case $frame->isHeartbeat():
                            $nsqd->nop();
                            break;
                        case $frame->isCloseWait():
                            $waiting->resolve();
                            $nsqd->close();
                            break;
                    }
                    break;
                case $frame->isMessage():
                    if ($consuming === null) {
                        // fail with a clear reason instead of "call to a member function on null"
                        throw new \LogicException('Received a message frame but no consuming handler was provided');
                    }
                    $consuming->invoking($nsqd, $frame->getMessage());
                    break;
                case $frame->isError():
                    // only reject a promise that is still pending
                    if ($waiting->pended()) {
                        $waiting->throw(new ServerException($frame->getError()));
                    }
                    break;
            }
        } while ($this->buffer->valid());
    }
}
106