StreamedClientIo::readRegularData()   B
last analyzed

Complexity

Conditions 4
Paths 2

Size

Total Lines 29
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 29
ccs 19
cts 19
cp 1
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 19
nc 2
nop 1
crap 4
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\Socket\Io;
11
12
use AsyncSockets\Exception\DisconnectException;
13
use AsyncSockets\Exception\NetworkSocketException;
14
use AsyncSockets\Exception\RecvDataException;
15
use AsyncSockets\Exception\SendDataException;
16
use AsyncSockets\Frame\FramePickerInterface;
17
18
/**
19
 * Class StreamedClientIo
20
 */
21
class StreamedClientIo extends AbstractClientIo
{
    /**
     * Value the read attempts counter starts from and is reset to
     */
    const READ_ATTEMPTS = 2;

    /**
     * Amount of read attempts left; canReachFrame() reports false once it drops to zero
     *
     * @var int
     */
    private $readAttempts = self::READ_ATTEMPTS;

    /**
     * Remote socket address, resolved lazily by getRemoteAddress()
     *
     * @var string
     */
    private $remoteAddress;

    /** {@inheritdoc} */
    protected function readRawDataIntoPicker(FramePickerInterface $picker, $isOutOfBand)
    {
        return $isOutOfBand ? $this->readOobData($picker) : $this->readRegularData($picker);
    }

    /** {@inheritdoc} */
    protected function writeRawData($data, $isOutOfBand)
    {
        $resource = $this->socket->getStreamResource();

        // Send an empty payload first and verify the result before writing the real data.
        $this->verifySendResult('', stream_socket_sendto($resource, ''));

        $written = $isOutOfBand ?
            $this->writeOobData($resource, $data) :
            fwrite($resource, $data, strlen($data));

        $this->verifySendResult($data, $written);

        return $written;
    }

    /**
     * Verifies that send operation completed successfully
     *
     * @param string   $data Data for remote side
     * @param int|bool $sendResult Return value from send function
     *
     * @return void
     * @throws SendDataException If the send function reported a failure (false or a negative value)
     * @throws DisconnectException If nothing was sent for a non-empty payload and the peer is gone
     */
    private function verifySendResult($data, $sendResult)
    {
        if ($sendResult === false || $sendResult < 0) {
            throw new SendDataException(
                $this->socket,
                trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
            );
        }

        // Strict comparison on purpose: empty() treats the payload "0" as empty, which
        // would hide a lost connection while sending that byte (e.g. from writeOobData()).
        $hasPayload = (string) $data !== '';
        if ($sendResult === 0 && $hasPayload && !$this->isConnected()) {
            throw DisconnectException::lostRemoteConnection($this->socket);
        }
    }

    /** {@inheritdoc} */
    public function isConnected()
    {
        return $this->resolveRemoteAddress() !== null;
    }

    /** {@inheritdoc} */
    protected function getRemoteAddress()
    {
        if ($this->remoteAddress === null) {
            $this->remoteAddress = $this->resolveRemoteAddress();
            if ($this->remoteAddress === null) {
                throw DisconnectException::lostRemoteConnection($this->socket);
            }
        }

        return $this->remoteAddress;
    }

    /** {@inheritdoc} */
    protected function canReachFrame()
    {
        return $this->readAttempts > 0 && $this->isConnected();
    }

    /**
     * Read OOB data from socket
     *
     * @param FramePickerInterface $picker Picker to read data into
     *
     * @return string Result of the picker's pickUpData() call
     * @throws DisconnectException If the remote address can not be resolved
     */
    private function readOobData(FramePickerInterface $picker)
    {
        $data = stream_socket_recvfrom(
            $this->socket->getStreamResource(),
            self::SOCKET_BUFFER_SIZE,
            STREAM_OOB
        );

        return $picker->pickUpData($data, $this->getRemoteAddress());
    }

