Completed
Pull Request — master (#81)
by Charlotte
02:04
created

MessageProcessor::processHelper()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 4
nc 3
nop 2
crap 3
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
14
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
15
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
16
use Nekland\Woketo\Exception\LimitationException;
17
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
18
use React\Socket\ConnectionInterface;
19
20
/**
21
 * Class MessageProcessor
22
 *
23
 * This class is only a helper for Connection, to avoid keeping many class instances in memory.
24
 * Using it this way allows us to have only one instance of MessageProcessor.
25
 */
26
class MessageProcessor
{
    /**
     * Factory used to build the close frames written by this processor.
     *
     * @var FrameFactory
     */
    private $frameFactory;

    /**
     * Handlers offered every complete message and every control-frame message, in registration order.
     *
     * @var Rfc6455MessageHandlerInterface[]
     */
    private $handlers;

    /**
     * @param FrameFactory|null $factory Factory to use; a default FrameFactory is created when none is given.
     */
    public function __construct(FrameFactory $factory = null)
    {
        $this->frameFactory = $factory ?? new FrameFactory();
        $this->handlers = [];
    }

    /**
     * Processes data received from the socket to build `Message` entities and/or run the handlers,
     * which may answer some special frames.
     *
     * (binary data entry strings are in {}, frames in || and messages (of potentially many frames) in [])
     * This method buffers in several ways:
     *
     * - { [|frame1 (not final) |, |frame2 (final)|] }
     *   => buffers 2 frames from 1 binary chunk to generate 1 message
     *
     * - { [|frame1 (not final, not finished } { frame 1 (final, finished)| } { |frame 2 (final, finished)|] }
     *   => buffers 2 frames from 3 binary chunks to generate 1 message
     *
     * - { [|frame1 (not final, not finished } { frame control 1 (final, finished)| } { |frame 2 (final, finished)|] }
     *   => buffers 2 frames from 3 binary chunks to generate 1 ping and 1 message
     *
     * What is yielded:
     * - one message per control frame, after the handlers have been run on it;
     * - each complete message, after the handlers have been run on it;
     * - the incomplete message, when it holds at least one frame or still has buffered bytes, so the
     *   caller can pass it back as `$message` together with the next chunk of data.
     *
     * When the data is incoherent, violates the protocol or exceeds a limitation, the matching close
     * frame is written, the socket is ended and processing stops.
     *
     * @param string              $data    Raw bytes just read from the socket
     * @param ConnectionInterface $socket  Connection the data came from; answers are written to it
     * @param Message|null        $message Incomplete message returned by a previous call, if any
     * @return \Generator
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            if (null === $message) {
                $message = new Message();
            }

            try {
                $message->addBuffer($data);

                // Build the message frame by frame: one chunk of data may carry several frames.
                do {
                    try {
                        $frame = new Frame($message->getBuffer());

                        if ($frame->isControlFrame()) {
                            // Control frames never join the current message: each one is returned
                            // as its own message, because every message should be returned.
                            yield $this->processControlFrame($frame, $socket);
                        } else {
                            $message->addFrame($frame);
                        }

                        // The frame was read successfully: update the buffer and keep what is left,
                        // there may still be frames or messages to create from it.
                        $data = $message->removeFromBuffer($frame);
                    } catch (IncompleteFrameException $e) {
                        // The data stays stored in the message; clear the variable to stop both loops.
                        $data = null;
                    }
                } while (!$message->isComplete() && $this->hasPendingData($data));

                if ($message->isComplete()) {
                    $this->processHelper($message, $socket);

                    yield $message;
                    // Any data left belongs to a new message.
                    $message = null;
                } elseif ($message->countFrames() > 0 || '' !== (string) $message->getBuffer()) {
                    // Also hand back a message that only holds buffered bytes so far: otherwise a
                    // Message created in this call would be dropped together with the partial frame.
                    yield $message;
                }
            } catch (IncoherentDataException $e) {
                $this->closeOnError($socket, Frame::CLOSE_INCOHERENT_DATA);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->closeOnError($socket, Frame::CLOSE_PROTOCOL_ERROR);
                $data = '';
            } catch (LimitationException $e) {
                $this->closeOnError($socket, Frame::CLOSE_TOO_BIG_TO_PROCESS);
                $data = '';
            }
        } while ($this->hasPendingData($data));
    }

    /**
     * Tells whether there are still bytes left to process.
     *
     * Behaves like `!empty($data)` except for the string "0": a single 0x30 byte is data as well,
     * while `empty()` would report it as nothing.
     *
     * @param mixed $data Remaining data, or null/'' when there is none
     * @return bool
     */
    private function hasPendingData($data): bool
    {
        return '0' === $data || !empty($data);
    }

    /**
     * Writes a close frame carrying the given close code, then ends the socket.
     *
     * @param ConnectionInterface $socket
     * @param mixed               $closeCode One of the Frame::CLOSE_* constants
     */
    private function closeOnError(ConnectionInterface $socket, $closeCode)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }

    /**
     * Runs every registered handler that supports the given message.
     *
     * @param Message $message
     * @param ConnectionInterface $socket
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if ($handler->supports($message)) {
                $handler->process($message, $this, $socket);
            }
        }
    }

    /**
     * Wraps a control frame in a message of its own and runs the handlers on it.
     *
     * @param Frame $frame
     * @param ConnectionInterface $socket
     *
     * @return Message The message containing only the given control frame
     */
    protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message
    {
        $controlFrameMessage = new Message();
        $controlFrameMessage->addFrame($frame);
        $this->processHelper($controlFrameMessage, $socket);

        return $controlFrameMessage;
    }

    /**
     * Registers a handler; handlers are run in the order they were added.
     *
     * @param Rfc6455MessageHandlerInterface $handler
     * @return self
     */
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }

    /**
     * Writes a frame to the socket. Anything that is not a Frame is used as the payload of a new frame.
     *
     * @param Frame|string        $frame
     * @param ConnectionInterface $socket
     * @param int                 $opCode An int representing binary or text data (const of Frame class);
     *                                    only used when $frame is not already a Frame
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if (!$frame instanceof Frame) {
            $data = $frame;
            $frame = new Frame();
            $frame->setPayload($data);
            $frame->setOpcode($opCode);
        }

        $socket->write($frame->getRawData());
    }

    /**
     * @return FrameFactory
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }

    /**
     * Writes a protocol-error close frame, then closes the socket.
     *
     * @param ConnectionInterface $socket
     */
    public function timeout(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
        // NOTE(review): the error paths of onData() use end() while this one uses close(); confirm
        // the close frame written just above is flushed before the connection is closed.
        $socket->close();
    }

    /**
     * Writes a close frame built with the factory defaults; the socket itself is left open.
     *
     * @param ConnectionInterface $socket
     */
    public function close(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(), $socket);
    }
}
202