Completed
Pull Request — master (#81)
by Charlotte
02:20
created

MessageProcessor::processHelper()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 4
nc 3
nop 2
crap 3
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
14
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
15
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
16
use Nekland\Woketo\Exception\LimitationException;
17
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
18
use React\Socket\ConnectionInterface;
19
20
/**
 * Class MessageProcessor
 *
 * Helper for Connection. The socket and the in-progress message are passed to each call
 * instead of being stored on the processor, so a single MessageProcessor instance can be
 * shared rather than keeping one instance per connection in memory.
 */
class MessageProcessor
{
    /**
     * Factory used to build the close frames written by this processor.
     *
     * @var FrameFactory
     */
    private $frameFactory;

    /**
     * Handlers offered every message passed to processHelper(), in registration order.
     *
     * @var Rfc6455MessageHandlerInterface[]
     */
    private $handlers;

    /**
     * @param FrameFactory|null $factory Factory to use; a new FrameFactory is created when none is given.
     */
    public function __construct(FrameFactory $factory = null)
    {
        $this->frameFactory = $factory ?: new FrameFactory();
        $this->handlers = [];
    }

    /**
     * This method processes data received from the socket to generate `Message` entities and/or run
     * the handlers, which may answer some special frames.
     *
     * (binary data entry string in {}, frames in || and messages (of potentially many frames) in [])
     * This method buffers in many ways:
     *
     * - { [|frame1 (not final) |, |frame2 (final)|] }
     *   => buffer 2 frames from 1 binary to generate 1 message
     *
     * - { [|frame1 (not final, not finished } { frame 1 (final, finished)| } { |frame 2 (final, finished)|] }
     *   => buffer 2 frames from 3 binary data to generate 1 message
     *
     * - { [|frame1 (not final, not finished } { frame control 1 (final, finished)| } { |frame 2 (final, finished)|] }
     *   => buffer 2 frames from 3 binary data to generate 1 ping and 1 message
     *
     * Every message is yielded: control-frame messages as soon as they are handled, complete
     * messages after the handlers ran, and the still incomplete message once the available data
     * has been consumed. When the data is incoherent, violates the protocol or exceeds a limitation,
     * a close frame with the matching code is written, the socket is ended and processing stops.
     *
     * @param string              $data    Binary data just received from the socket.
     * @param ConnectionInterface $socket  Socket the data came from; also used to answer.
     * @param Message|null        $message Incomplete message to continue, or null to start a new one.
     * @return \Generator
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            if (null === $message) {
                $message = new Message();
            }

            try {
                $message->addBuffer($data);

                // Loop that builds the message when it is spread over many frames of the same binary data.
                do {
                    try {
                        $frame = new Frame($message->getBuffer());

                        if ($frame->isControlFrame()) {
                            $controlFrameMessage = $this->processControlFrame($frame, $socket);

                            yield $controlFrameMessage; // Because every message should be returned !
                        } else {
                            $message->addFrame($frame);
                        }

                        // If the frame is a success maybe we still need to create messages,
                        // and the buffer must be updated (presumably $data becomes what is left
                        // after this frame; verify against Message::removeFromBuffer()).
                        $data = $message->removeFromBuffer($frame);
                    } catch (IncompleteFrameException $e) {
                        // Data is now stored in the message, let's clean the variable to stop both loops.
                        $data = null;
                    }
                } while (!$message->isComplete() && $this->hasData($data));

                if ($message->isComplete()) {
                    $this->processHelper($message, $socket);

                    yield $message;
                    // Any remaining data belongs to a new message.
                    $message = null;
                } else {
                    yield $message;
                }
            } catch (IncoherentDataException $e) {
                $this->closeOnError(Frame::CLOSE_INCOHERENT_DATA, $socket);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->closeOnError(Frame::CLOSE_PROTOCOL_ERROR, $socket);
                $data = '';
            } catch (LimitationException $e) {
                $this->closeOnError(Frame::CLOSE_TOO_BIG_TO_PROCESS, $socket);
                $data = '';
            }
        } while ($this->hasData($data));
    }

    /**
     * Tells whether there is still binary data to process.
     *
     * `empty()` must not be used here: it considers the string "0" empty, which would silently
     * drop a remainder made of that single byte.
     *
     * @param mixed $data Remaining data; anything that is not a non-empty string counts as "no data".
     * @return bool
     */
    private function hasData($data): bool
    {
        return is_string($data) && '' !== $data;
    }

    /**
     * Writes a close frame carrying the given code, then ends the socket.
     *
     * @param int                 $closeCode One of the Frame::CLOSE_* constants.
     * @param ConnectionInterface $socket
     */
    private function closeOnError($closeCode, ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }

    /**
     * Gives the message to every registered handler that supports it.
     *
     * All supporting handlers are called, in registration order; the first match does not stop the loop.
     *
     * @param Message $message
     * @param ConnectionInterface $socket
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if ($handler->supports($message)) {
                $handler->process($message, $this, $socket);
            }
        }
    }

    /**
     * Wraps a control frame in its own message and runs the handlers on it.
     *
     * @param Frame $frame
     * @param ConnectionInterface $socket
     *
     * @return Message The single-frame message created for the control frame.
     */
    protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message
    {
        $controlFrameMessage = new Message();
        $controlFrameMessage->addFrame($frame);
        $this->processHelper($controlFrameMessage, $socket);

        return $controlFrameMessage;
    }

    /**
     * Currently does nothing; kept so that existing callers and subclasses keep working.
     */
    protected function processData()
    {

    }

    /**
     * Registers a handler that will be offered every processed message.
     *
     * @param Rfc6455MessageHandlerInterface $handler
     * @return self
     */
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }

    /**
     * Writes a frame on the socket.
     *
     * Anything that is not a Frame is used as the payload of a new frame with the given opcode;
     * the opcode is ignored when a Frame is given.
     *
     * @param Frame|string        $frame
     * @param ConnectionInterface $socket
     * @param int                 $opCode An int representing binary or text data (const of Frame class)
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if (!$frame instanceof Frame) {
            $data = $frame;
            $frame = new Frame();
            $frame->setPayload($data);
            $frame->setOpcode($opCode);
        }

        $socket->write($frame->getRawData());
    }

    /**
     * @return FrameFactory
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }

    /**
     * Writes a protocol-error close frame and closes the socket.
     *
     * NOTE(review): the error paths of onData() call end() whereas this calls close();
     * confirm the close frame written here is flushed before the socket gets closed.
     *
     * @param ConnectionInterface $socket
     */
    public function timeout(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
        $socket->close();
    }

    /**
     * Writes a close frame built with the factory defaults; the socket itself is not ended here.
     *
     * @param ConnectionInterface $socket
     */
    public function close(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(), $socket);
    }
}
207