Completed
Pull Request — master (#286)
by
unknown
06:13
created

Frame   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 159
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 0
Metric Value
dl 0
loc 159
rs 10
c 0
b 0
f 0
wmc 12
lcom 1
cbo 6

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
C feed() 0 100 11
1
<?php
2
3
namespace PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser;
4
5
use PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException;
6
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\BodyFrame;
7
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Constants;
8
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\FrameInterface as ProtocolFrameInterface;
9
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\HeartbeatFrame;
10
11
/**
12
 * Produces Frame objects from binary data.
13
 *
14
 * Class GeneratedFrameParser
15
 * @author Aleksey I. Kuleshov YOU GLOBAL LIMITED
16
 * @package PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser
17
 */
18
class Frame implements FrameInterface
19
{
20
    use ScalarParserTrait, HeaderFrameParserTrait, MethodFrameParserTrait;
21
22
    // the size of each portion of the header ...
23
    const HEADER_TYPE_SIZE = 1; // header field "frame type" - unsigned octet
24
    const HEADER_CHANNEL_SIZE = 2; // header field "channel id" - unsigned short
25
    const HEADER_PAYLOAD_LENGTH_SIZE = 4; // header field "payload length" - unsigned long
26
27
    // the total header size ...
28
    const HEADER_SIZE = self::HEADER_TYPE_SIZE
29
    + self::HEADER_CHANNEL_SIZE
30
    + self::HEADER_PAYLOAD_LENGTH_SIZE;
31
32
    // minimum size of a valid frame (header + end with no payload) ...
33
    const MINIMUM_FRAME_SIZE = self::HEADER_SIZE + 1; // end marker is always 1 byte
34
35
    /**
36
     * The parser used to parse AMQP tables.
37
     * @var Table
38
     */
39
    private $tableParser;
40
41
    /**
42
     * @var integer The number of bytes required in the buffer to produce the
43
     *              next frame.
44
     *
45
     * This value starts as MINIMUM_FRAME_SIZE and is increased to include the
46
     * frame's payload size when the frame header becomes available.
47
     */
48
    private $requiredBytes;
49
50
    /**
51
     * @var string A buffer containing incoming binary data that can not yet be
52
     *             used to produce a frame.
53
     */
54
    private $buffer;
55
56
    /**
57
     * @param Table $tableParser The parser used to parse AMQP tables.
58
     */
59
    public function __construct(Table $tableParser)
60
    {
61
        $this->tableParser = $tableParser;
62
        $this->requiredBytes = self::MINIMUM_FRAME_SIZE;
63
        $this->buffer = '';
64
    }
65
66
    /**
67
     * Retrieve the next frame from the internal buffer.
68
     *
69
     * @param string $buffer Binary data to feed to the parser.
70
     * @param int &$requiredBytes The minimum number of bytes that must be
71
     *                               read to produce the next frame.
72
     *
73
     * @return ProtocolFrameInterface|null        The frame parsed from the start of the buffer.
74
     * @throws AMQPProtocolException              The incoming data does not conform to the AMQP specification.
75
     */
76
    public function feed($buffer, &$requiredBytes = 0)
77
    {
78
        $this->buffer .= $buffer;
79
        $availableBytes = \strlen($this->buffer);
80
81
        // not enough bytes for a frame ...
82
        if ($availableBytes < $this->requiredBytes) {
83
            $requiredBytes = $this->requiredBytes;
84
85
            return null;
86
87
            // we're still looking for the header ...
88
        }
89
        if ($this->requiredBytes === self::MINIMUM_FRAME_SIZE) {
90
            // now that we know the payload size we can add that to the number
91
            // of required bytes ...
92
            $this->requiredBytes += \unpack(
93
                'N',
94
                \substr(
95
                    $this->buffer,
96
                    self::HEADER_TYPE_SIZE + self::HEADER_CHANNEL_SIZE,
97
                    self::HEADER_PAYLOAD_LENGTH_SIZE
98
                )
99
            )[1];
100
101
            // taking the payload into account we still don't have enough bytes
102
            // for the frame ...
103
            if ($availableBytes < $this->requiredBytes) {
104
                $requiredBytes = $this->requiredBytes;
105
106
                return null;
107
            }
108
        }
109
110
        // we've got enough bytes, check that the last byte is the end marker ...
111
        if (\ord($this->buffer[$this->requiredBytes - 1]) !== Constants::FRAME_END) {
112
            throw new AMQPProtocolException(
113
                sprintf(
114
                    'Frame end marker (0x%02x) is invalid.',
115
                    \ord($this->buffer[$this->requiredBytes - 1])
116
                )
117
            );
118
        }
119
120
        // read the (t)ype and (c)hannel then discard the header ...
121
        $fields = \unpack('Ct/nc', $this->buffer);
122
        $this->buffer = \substr($this->buffer, self::HEADER_SIZE);
123
124
        $type = $fields['t'];
125
126
        // read the frame ...
127
        if ($type === Constants::FRAME_METHOD) {
128
            $frame = $this->parseMethodFrame();
129
        } elseif ($type === Constants::FRAME_HEADER) {
130
            $frame = $this->parseHeaderFrame();
131
        } elseif ($type === Constants::FRAME_BODY) {
132
            $length = $this->requiredBytes - self::MINIMUM_FRAME_SIZE;
133
            $frame = new BodyFrame();
134
            $frame->content = \substr($this->buffer, 0, $length);
135
            $this->buffer = \substr($this->buffer, $length);
136
        } elseif ($type === Constants::FRAME_HEARTBEAT) {
137
            if (self::MINIMUM_FRAME_SIZE !== $this->requiredBytes) {
138
                throw new AMQPProtocolException(
139
                    sprintf(
140
                        'Heartbeat frame payload size (%d) is invalid, must be zero.',
141
                        $this->requiredBytes - self::MINIMUM_FRAME_SIZE
142
                    )
143
                );
144
            }
145
            $frame = new HeartbeatFrame();
146
        } else {
147
            throw new AMQPProtocolException(
148
                sprintf(
149
                    'Frame type (0x%02x) is invalid.',
150
                    $type
151
                )
152
            );
153
        }
154
155
        // discard the end marker ...
156
        $this->buffer = \substr($this->buffer, 1);
157
158
        $consumedBytes = $availableBytes - \strlen($this->buffer);
159
160
        // the frame lied about its payload size ...
161
        if ($consumedBytes !== $this->requiredBytes) {
162
            throw new AMQPProtocolException(
163
                sprintf(
164
                    'Mismatch between frame size (%s) and consumed bytes (%s).',
165
                    $this->requiredBytes,
166
                    $consumedBytes
167
                )
168
            );
169
        }
170
171
        $this->requiredBytes = $requiredBytes = self::MINIMUM_FRAME_SIZE;
172
        $frame->frameChannelId = $fields['c'];
173
174
        return $frame;
175
    }
176
}
177