1 | <?php |
||
26 | class MessageProcessor |
||
27 | { |
||
28 | /** |
||
29 | * @var FrameFactory |
||
30 | */ |
||
31 | private $frameFactory; |
||
32 | |||
33 | /** |
||
34 | * @var Rfc6455MessageHandlerInterface[] |
||
35 | */ |
||
36 | private $handlers; |
||
37 | |||
38 | 13 | public function __construct(FrameFactory $factory = null) |
|
43 | |||
44 | /** |
||
45 | * This method processes data received from the socket to generate a `Message` entity and/or invoke handlers |
||
46 | * which may answer some special frames. |
||
47 | * |
||
48 | * (binary data entry string in {}, frames in || and messages (of potentially many frames) in []) |
||
49 | * This method buffers data in several ways: |
||
50 | * |
||
51 | * - { [|frame1 (not final) |, |frame2 (final)|] } |
||
52 | * => buffer 2 frames from 1 binary to generate 1 message |
||
53 | * |
||
54 | * - { [|frame1 (not final, not finished } { frame 1 (final, finished)| } { |frame 2 (final, finished)|] } |
||
55 | * => buffer 2 frames from 3 binary data to generate 1 message |
||
56 | * |
||
57 | * - { [|frame1 (not final, not finished } { frame control 1 (final, finished)| } { |frame 2 (final, finished)|] } |
||
58 | * => buffer 2 frames from 3 binary data to generate 1 ping and 1 message |
||
59 | * |
||
60 | * @param string $data |
||
61 | * @param ConnectionInterface $socket |
||
62 | * @param Message|null $message |
||
63 | * @return \Generator |
||
64 | */ |
||
public function onData(string $data, ConnectionInterface $socket, Message $message = null)
{
    // Outer loop: keeps going while bytes remain after the current message has been handled.
    do {
        // No pending message (none supplied, or the previous one was completed): start a new one.
        if ($message === null) {
            $message = new Message();
        }

        try {
            $message->addBuffer($data);

            // Inner loop: parse frame after frame out of the message buffer until the
            // message is complete or there is nothing left to read.
            do {
                try {
                    $currentFrame = new Frame($message->getBuffer());

                    if ($currentFrame->isControlFrame()) {
                        // Control frames produce their own message, which is yielded immediately.
                        yield $this->processControlFrame($currentFrame, $socket);
                    } else {
                        $message->addFrame($currentFrame);
                    }

                    // The frame was parsed successfully: drop it from the buffer and keep the remainder.
                    $data = $message->removeFromBuffer($currentFrame);
                } catch (IncompleteFrameException $incompleteFrame) {
                    // The bytes were already added to the message buffer; clearing $data stops both loops.
                    $data = null;
                }
            } while (!($message->isComplete() || empty($data)));

            if ($message->isComplete()) {
                $this->processHelper($message, $socket);

                yield $message;
                // Forces a fresh Message on the next pass of the outer loop.
                $message = null;
            } elseif (0 < $message->countFrames()) {
                // Incomplete message that already holds at least one frame: yield it as-is.
                yield $message;
            }
        } catch (IncoherentDataException $incoherentData) {
            $this->closeWithCode(Frame::CLOSE_INCOHERENT_DATA, $socket);
            $data = '';
        } catch (ProtocolErrorException $protocolError) {
            $this->closeWithCode(Frame::CLOSE_PROTOCOL_ERROR, $socket);
            $data = '';
        } catch (LimitationException $limitation) {
            $this->closeWithCode(Frame::CLOSE_TOO_BIG_TO_PROCESS, $socket);
            $data = '';
        }
    } while (!empty($data));
}

/**
 * Writes a close frame built from the given close code to the socket, then calls end() on it.
 *
 * @param mixed               $closeCode One of the Frame::CLOSE_* constants
 * @param ConnectionInterface $socket
 */
private function closeWithCode($closeCode, ConnectionInterface $socket)
{
    $this->write($this->frameFactory->createCloseFrame($closeCode), $socket);
    $socket->end();
}
|
120 | |||
121 | /** |
||
122 | * @param Message $message |
||
123 | * @param ConnectionInterface $socket |
||
124 | */ |
||
125 | 8 | protected function processHelper(Message $message, ConnectionInterface $socket) |
|
133 | |||
134 | /** |
||
135 | * @param Frame $frame |
||
136 | * @param ConnectionInterface $socket |
||
137 | * |
||
138 | * @return Message |
||
139 | */ |
||
140 | 3 | protected function processControlFrame(Frame $frame, ConnectionInterface $socket) : Message |
|
148 | |||
149 | /** |
||
150 | * @param Rfc6455MessageHandlerInterface $handler |
||
151 | * @return self |
||
152 | */ |
||
153 | 2 | public function addHandler(Rfc6455MessageHandlerInterface $handler) |
|
159 | |||
160 | /** |
||
161 | * @param Frame|string $frame |
||
162 | * @param ConnectionInterface $socket |
||
163 | * @param int $opCode An int representing binary or text data (const of Frame class) |
||
164 | */ |
||
165 | 6 | public function write($frame, ConnectionInterface $socket, int $opCode = Frame::OP_TEXT) |
|
176 | |||
177 | /** |
||
178 | * @return FrameFactory |
||
179 | */ |
||
180 | 2 | public function getFrameFactory(): FrameFactory |
|
184 | |||
185 | /** |
||
186 | * @param ConnectionInterface $socket |
||
187 | */ |
||
188 | public function timeout(ConnectionInterface $socket) |
||
193 | |||
194 | /** |
||
195 | * @param ConnectionInterface $socket |
||
196 | */ |
||
197 | public function close(ConnectionInterface $socket) |
||
201 | } |
||
202 |