Completed
Push — master ( 0b8755...dfeaf2 )
by Charlotte
9s
created

MessageProcessor::processControlFrame()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
cc 1
eloc 5
nc 1
nop 2
crap 1
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
14
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
15
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
16
use Nekland\Woketo\Exception\LimitationException;
17
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
18
use Nekland\Woketo\Utils\BitManipulation;
19
use React\Socket\ConnectionInterface;
20
21
/**
22
 * Class MessageProcessor
23
 *
24
 * This class is only a helper for Connection, to avoid keeping many class instances in memory.
25
 * Using it this way allows us to have only one instance of MessageProcessor.
26
 */
27
class MessageProcessor
{
    /**
     * @var FrameFactory
     */
    private $frameFactory;

    /**
     * @var Rfc6455MessageHandlerInterface[]
     */
    private $handlers;

    /**
     * @param FrameFactory|null $factory Factory used to build close frames; a new FrameFactory is created when omitted.
     */
    public function __construct(FrameFactory $factory = null)
    {
        $this->frameFactory = $factory ?? new FrameFactory();
        $this->handlers = [];
    }

    /**
     * This method processes data received from the socket to generate a `Message` entity and/or run the handlers,
     * which may answer to some special ws-frames.
     *
     * Legend:
     *     - {} stands for bin-frames
     *     - || stands for ws-frames
     *     - [] stands for Messages (of potentially many ws-frames)
     *     - () are here for comment purpose
     *
     * This method buffers in many ways:
     *
     * - { [|ws-frame1 (not final) |, |ws-frame2 (final)|] }
     *   => buffer 2 ws-frames from 1 bin-frame to generate 1 Message
     *
     * - { [|ws-frame1 (not final) } { ws-frame 1 (final)| } { |ws-frame 2 (final)|] }
     *   => buffer 2 ws-frames from 3 bin-frames to generate 1 Message
     *
     * - { [|ws-frame1 (not final)| |ws-frame 2 (final, control frame, is not part of the current message)| |ws-frame3 (final, with ws-frame1)|] }
     *   => buffer 2 ws-frames from 1 bin-frame to generate 1 Message with a control frame in the middle of the bin-frame.
     *
     * Every message is yielded: complete messages (after the handlers ran on them), the message still being
     * built when the data runs out, and the standalone messages created for intercepted control frames.
     *
     * When the data is incoherent, violates the protocol or exceeds a limitation, a close frame carrying the
     * matching close code is written and the socket is ended; the exception is not propagated.
     *
     * @param string              $data    Raw data received from the socket.
     * @param ConnectionInterface $socket  Connection the data came from; handlers and close frames write to it.
     * @param Message|null        $message Message to continue filling, or null to start a new one.
     * @return \Generator
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            if (null === $message) {
                $message = new Message();
            }

            try {
                $message->addBuffer($data);

                // Keep building the message while the received data still holds frames for it.
                do {
                    try {
                        $frame = new Frame($message->getBuffer());

                        // Intercept control frames that show up in the middle of normal frames.
                        if ($frame->isControlFrame() && $message->hasFrames()) {
                            // Every message must be returned, so the control frame gets its own message.
                            yield $this->processControlFrame($frame, $socket);
                        } else {
                            $message->addFrame($frame);
                        }

                        // The frame was read successfully: update the buffer; whatever comes back
                        // decides whether both loops keep going.
                        $data = $message->removeFromBuffer($frame);
                    } catch (IncompleteFrameException $e) {
                        // Data is now stored in the message, clean the variable to stop both loops.
                        $data = null;
                    }
                } while (!$message->isComplete() && $this->hasRemainingData($data));

                if ($message->isComplete()) {
                    $this->processHelper($message, $socket);

                    yield $message;
                    $message = null;
                } else {
                    yield $message;
                }
            } catch (IncoherentDataException $e) {
                $this->sendCloseAndEnd(Frame::CLOSE_INCOHERENT_DATA, $socket);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->sendCloseAndEnd(Frame::CLOSE_PROTOCOL_ERROR, $socket);
                $data = '';
            } catch (LimitationException $e) {
                $this->sendCloseAndEnd(Frame::CLOSE_TOO_BIG_TO_PROCESS, $socket);
                $data = '';
            }
        } while ($this->hasRemainingData($data));
    }

    /**
     * Runs every registered handler that supports the given message.
     *
     * @param Message $message
     * @param ConnectionInterface $socket
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if ($handler->supports($message)) {
                $handler->process($message, $this, $socket);
            }
        }
    }

    /**
     * Wraps a control frame in its own message and runs the handlers on it.
     *
     * @param Frame $frame
     * @param ConnectionInterface $socket
     *
     * @return Message The new message containing only the given frame.
     */
    protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message
    {
        $controlFrameMessage = new Message();
        $controlFrameMessage->addFrame($frame);
        $this->processHelper($controlFrameMessage, $socket);

        return $controlFrameMessage;
    }

    /**
     * Registers a handler that will be offered every processed message.
     *
     * @param Rfc6455MessageHandlerInterface $handler
     * @return self
     */
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }

    /**
     * Writes a frame to the socket; anything that is not a Frame is used as the payload of a new frame.
     *
     * @param Frame|string        $frame
     * @param ConnectionInterface $socket
     * @param int                 $opCode An int representing binary or text data (const of Frame class);
     *                                    only applied when a frame is built from a payload.
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if (!$frame instanceof Frame) {
            $data = $frame;
            $frame = new Frame();
            $frame->setPayload($data);
            $frame->setOpcode($opCode);
        }

        $socket->write($frame->getRawData());
    }

    /**
     * @return FrameFactory
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }

    /**
     * Writes a close frame with the protocol error code, then closes the socket.
     *
     * @param ConnectionInterface $socket
     */
    public function timeout(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
        $socket->close();
    }

    /**
     * Writes a close frame built with the factory defaults; the socket itself is left untouched.
     *
     * @param ConnectionInterface $socket
     */
    public function close(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(), $socket);
    }

    /**
     * Tells whether there are still bytes to process.
     *
     * Unlike empty(), a lone "0" byte counts as data, so it is never silently dropped.
     *
     * @param mixed $data Remaining data; null, false and '' all mean "nothing left".
     * @return bool
     */
    private function hasRemainingData($data): bool
    {
        return '' !== (string) $data;
    }

    /**
     * Writes a close frame carrying the given close code, then ends the socket.
     *
     * @param mixed               $closeCode One of the Frame::CLOSE_* constants.
     * @param ConnectionInterface $socket
     */
    private function sendCloseAndEnd($closeCode, ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }
}
209