Completed
Branch 0.4-dev (87ea6d)
by Evgenij
02:20
created

StreamedClientIo::writeOobData()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4.4661

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 9
cts 13
cp 0.6923
rs 8.9197
c 0
b 0
f 0
cc 4
eloc 13
nc 4
nop 2
crap 4.4661
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\Socket\Io;
11
12
use AsyncSockets\Exception\DisconnectException;
13
use AsyncSockets\Exception\RecvDataException;
14
use AsyncSockets\Exception\SendDataException;
15
use AsyncSockets\Frame\FramePickerInterface;
16
17
/**
18
 * Class StreamedClientIo
19
 */
20
class StreamedClientIo extends AbstractClientIo
21
{
22
    /**
23
     * Read attempts count
24
     */
25
    const READ_ATTEMPTS = 2;
26
27
    /**
28
     * Amount of read attempts
29
     *
30
     * @var int
31
     */
32
    private $readAttempts = self::READ_ATTEMPTS;
33
34
    /**
35
     * Remote socket address
36
     *
37
     * @var string
38
     */
39
    private $remoteAddress;
40
41
    /** {@inheritdoc} */
42 26
    protected function readRawDataIntoPicker(FramePickerInterface $picker, $isOutOfBand)
43
    {
44 26
        return $isOutOfBand ? $this->readOobData($picker) : $this->readRegularData($picker);
45
    }
46
47
    /** {@inheritdoc} */
48 5
    protected function writeRawData($data, $isOutOfBand)
49
    {
50 5
        $resource = $this->socket->getStreamResource();
51 5
        $test     = stream_socket_sendto($resource, '');
52 5
        if ($test !== 0) {
53 1
            throw new SendDataException(
54 1
                $this->socket,
55 1
                trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
56
            );
57
        }
58
59 4
        $written = $isOutOfBand ?
60 1
            $this->writeOobData($resource, $data) :
61 4
            fwrite($resource, $data, strlen($data));
62
63 4
        if ($written === false) {
64 1
            throw new SendDataException(
65 1
                $this->socket,
66 1
                trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
67
            );
68
        }
69
70 3
        if ($written === 0 && !empty($data) && !$this->isConnected()) {
71 1
            throw new DisconnectException($this->socket, 'Remote connection has been lost.');
72
        }
73
74 2
        return $written;
75
    }
76
77
    /** {@inheritdoc} */
78 37
    protected function isConnected()
79
    {
80 37
        return $this->resolveRemoteAddress() !== null;
81
    }
82
83
    /** {@inheritdoc} */
84 25
    protected function getRemoteAddress()
85
    {
86 25
        if ($this->remoteAddress === null) {
87 25
            $this->remoteAddress = $this->resolveRemoteAddress();
88 25
            if ($this->remoteAddress === null) {
89
                throw new DisconnectException($this->socket, 'Remote connection has been lost.');
90
            }
91
        }
92
93 25
        return $this->remoteAddress;
94
    }
95
96
    /** {@inheritdoc} */
97 2
    protected function canReachFrame()
98
    {
99 2
        return $this->readAttempts > 0 && $this->isConnected();
100
    }
101
102
    /**
103
     * Read OOB data from socket
104
     *
105
     * @param FramePickerInterface $picker
106
     *
107
     * @return string
108
     */
109 1
    private function readOobData(FramePickerInterface $picker)
110
    {
111 1
        $data = stream_socket_recvfrom(
112 1
            $this->socket->getStreamResource(),
113 1
            self::SOCKET_BUFFER_SIZE,
114 1
            STREAM_OOB
115
        );
116
117 1
        return $picker->pickUpData($data, $this->getRemoteAddress());
118
    }
119
120
    /**
121
     * Read regular data
122
     *
123
     * @param FramePickerInterface $picker Picker to read data into
124
     *
125
     * @return string
126
     */
127 25
    private function readRegularData(FramePickerInterface $picker)
128
    {
129
        // work-around https://bugs.php.net/bug.php?id=52602
130 25
        $resource         = $this->socket->getStreamResource();
131
        $readContext      = [
132 25
            'countCycles'       => 0,
133 25
            'dataBeforeIo'      => $this->getDataInSocket(),
134
            'isStreamDataEmpty' => false,
135
        ];
136
137
        do {
138 25
            $data = fread($resource, self::SOCKET_BUFFER_SIZE);
139 25
            if ($data === false) {
140 1
                throw new RecvDataException(
141 1
                    $this->socket,
142 1
                    trim('Failed to read data. ' . $this->getLastPhpErrorMessage())
143
                );
144
            }
145
146 24
            $isDataEmpty = $data === '';
147 24
            $result      = $picker->pickUpData($data, $this->getRemoteAddress());
148
149 24
            $readContext['countCycles']      += 1;
150 24
            $readContext['isStreamDataEmpty'] = $this->isReadDataActuallyEmpty($data);
151 24
            $this->readAttempts               = $this->resolveReadAttempts($readContext, $this->readAttempts);
152 24
        } while (!$picker->isEof() && !$isDataEmpty);
153
154 24
        return $result;
155
    }
156
157
    /**
158
     * Return first byte from socket buffer
159
     *
160
     * @return string
161
     */
162 25
    private function getDataInSocket()
163
    {
164 25
        return stream_socket_recvfrom($this->socket->getStreamResource(), 1, STREAM_PEEK);
165
    }
166
167
    /**
168
     * Checks whether data read from stream buffer can be filled later
169
     *
170
     * @param string $data Read data
171
     *
172
     * @return bool
173
     */
174 24
    private function isReadDataActuallyEmpty($data)
175
    {
176 24
        $result = false;
177 24
        if ($data === '') {
178 5
            $dataInSocket = $this->getDataInSocket();
179 5
            $result       = $dataInSocket === '' || $dataInSocket === false;
180
        }
181
182 24
        return $result;
183
    }
184
185
    /**
186
     * Calculate attempts value
187
     *
188
     * @param array $context Read context
189
     * @param int   $currentAttempts Current attempts counter
190
     *
191
     * @return int
192
     */
193 24
    private function resolveReadAttempts(array $context, $currentAttempts)
194
    {
195 24
        return ($context['countCycles'] === 1 && empty($context['dataBeforeIo'])) ||
196 24
               ($context['countCycles'] > 1   && $context['isStreamDataEmpty']) ?
197 20
            $currentAttempts - 1 :
198 24
            self::READ_ATTEMPTS;
199
200
    }
201
202
    /**
203
     * Write out-of-band data
204
     *
205
     * @param resource $socket Socket resource
206
     * @param string   $data Data to write
207
     *
208
     * @return int Amount of written bytes
209
     */
210 1
    private function writeOobData($socket, $data)
211
    {
212 1
        $result     = 0;
213 1
        $dataLength = strlen($data);
214 1
        for ($i = 0; $i < $dataLength; $i++) {
215 1
            $written = stream_socket_sendto($socket, $data[$i], STREAM_OOB);
216 1
            if ($written < 0) {
217
                throw new SendDataException(
218
                    $this->socket,
219
                    trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
220
                );
221
            }
222
223 1
            if ($written === 0) {
224
                break;
225
            }
226
227 1
            $result += $written;
228
        }
229
230 1
        return $result;
231
    }
232
233
    /**
234
     * Return remote address if we connected or false otherwise
235
     *
236
     * @return string|null
237
     */
238 37
    private function resolveRemoteAddress()
239
    {
240 37
        $result = stream_socket_get_name($this->socket->getStreamResource(), true);
241
242 37
        return $result !== false ? $result : null;
243
    }
244
}
245