MessageProcessor::onData()   F
last analyzed

Complexity

Conditions 16
Paths 426

Size

Total Lines 66

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 16.005

Importance

Changes 0
Metric Value
dl 0
loc 66
ccs 36
cts 37
cp 0.973
rs 2.1972
c 0
b 0
f 0
cc 16
nc 426
nop 3
crap 16.005

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
14
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
15
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
16
use Nekland\Woketo\Exception\LimitationException;
17
use Nekland\Woketo\Rfc6455\FrameHandler\Rfc6455FrameHandlerInterface;
18
use React\Socket\ConnectionInterface;
19
20
/**
 * Class MessageProcessor
 *
 * This class is only a helper for Connection: the socket and the message being built are given to each call
 * instead of being stored, which allows a single MessageProcessor instance to be used rather than keeping
 * many instances of classes in memory.
 */
class MessageProcessor
{
    /**
     * @var FrameFactory
     */
    private $frameFactory;

    /**
     * @var MessageFactory
     */
    private $messageFactory;

    /**
     * Handlers run, in registration order, on every message handed to processHelper().
     *
     * @var Rfc6455FrameHandlerInterface[]
     */
    private $handlers = [];

    /**
     * When true, a freshly generated masking key is set on every frame before it is written.
     *
     * @var bool
     */
    private $writeMasked;

    /**
     * @param bool                $writeMasked    Whether frames must be masked before being written.
     * @param FrameFactory|null   $factory        Frame factory to use; a default one is created when null.
     * @param MessageFactory|null $messageFactory Message factory to use; a default one is created when null.
     */
    public function __construct($writeMasked = false, FrameFactory $factory = null, MessageFactory $messageFactory = null)
    {
        $this->writeMasked = $writeMasked;
        $this->frameFactory = $factory ?? new FrameFactory();
        $this->messageFactory = $messageFactory ?? new MessageFactory();
    }

    /**
     * Processes data received from the socket to generate `Message` entities and/or run the handlers,
     * which may answer to some special ws-frames.
     *
     * Legend:
     *     - {} stands for bin-frames
     *     - || stands for ws-frames
     *     - [] stands for Messages (of potentially many ws-frames)
     *     - () are here for comment purpose
     *
     * This method buffers in many ways:
     *
     * - { [|ws-frame1 (not final) |, |ws-frame2 (final)|] }
     *   => buffers 2 ws-frames from 1 bin-frame to generate 1 Message
     *
     * - { [|ws-frame1 (not final) } { ws-frame 1 (final)| } { |ws-frame 2 (final)|] }
     *   => buffers 2 ws-frames from 3 bin-frames to generate 1 Message
     *
     * - { [|ws-frame1 (not final)| |ws-frame 2 (final, control frame, is not part of the current message)| |ws-frame3 (final, with ws-frame1)|] }
     *   => buffers 2 ws-frames from 1 bin-frame to generate 1 Message with a control frame in the middle of the bin-frame.
     *
     * When an IncoherentDataException, a ProtocolErrorException or a LimitationException is raised while
     * processing, the matching close frame is written, the socket is ended and processing stops.
     *
     * @param string              $data    Raw data received from the socket.
     * @param ConnectionInterface $socket  Socket the data comes from; handlers and close frames write to it.
     * @param Message|null        $message Message to keep filling, or null to start a new one.
     * @return \Generator Yields Message instances: one per control frame found in the middle of a fragmented
     *                    message, every completed message, and the message still being built when the data
     *                    runs out before it is complete.
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            if (null === $message) {
                $message = $this->messageFactory->create();
            }

            try {
                $message->addBuffer($data);

                // Builds the message when it is split in many ws-frames inside the same received data.
                do {
                    try {
                        $frame = $this->frameFactory->createNewFrame($message->getBuffer());

                        if ($frame->isControlFrame() && $message->hasFrames()) {
                            // Control frame in the middle of a fragmented message: it gets its own message,
                            // which is yielded because every message should be returned.
                            yield $this->processControlFrame($frame, $socket);
                        } else {
                            $this->assertFragmentOrder($frame, $message);
                            $message->addFrame($frame);
                        }

                        // The frame was read successfully: what remains may hold more frames or messages.
                        $data = $message->removeFromBuffer($frame);
                    } catch (IncompleteFrameException $e) {
                        // The data stays stored in the message; clearing the variable stops both loops.
                        $data = '';
                    }
                } while (!$message->isComplete() && self::hasBytes($data));

                if ($message->isComplete()) {
                    $this->processHelper($message, $socket);

                    yield $message;
                    // The next loop iteration (if any) must start a brand new message.
                    $message = null;
                } else {
                    yield $message;
                }
            } catch (IncoherentDataException $e) {
                $this->abort(Frame::CLOSE_INCOHERENT_DATA, $socket);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->abort(Frame::CLOSE_PROTOCOL_ERROR, $socket);
                $data = '';
            } catch (LimitationException $e) {
                $this->abort(Frame::CLOSE_TOO_BIG_TO_PROCESS, $socket);
                $data = '';
            }
        } while (self::hasBytes($data));
    }

    /**
     * Runs every registered handler that supports the given message.
     *
     * @param Message             $message
     * @param ConnectionInterface $socket
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if (!$handler->supports($message)) {
                continue;
            }

            $handler->process($message, $this, $socket);
        }
    }

    /**
     * Wraps a control frame in its own message and runs the handlers on it.
     *
     * @param Frame               $frame
     * @param ConnectionInterface $socket
     *
     * @return Message The message holding only the given control frame.
     */
    protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message
    {
        $controlFrameMessage = new Message();
        $controlFrameMessage->addFrame($frame);

        $this->processHelper($controlFrameMessage, $socket);

        return $controlFrameMessage;
    }

