Completed
Pull Request — master (#45)
by Charlotte
02:01
created

MessageProcessor::write()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 11
ccs 8
cts 8
cp 1
rs 9.4285
cc 2
eloc 7
nc 2
nop 3
crap 2
1
<?php
2
/**
3
 * This file is a part of Woketo package.
4
 *
5
 * (c) Nekland <[email protected]>
6
 *
7
 * For the full license, take a look to the LICENSE file
8
 * on the root directory of this project
9
 */
10
11
namespace Nekland\Woketo\Rfc6455;
12
13
use Nekland\Woketo\Exception\Frame\TooBigControlFrameException;
14
use Nekland\Woketo\Exception\LimitationException;
15
use Nekland\Woketo\Rfc6455\MessageHandler\Rfc6455MessageHandlerInterface;
16
use React\Socket\ConnectionInterface;
17
18
/**
19
 * Class MessageProcessor
20
 *
21
 * This class is only a helper for Connection to avoid having so much instances of classes in memory.
22
 * Using it like that allow us to have only one instance of MessageProcessor.
23
 */
24
class MessageProcessor
25
{
26
    /**
27
     * @var FrameFactory
28
     */
29
    private $frameFactory;
30
31
    /**
32
     * @var Rfc6455MessageHandlerInterface[]
33
     */
34
    private $handlers;
35
36 7
    public function __construct(FrameFactory $factory = null)
37
    {
38 7
        $this->frameFactory = $factory ?: new FrameFactory();
39 7
        $this->handlers = [];
40 7
    }
41
42
    /**
43
     * @param string $data
44
     * @param ConnectionInterface $socket
45
     * @param Message|null $message
46
     * @return \Generator
47
     */
48 5
    public function onData(string $data, ConnectionInterface $socket, Message $message = null)
49
    {
50
        do {
51
52 5
            if (null === $message) {
53 5
                $message = new Message();
54
            }
55
56
            try {
57 5
                $data = $message->addData($data);
58
59 5
                if ($message->isComplete()) {
60 5
                    foreach ($this->handlers as $handler) {
61 2
                        if ($handler->supports($message)) {
62 2
                            $handler->process($message, $this, $socket);
63
                        }
64
                    }
65
66 5
                    yield $message;
67 5
                    $message = null;
68
                } else {
69 5
                    yield $message;
70
                }
71
            } catch (TooBigControlFrameException $e) {
72
                $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
73
                $socket->end();
74
                $data = '';
75
            } catch (LimitationException $e) {
76
                $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_TOO_BIG_TO_PROCESS), $socket);
77
                $socket->end();
78
                $data = '';
79
            }
80 5
        } while(!empty($data));
81 5
    }
82
83
    /**
84
     * @param Rfc6455MessageHandlerInterface $handler
85
     * @return self
86
     */
87 2
    public function addHandler(Rfc6455MessageHandlerInterface $handler)
88
    {
89 2
        $this->handlers[] = $handler;
90
91 2
        return $this;
92
    }
93
94
    /**
95
     * @param Frame|string        $frame
96
     * @param ConnectionInterface $socket
97
     * @param int                 $opCode An int representing binary or text data (const of Frame class)
98
     */
99 3
    public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT)
100
    {
101 3
        if (!$frame instanceof Frame) {
102 1
            $data = $frame;
103 1
            $frame = new Frame();
104 1
            $frame->setPayload($data);
105 1
            $frame->setOpcode($opCode);
106
        }
107
108 3
        $socket->write($frame->getRawData());
109 3
    }
110
111
    /**
112
     * @return FrameFactory
113
     */
114 2
    public function getFrameFactory(): FrameFactory
115
    {
116 2
        return $this->frameFactory;
117
    }
118
119
    /**
120
     * @param ConnectionInterface $socket
121
     */
122
    public function timeout(ConnectionInterface $socket)
123
    {
124
        $this->write($this->frameFactory->createCloseFrame(Frame::CLOSE_PROTOCOL_ERROR), $socket);
125
        $socket->close();
126
    }
127
128
    /**
129
     * @param ConnectionInterface $socket
130
     */
131
    public function close(ConnectionInterface $socket)
132
    {
133
        $this->write($this->frameFactory->createCloseFrame(), $socket);
134
    }
135
}
136