Completed
Push — master ( 05544f...34252d )
by Maxime
02:24
created

MessageProcessor::close()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
14
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
15
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
16
use Nekland\Woketo\Exception\LimitationException;
17
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
18
use React\Socket\ConnectionInterface;
19
20
/**
 * Class MessageProcessor
 *
 * This class is only a helper for Connection, to avoid keeping many instances of classes in memory.
 * Using it this way allows us to have only one instance of MessageProcessor.
 */
class MessageProcessor
{
    /**
     * @var FrameFactory
     */
    private $frameFactory;

    /**
     * @var MessageFactory
     */
    private $messageFactory;

    /**
     * @var Rfc6455MessageHandlerInterface[]
     */
    private $handlers;

    /**
     * @param FrameFactory|null   $factory        A new FrameFactory is created when none is given.
     * @param MessageFactory|null $messageFactory A new MessageFactory is created when none is given.
     */
    public function __construct(FrameFactory $factory = null, MessageFactory $messageFactory = null)
    {
        $this->frameFactory = $factory ?? new FrameFactory();
        $this->messageFactory = $messageFactory ?? new MessageFactory();
        $this->handlers = [];
    }

    /**
     * This method processes data received from the socket to generate a `Message` entity and/or run the handlers,
     * which may answer to some special ws-frames.
     *
     * Legend:
     *     - {} stands for bin-frames
     *     - || stands for ws-frames
     *     - [] stands for Messages (of potentially many ws-frames)
     *     - () are here for comment purpose
     *
     * This method buffers in many ways:
     *
     * - { [|ws-frame1 (not final) |, |ws-frame2 (final)|] }
     *   => buffer 2 ws-frames from 1 bin-frame to generate 1 Message
     *
     * - { [|ws-frame1 (not final) } { ws-frame 1 (final)| } { |ws-frame 2 (final)|] }
     *   => buffer 2 ws-frames from 3 bin-frame to generate 1 Message
     *
     * - { [|ws-frame1 (not final)| |ws-frame 2 (final, control frame, is not part of the current message)| |ws-frame3 (final, with ws-frame1)|] }
     *   => buffer 2 ws-frames from 1 bin-frame to generate 1 Message with a control frame in the middle of the bin-frame.
     *
     * The generator yields:
     *     - a dedicated Message for each control frame intercepted in the middle of a fragmented message;
     *     - each completed Message, after the registered handlers have been run on it;
     *     - the still incomplete Message when the received data runs out, so that the caller can pass it back
     *       on the next call.
     *
     * When an IncoherentDataException, a ProtocolErrorException or a LimitationException is raised, a close frame
     * with the matching close code is written, the socket is ended and the processing stops.
     *
     * @param string              $data
     * @param ConnectionInterface $socket
     * @param Message|null        $message Message being built by a previous call; a new one is created when null.
     * @return \Generator
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            if (null === $message) {
                $message = $this->messageFactory->create();
            }

            try {
                $message->addBuffer($data);

                // Loop that builds the message when it is split in many frames inside the same received bin-frame.
                do {
                    try {
                        $frame = $this->frameFactory->createNewFrame($message->getBuffer());

                        // This condition intercepts control frames in the middle of normal frames
                        if ($frame->isControlFrame() && $message->hasFrames()) {
                            $controlFrameMessage = $this->processControlFrame($frame, $socket);

                            yield $controlFrameMessage; // Because every message should be returned !
                        } else {
                            if ($frame->getOpcode() === Frame::OP_CONTINUE && !$message->hasFrames()) {
                                throw new ProtocolErrorException('The first frame cannot be a continuation frame');
                            }

                            if ($frame->getOpcode() !== Frame::OP_CONTINUE && $message->hasFrames()) {
                                throw new ProtocolErrorException(
                                    'When the Message is fragmented in many frames the only frame that can be a something else than an continue frame is the first'
                                );
                            }
                            $message->addFrame($frame);
                        }

                        // If the frame is a success maybe we still need to create messages
                        // And the buffer must be updated
                        $data = $message->removeFromBuffer($frame);
                    } catch (IncompleteFrameException $e) {
                        // Data is now stored in the message, let's clean the variable to stop both loops.
                        $data = null;
                    }
                } while (!$message->isComplete() && $this->hasRemainingData($data));

                if ($message->isComplete()) {
                    $this->processHelper($message, $socket);

                    yield $message;
                    $message = null;
                } else {
                    yield $message;
                }
            } catch (IncoherentDataException $e) {
                $this->closeWithCode(Frame::CLOSE_INCOHERENT_DATA, $socket);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->closeWithCode(Frame::CLOSE_PROTOCOL_ERROR, $socket);
                $data = '';
            } catch (LimitationException $e) {
                $this->closeWithCode(Frame::CLOSE_TOO_BIG_TO_PROCESS, $socket);
                $data = '';
            }
        } while ($this->hasRemainingData($data));
    }

    /**
     * Runs every registered handler that supports the given message.
     *
     * @param Message $message
     * @param ConnectionInterface $socket
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if ($handler->supports($message)) {
                $handler->process($message, $this, $socket);
            }
        }
    }

    /**
     * Wraps a control frame in its own Message and runs the handlers on it.
     *
     * @param Frame $frame
     * @param ConnectionInterface $socket
     *
     * @return Message
     */
    protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message
    {
        $controlFrameMessage = new Message();
        $controlFrameMessage->addFrame($frame);
        $this->processHelper($controlFrameMessage, $socket);

        return $controlFrameMessage;
    }

    /**
     * Registers a handler; handlers are run in the order they were added.
     *
     * @param Rfc6455MessageHandlerInterface $handler
     * @return self
     */
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }

    /**
     * Writes the raw data of a frame on the socket.
     *
     * When $frame is not a Frame instance, it is used as the payload of a new Frame built with $opCode.
     *
     * @param Frame|string        $frame
     * @param ConnectionInterface $socket
     * @param int                 $opCode An int representing binary or text data (const of Frame class);
     *                                    only used when $frame is not already a Frame
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if (!$frame instanceof Frame) {
            $data = $frame;
            $frame = new Frame();
            $frame->setPayload($data);
            $frame->setOpcode($opCode);
        }

        $socket->write($frame->getRawData());
    }

    /**
     * @return FrameFactory
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }

    /**
     * Writes a close frame carrying Frame::CLOSE_PROTOCOL_ERROR then closes the socket.
     *
     * @param ConnectionInterface $socket
     */
    public function timeout(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
        // NOTE(review): unlike the other closing paths of this class, this one calls close() and not end().
        // With React streams close() is presumably a forceful close that may drop the frame written just
        // above if it is still buffered - confirm this is intended before changing it.
        $socket->close();
    }

    /**
     * Writes a close frame built with the factory defaults then ends the socket.
     *
     * @param ConnectionInterface $socket
     */
    public function close(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(), $socket);
        $socket->end();
    }

    /**
     * Writes a close frame carrying the given close code then ends the socket.
     *
     * @param mixed               $closeCode One of the Frame::CLOSE_* constants
     * @param ConnectionInterface $socket
     */
    private function closeWithCode($closeCode, ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }

    /**
     * Tells if there are still bytes to process.
     *
     * empty() cannot be used for this: it considers the string "0" as empty, which would silently drop
     * a lonely 0x30 byte left after a complete message.
     *
     * @param string|null $data
     * @return bool
     */
    private function hasRemainingData($data): bool
    {
        return is_string($data) && '' !== $data;
    }
}
225