Completed
Pull Request — master (#81)
by Maxime
02:03
created

MessageProcessor::processHelper()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 4
nc 3
nop 2
crap 3
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Tools\StringTools;
14
use Nekland\Woketo\Exception\Frame\IncoherentDataException;
15
use Nekland\Woketo\Exception\Frame\IncompleteFrameException;
16
use Nekland\Woketo\Exception\Frame\ProtocolErrorException;
17
use Nekland\Woketo\Exception\LimitationException;
18
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
19
use React\Socket\ConnectionInterface;
20
21
/**
22
 * Class MessageProcessor
23
 *
24
 * This class is only a helper for Connection, to avoid keeping so many class instances in memory.
25
 * Using it this way allows us to have only one instance of MessageProcessor.
26
 */
27
class MessageProcessor
28
{
29
    /**
30
     * @var FrameFactory
31
     */
32
    private $frameFactory;
33
34
    /**
35
     * @var Rfc6455MessageHandlerInterface[]
36
     */
37
    private $handlers;
38
39 11
    /**
     * Creates the processor with an empty list of handlers.
     *
     * @param FrameFactory|null $factory Factory used to create the frames this processor sends
     *                                   (e.g. close frames); a new FrameFactory is instantiated
     *                                   when none is given.
     */
    public function __construct(FrameFactory $factory = null)
    {
        $this->handlers = [];

        if (null === $factory) {
            $factory = new FrameFactory();
        }

        $this->frameFactory = $factory;
    }
44
45
    /**
     * Processes the data received from the socket: builds `Message` entities out of it and runs the
     * registered handlers, which may answer to some special frames.
     *
     * Legend for the examples below: binary data received in one call is written in {}, frames in ||
     * and messages (potentially made of many frames) in [].
     *
     * This method buffers in several ways:
     *
     * - { [|frame1 (not final)|, |frame2 (final)|] }
     *   => buffers 2 frames coming from 1 binary chunk to generate 1 message
     *
     * - { [|frame1 (not final, not finished) } { frame1 (not final, finished)| } { |frame2 (final)|] }
     *   => buffers 2 frames coming from 3 binary chunks to generate 1 message
     *
     * When the data is incoherent, breaks the protocol or exceeds a limitation, a close frame carrying
     * the matching close code is written to the socket, the socket is ended and processing stops.
     *
     * TODO: refactor this part, which is far too complicated to be understood by normal humans.
     *
     * @param string              $data    Binary data received from the socket
     * @param ConnectionInterface $socket  Connection the answers are written to
     * @param Message|null        $message Message to keep filling (presumably the incomplete one yielded
     *                                     by a previous call); a new Message is created when null
     * @return \Generator Yields every control-frame message, every completed message and the message
     *                    that is still incomplete when the data runs out.
     */
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
    {
        do {
            // Start a new message unless one is already being built.
            $message = $message ?? new Message();

            try {
                $data = $message->addData($data);

                // Builds the message when it is split in many frames inside the same binary chunk.
                do {
                    try {
                        $message->addBuffer($data);
                        $frame = new Frame($message->getBuffer());

                        if (!$frame->isControlFrame()) {
                            $message->addFrame($frame);

                            // The frame was read successfully, but what follows it may still hold
                            // other frames/messages: keep only the part that was not consumed.
                            $data = StringTools::removeStart($data, $frame->getRawData(), '8bit');
                        } else {
                            // A control frame gets a message of its own instead of joining $message.
                            $controlFrameMessage = new Message();
                            $controlFrameMessage->addFrame($frame);
                            $this->processHelper($controlFrameMessage, $socket);

                            // NOTE(review): unlike the branch above, $data is not trimmed here;
                            // confirm the loop makes progress after a control frame.
                            yield $controlFrameMessage; // Every message must be handed back to the caller.
                        }
                    } catch (IncompleteFrameException $e) {
                        // The data is now stored in the message; clearing the variable stops both loops.
                        $data = null;
                    }
                    // NOTE(review): empty() also considers the string "0" as empty - confirm this is harmless.
                } while (!$message->isComplete() && !empty($data));

                if (!$message->isComplete()) {
                    // The message is handed back even though it is not complete yet.
                    yield $message;
                } else {
                    $this->processHelper($message, $socket);

                    yield $message;

                    // This message is done: the next turn of the loop starts a new one.
                    $message = null;
                }
            } catch (IncoherentDataException $e) {
                $this->endWithCloseFrame($socket, Frame::CLOSE_INCOHERENT_DATA);
                $data = '';
            } catch (ProtocolErrorException $e) {
                $this->endWithCloseFrame($socket, Frame::CLOSE_PROTOCOL_ERROR);
                $data = '';
            } catch (LimitationException $e) {
                $this->endWithCloseFrame($socket, Frame::CLOSE_TOO_BIG_TO_PROCESS);
                $data = '';
            }
        } while (!empty($data));
    }

    /**
     * Writes a close frame carrying the given close code to the socket, then ends the socket.
     *
     * @param ConnectionInterface $socket
     * @param mixed               $closeCode One of the Frame::CLOSE_* constants
     */
    private function endWithCloseFrame(ConnectionInterface $socket, $closeCode)
    {
        $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
        $socket->end();
    }
123
124
    /**
     * Runs every registered handler that declares it supports the given message.
     *
     * @param Message             $message Message offered to each handler
     * @param ConnectionInterface $socket  Connection passed along to the handlers that process the message
     */
    protected function processHelper(Message $message, ConnectionInterface $socket)
    {
        foreach ($this->handlers as $handler) {
            if (!$handler->supports($message)) {
                continue;
            }

            // The processor itself is handed to the handler along with the socket.
            $handler->process($message, $this, $socket);
        }
    }
136
137
    /**
     * Registers a handler at the end of the handler list.
     *
     * Handlers are consulted in registration order by processHelper().
     *
     * @param Rfc6455MessageHandlerInterface $handler Handler to register
     * @return self The processor itself, so calls can be chained
     */
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
    {
        $this->handlers[] = $handler;

        return $this;
    }
147
148
    /**
     * Writes a frame to the socket.
     *
     * A Frame instance is sent as is; any other value is used as the payload of a new frame
     * built with the given opcode.
     *
     * @param Frame|string        $frame  Frame to send, or payload to wrap in a new frame
     * @param ConnectionInterface $socket Connection the raw frame data is written to
     * @param int                 $opCode An int representing binary or text data (const of Frame class);
     *                                    only used when $frame is not a Frame instance
     */
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
    {
        if ($frame instanceof Frame) {
            $socket->write($frame->getRawData());

            return;
        }

        // Not a frame yet: wrap the payload before sending it.
        $wrapped = new Frame();
        $wrapped->setPayload($frame);
        $wrapped->setOpcode($opCode);

        $socket->write($wrapped->getRawData());
    }
164
165
    /**
     * Gives access to the frame factory this processor was built with.
     *
     * @return FrameFactory The factory given to (or created by) the constructor
     */
    public function getFrameFactory(): FrameFactory
    {
        return $this->frameFactory;
    }
172
173
    /**
     * Sends a close frame with the protocol error code, then ends the connection.
     *
     * The socket is ended rather than closed: a forced close may discard data that is still
     * buffered for writing, which would drop the close frame written just before. Ending the
     * socket lets the pending frame be flushed first, like the error paths of onData() do.
     *
     * @param ConnectionInterface $socket Connection to notify and terminate
     */
    public function timeout(ConnectionInterface $socket)
    {
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
        $socket->end();
    }
181
182
    /**
     * Sends a close frame built with the factory defaults to the socket.
     *
     * Only the frame is written: this method does not end or close the socket itself.
     *
     * @param ConnectionInterface $socket Connection the close frame is written to
     */
    public function close(ConnectionInterface $socket)
    {
        $closeFrame = $this->frameFactory->createCloseFrame();

        $this->write($closeFrame, $socket);
    }
189
}
190