    /**
     * Registers a handler that will be offered every message given to processHelper().
     *
     * @param Rfc6455FrameHandlerInterface $handler
     * @return self
     */
    public function addHandler(Rfc6455FrameHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }

    /**
     * Writes a frame on the socket; anything that is not a Frame is used as the payload of a new frame.
     *
     * @param Frame|string        $frame
     * @param ConnectionInterface $socket
     * @param int                 $opCode An int representing binary or text data (const of Frame class);
     *                                    only used when $frame is not already a Frame.
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if (!$frame instanceof Frame) {
            $payload = $frame;

            $frame = new Frame();
            $frame->setPayload($payload);
            $frame->setOpcode($opCode);
        }

        if ($this->writeMasked) {
            $frame->setMaskingKey(FrameFactory::generateMask());
        }

        $socket->write($frame->getRawData());
    }

    /**
     * @return FrameFactory
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }

    /**
     * Writes a "protocol error" close frame and closes the socket.
     *
     * NOTE(review): unlike close() and the error paths of onData(), this calls $socket->close() instead of
     * $socket->end() - confirm this is intended and that the close frame is still expected to be sent.
     *
     * @param ConnectionInterface $socket
     */
    public function timeout(ConnectionInterface $socket)
    {
        $closeFrame = $this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR);

        $this->write($closeFrame, $socket);
        $socket->close();
    }

    /**
     * Writes a close frame built from the given status and reason, then ends the socket.
     *
     * @param ConnectionInterface $socket
     * @param int $status
     * @param string|null $reason
     */
    public function close(ConnectionInterface $socket, int $status = Frame::CLOSE_NORMAL, string $reason = null)
    {
        $closeFrame = $this->frameFactory->createCloseFrame($status, $reason);

        $this->write($closeFrame, $socket);
        $socket->end();
    }

    /**
     * Tells whether there is still data to process.
     *
     * `empty()` must not be used for this: it considers the string "0" empty, so a remaining buffer made
     * of that single byte would be silently dropped instead of being kept for the next frame.
     *
     * @param mixed $data
     * @return bool
     */
    private static function hasBytes($data): bool
    {
        return \is_string($data) && '' !== $data;
    }

    /**
     * Checks that a frame is at a valid position in a (potentially fragmented) message.
     *
     * @param Frame   $frame   Frame about to be added to the message.
     * @param Message $message Message being built.
     *
     * @throws ProtocolErrorException When a continuation frame comes first, or when a frame that is not a
     *                                continuation frame comes after the first frame.
     */
    private function assertFragmentOrder(Frame $frame, Message $message)
    {
        $isContinuation = $frame->getOpcode() === Frame::OP_CONTINUE;
        $hasFrames = $message->hasFrames();

        if ($isContinuation && !$hasFrames) {
            throw new ProtocolErrorException('The first frame cannot be a continuation frame');
        }

        if (!$isContinuation && $hasFrames) {
            throw new ProtocolErrorException(
                'When the Message is fragmented in many frames the only frame that can be a something else than an continue frame is the first'
            );
        }
    }

    /**
     * Writes a close frame carrying the given close code, then ends the socket.
     *
     * @param int                 $closeCode One of the Frame::CLOSE_* constants.
     * @param ConnectionInterface $socket
     */
    private function abort($closeCode, ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }
}
237