Completed
Pull Request — master (#28)
by Maxime
01:54
created

MessageProcessor::onData()   C

Complexity

Conditions 7
Paths 20

Size

Total Lines 31
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 31
rs 6.7272
cc 7
eloc 19
nc 20
nop 3
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Tools\StringTools;
14
use Nekland\Woketo\Exception\LimitationException;
15
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
16
use Nekland\Woketo\Utils\BitManipulation;
17
use React\Socket\ConnectionInterface;
18
19
/**
20
 * Class MessageProcessor
21
 *
22
 * This class is only a helper for Connection to avoid having so much instances of classes in memory.
23
 * Using it like that allow us to have only one instance of MessageProcessor.
24
 */
25
class MessageProcessor
26
{
27
    /**
28
     * @var FrameFactory
29
     */
30
    private $frameFactory;
31
32
    /**
33
     * @var Rfc6455MessageHandlerInterface[]
34
     */
35
    private $handlers;
36
37
    public function __construct(FrameFactory $factory = null)
38
    {
39
        $this->frameFactory = $factory ?: new FrameFactory();
40
        $this->handlers = [];
41
    }
42
43
    /**
44
     * @param string $data
45
     * @param ConnectionInterface $socket
46
     * @param Message|null $message
47
     * @return \Generator
48
     */
49
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
50
    {
51
        do {
52
53
            if (null === $message) {
54
                $message = new Message();
55
            }
56
57
            try {
58
                $data = $message->addData($data);
59
60
                if ($message->isComplete()) {
61
                    foreach ($this->handlers as $handler) {
62
                        if ($handler->supports($message)) {
63
                            $handler->process($message, $this, $socket);
64
                        }
65
                    }
66
67
                    yield $message;
68
                    $message = null;
69
                } else {
70
                    yield $message;
71
                }
72
73
            } catch (LimitationException $e) {
74
                $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_TOO_BIG_TO_PROCESS), $socket);
75
                $socket->end();
76
                $data = '';
77
            }
78
        } while(!empty($data));
79
    }
80
81
    /**
82
     * @param Rfc6455MessageHandlerInterface $handler
83
     * @return self
84
     */
85
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
86
    {
87
        $this->handlers[] = $handler;
88
89
        return $this;
90
    }
91
92
    /**
93
     * @param Frame|string        $frame
94
     * @param ConnectionInterface $socket
95
     * @param int                 $opCode An int representing binary or text data (const of Frame class)
96
     */
97
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
98
    {
99
        if (!$frame instanceof Frame) {
100
            $data = $frame;
101
            $frame = new Frame();
102
            $frame->setPayload($data);
103
            $frame->setOpcode($opCode);
104
        }
105
106
        $socket->write($frame->getRawData());
107
    }
108
109
    /**
110
     * @return FrameFactory
111
     */
112
    public function getFrameFactory(): FrameFactory
113
    {
114
        return $this->frameFactory;
115
    }
116
117
    /**
118
     * @param ConnectionInterface $socket
119
     */
120
    public function timeout(ConnectionInterface $socket)
121
    {
122
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
123
        $socket->close();
124
    }
125
126
    /**
127
     * @param ConnectionInterface $socket
128
     */
129
    public function close(ConnectionInterface $socket)
130
    {
131
        $this->write($this->frameFactory->createCloseFrame(), $socket);
132
    }
133
}
134