Completed
Pull Request — master (#82)
by Charlotte
02:06
created

MessageProcessor::onData()   D

Complexity

Conditions 16
Paths 426

Size

Total Lines 65
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 16.005

Importance

Changes 0
Metric Value
dl 0
loc 65
ccs 36
cts 37
cp 0.973
rs 4.2936
c 0
b 0
f 0
cc 16
eloc 41
nc 426
nop 3
crap 16.005

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
14
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
15
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
16
use Nekland\Woketo\Exception\LimitationException;
17
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
18
use React\Socket\ConnectionInterface;
19
20
/**
 * Class MessageProcessor
 *
 * This class is only a helper for Connection: it avoids keeping many instances of classes in memory.
 * The socket and the message being built are passed to each call instead of being stored here,
 * which allows a single instance of MessageProcessor to be used.
 */
class MessageProcessor
{
    /**
     * Factory used to build the close frames written by this class.
     *
     * @var FrameFactory
     */
    private $frameFactory;

    /**
     * Handlers offered every complete message (see processHelper()).
     *
     * @var Rfc6455MessageHandlerInterface[]
     */
    private $handlers;

    /**
     * @param FrameFactory|null $factory Factory to use; a new FrameFactory is created when omitted.
     */
    public function __construct(FrameFactory $factory = null)
    {
        $this->frameFactory = $factory ?: new FrameFactory();
        $this->handlers = [];
    }

    /**
     * This method processes data received from the socket to generate `Message` entities and/or run the handlers
     * which may answer to some special ws-frames.
     *
     * Legend:
     *     - {} stands for bin-frames
     *     - || stands for ws-frames
     *     - [] stands for Messages (of potentially many ws-frames)
     *     - () are here for comment purpose
     *
     * This method buffers in many ways:
     *
     * - { [|ws-frame1 (not final) |, |ws-frame2 (final)|] }
     *   => buffer 2 ws-frames from 1 bin-frame to generate 1 Message
     *
     * - { [|ws-frame1 (not final) } { ws-frame 1 (final)| } { |ws-frame 2 (final)|] }
     *   => buffer 2 ws-frames from 3 bin-frame to generate 1 Message
     *
     * - { [|ws-frame1 (not final)| |ws-frame 2 (final, control frame, is not part of the current message)| |ws-frame3 (final, with ws-frame1)|] }
     *   => buffer 2 ws-frames from 1 bin-frame to generate 1 Message with a control frame in the middle of the bin-frame.
     *
     * Every message is yielded: complete messages (after the handlers ran), the messages wrapping
     * intercepted control frames, and the message still under construction when the data runs out
     * (presumably so the caller can hand it back through $message on the next call - confirm against Connection).
     *
     * When a frame error is caught, a close frame carrying the matching close code is written,
     * the socket is ended and the processing stops.
     *
     * @param string              $data    Raw bytes received from the socket.
     * @param ConnectionInterface $socket  Connection the data came from; answers are written to it.
     * @param Message|null        $message Message to continue filling, or null to start a new one.
     * @return \Generator Yields Message instances.
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            if (null === $message) {
                $message = new Message();
            }

            try {
                $message->addBuffer($data);

                // Loop that builds the message when it is split in many frames inside the same binary data received.
                do {
                    try {
                        $frame = new Frame($message->getBuffer());

                        $this->assertFrameSequence($frame, $message);

                        // This condition intercepts control frames in the middle of normal frames
                        if ($frame->isControlFrame() && $message->hasFrames()) {
                            // Because every message should be returned!
                            yield $this->processControlFrame($frame, $socket);
                        } else {
                            $message->addFrame($frame);
                        }

                        // If the frame is a success maybe we still need to create messages
                        // And the buffer must be updated
                        $data = $message->removeFromBuffer($frame);
                    } catch (IncompleteFrameException $e) {
                        // Data is now stored in the message, let's clean the variable to stop both loops.
                        $data = null;
                    }
                } while (!$message->isComplete() && $this->hasPendingData($data));

                if ($message->isComplete()) {
                    $this->processHelper($message, $socket);

                    yield $message;
                    // The next iteration (if any data is left) starts a brand new message.
                    $message = null;
                } else {
                    yield $message;
                }
            } catch (IncoherentDataException $e) {
                $this->closeOnError(Frame::CLOSE_INCOHERENT_DATA, $socket);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->closeOnError(Frame::CLOSE_PROTOCOL_ERROR, $socket);
                $data = '';
            } catch (LimitationException $e) {
                $this->closeOnError(Frame::CLOSE_TOO_BIG_TO_PROCESS, $socket);
                $data = '';
            }
        } while ($this->hasPendingData($data));
    }

    /**
     * Checks that the opcode of a frame is allowed at its position in the message.
     *
     * @param Frame   $frame   Frame that was just parsed.
     * @param Message $message Message the frame is about to join.
     *
     * @throws ProtocolErrorException When a continuation frame comes first or a text frame follows other frames.
     */
    private function assertFrameSequence(Frame $frame, Message $message)
    {
        if ($frame->getOpcode() === Frame::OP_CONTINUE && $message->countFrames() === 0) {
            throw new ProtocolErrorException('The first frame cannot be a continuation frame');
        }

        if ($frame->getOpcode() === Frame::OP_TEXT && $message->countFrames() >= 1) {
            throw new ProtocolErrorException('When there are fragmented frames, the next frame cannot be a text frame');
        }
    }

    /**
     * Tells whether there are still bytes left to parse.
     *
     * empty() must not be used here: it considers the one-byte string "0" as empty, which would
     * silently drop a trailing 0x30 byte received after a complete message.
     *
     * @param mixed $data Remaining data; null once it was fully handed over to the message buffer.
     * @return bool
     */
    private function hasPendingData($data): bool
    {
        return is_string($data) && '' !== $data;
    }

    /**
     * Writes a close frame carrying the given close code then ends the socket.
     *
     * @param mixed               $closeCode One of the Frame::CLOSE_* constants.
     * @param ConnectionInterface $socket
     */
    private function closeOnError($closeCode, ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }

    /**
     * Passes the message to every registered handler that supports it.
     *
     * @param Message $message
     * @param ConnectionInterface $socket
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if (!$handler->supports($message)) {
                continue;
            }

            $handler->process($message, $this, $socket);
        }
    }

    /**
     * Wraps a control frame in its own message and runs the handlers on it.
     *
     * @param Frame $frame
     * @param ConnectionInterface $socket
     *
     * @return Message The message containing only the given control frame.
     */
    protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message
    {
        $controlFrameMessage = new Message();
        $controlFrameMessage->addFrame($frame);

        $this->processHelper($controlFrameMessage, $socket);

        return $controlFrameMessage;
    }

    /**
     * Registers a handler; handlers are called in registration order.
     *
     * @param Rfc6455MessageHandlerInterface $handler
     * @return self
     */
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }

    /**
     * Writes a frame to the socket; anything that is not a Frame is used as the payload of a new frame.
     *
     * @param Frame|string        $frame
     * @param ConnectionInterface $socket
     * @param int                 $opCode An int representing binary or text data (const of Frame class),
     *                                    only used when $frame is not already a Frame.
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if (!$frame instanceof Frame) {
            $data = $frame;
            $frame = new Frame();
            $frame->setPayload($data);
            $frame->setOpcode($opCode);
        }

        $socket->write($frame->getRawData());
    }

    /**
     * @return FrameFactory
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }

    /**
     * Writes a close frame with the protocol error code then closes the socket.
     *
     * @param ConnectionInterface $socket
     */
    public function timeout(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
        $socket->close();
    }

    /**
     * Writes a close frame built with the factory defaults; the socket itself is left untouched.
     *
     * @param ConnectionInterface $socket
     */
    public function close(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(), $socket);
    }
}
217