    /**
     * Read regular data
     *
     * Reads chunks of up to SOCKET_BUFFER_SIZE bytes and feeds each one to the picker until
     * the picker reports EOF or a read returns an empty string. After every chunk the
     * read attempts counter is recalculated via resolveReadAttempts().
     *
     * @param FramePickerInterface $picker Picker to read data into
     *
     * @return string Result of the last pickUpData() call
     * @throws RecvDataException If fread() fails
     * @throws DisconnectException If the remote address can not be resolved
     */
    private function readRegularData(FramePickerInterface $picker)
    {
        // work-around https://bugs.php.net/bug.php?id=52602
        $resource    = $this->socket->getStreamResource();
        $readContext = [
            'countCycles'       => 0,
            // Peeked before any read, so the first cycle knows whether data was pending.
            'dataBeforeIo'      => $this->getDataInSocket(),
            'isStreamDataEmpty' => false,
        ];

        do {
            $data = fread($resource, self::SOCKET_BUFFER_SIZE);
            if ($data === false) {
                throw new RecvDataException(
                    $this->socket,
                    trim('Failed to read data. ' . $this->getLastPhpErrorMessage())
                );
            }

            $isDataEmpty = $data === '';
            $result      = $picker->pickUpData($data, $this->getRemoteAddress());

            $readContext['countCycles']      += 1;
            $readContext['isStreamDataEmpty'] = $this->isReadDataActuallyEmpty($data);
            $this->readAttempts               = $this->resolveReadAttempts($readContext, $this->readAttempts);
        } while (!$picker->isEof() && !$isDataEmpty);

        return $result;
    }

    /**
     * Return first byte from socket buffer without removing it (STREAM_PEEK)
     *
     * @return string|bool Peeked byte as returned by stream_socket_recvfrom()
     */
    private function getDataInSocket()
    {
        return stream_socket_recvfrom($this->socket->getStreamResource(), 1, STREAM_PEEK);
    }

    /**
     * Checks whether a value returned by getDataInSocket() means that no byte was obtained
     *
     * The comparison is strict, so a pending byte "0" counts as data.
     *
     * @param string|bool|null $peekedData Value returned by getDataInSocket()
     *
     * @return bool
     */
    private function isPeekResultEmpty($peekedData)
    {
        return $peekedData === '' || $peekedData === false || $peekedData === null;
    }

    /**
     * Checks whether an empty read is confirmed by an empty socket buffer
     *
     * @param string $data Read data
     *
     * @return bool True if $data is an empty string and peeking the socket yields nothing either
     */
    private function isReadDataActuallyEmpty($data)
    {
        if ($data !== '') {
            return false;
        }

        return $this->isPeekResultEmpty($this->getDataInSocket());
    }

    /**
     * Calculate attempts value
     *
     * The counter is decreased when the first read cycle started with nothing to peek in
     * the socket, or when a later cycle found the stream actually empty. In any other
     * case it is reset to READ_ATTEMPTS.
     *
     * @param array $context Read context with keys countCycles, dataBeforeIo, isStreamDataEmpty
     * @param int   $currentAttempts Current attempts counter
     *
     * @return int
     */
    private function resolveReadAttempts(array $context, $currentAttempts)
    {
        // Not empty(): a peeked byte "0" is real data and must not cost an attempt.
        $startedWithoutData = $context['countCycles'] === 1
            && $this->isPeekResultEmpty($context['dataBeforeIo']);
        $ranOutOfData       = $context['countCycles'] > 1 && $context['isStreamDataEmpty'];

        return $startedWithoutData || $ranOutOfData ?
            $currentAttempts - 1 :
            self::READ_ATTEMPTS;
    }

    /**
     * Write out-of-band data
     *
     * Data is sent one byte per stream_socket_sendto() call; sending stops at the first
     * byte for which nothing was written.
     *
     * @param resource $socket Socket resource
     * @param string   $data Data to write
     *
     * @return int Amount of written bytes
     * @throws NetworkSocketException If sending of any byte fails, see verifySendResult()
     */
    private function writeOobData($socket, $data)
    {
        $result     = 0;
        $dataLength = strlen($data);
        for ($i = 0; $i < $dataLength; $i++) {
            $written = stream_socket_sendto($socket, $data[$i], STREAM_OOB);
            $this->verifySendResult($data[$i], $written);

            if ($written === 0) {
                break;
            }

            $result += $written;
        }

        return $result;
    }

    /**
     * Return remote address if we are connected or null otherwise
     *
     * @return string|null
     */
    private function resolveRemoteAddress()
    {
        $result = stream_socket_get_name($this->socket->getStreamResource(), true);

        return $result !== false ? $result : null;
    }
}
249