Completed
Branch 0.4-dev (25e57a)
by Evgenij
02:18
created

StreamedClientIo::verifySendResult()   B

Complexity

Conditions 6
Paths 3

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 13
rs 8.8571
c 0
b 0
f 0
ccs 4
cts 4
cp 1
cc 6
eloc 7
nc 3
nop 2
crap 6
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\Socket\Io;
11
12
use AsyncSockets\Exception\DisconnectException;
13
use AsyncSockets\Exception\NetworkSocketException;
14
use AsyncSockets\Exception\RecvDataException;
15
use AsyncSockets\Exception\SendDataException;
16
use AsyncSockets\Frame\FramePickerInterface;
17
18
/**
19
 * Class StreamedClientIo - client I/O over a stream socket, covering regular and out-of-band transfers
20
 */
21
class StreamedClientIo extends AbstractClientIo
22
{
23
    /**
     * Value the remaining-attempts counter starts from and is reset to
     */
    const READ_ATTEMPTS = 2;

    /**
     * Remaining read attempts; recalculated after every read cycle and checked in canReachFrame()
     *
     * @var int
     */
    private $readAttempts = self::READ_ATTEMPTS;

    /**
     * Remote socket address, resolved on first request and cached afterwards (null until resolved)
     *
     * @var string|null
     */
    private $remoteAddress;
41
42 26
    /**
     * {@inheritdoc}
     *
     * Delegates to the out-of-band reader or to the regular reader depending on $isOutOfBand.
     */
    protected function readRawDataIntoPicker(FramePickerInterface $picker, $isOutOfBand)
    {
        if ($isOutOfBand) {
            return $this->readOobData($picker);
        }

        return $this->readRegularData($picker);
    }
47
48 5
    /**
     * {@inheritdoc}
     *
     * Sends a zero-length probe first, then writes the payload either byte-by-byte as
     * out-of-band data or with a single fwrite() call, and validates both results.
     */
    protected function writeRawData($data, $isOutOfBand)
    {
        $stream = $this->socket->getStreamResource();

        // Zero-length probe: because the payload is '', verifySendResult() can only
        // report a send failure here, never a lost connection.
        $probeResult = stream_socket_sendto($stream, '');
        $this->verifySendResult('', $probeResult);

        if ($isOutOfBand) {
            $bytesWritten = $this->writeOobData($stream, $data);
        } else {
            $bytesWritten = fwrite($stream, $data, strlen($data));
        }

        $this->verifySendResult($data, $bytesWritten);

        return $bytesWritten;
    }
62
63 4
    /**
     * Verifies that a send operation completed successfully
     *
     * @param string   $data Data that was handed to the send function
     * @param int|bool $sendResult Return value from the send function
     *
     * @return void
     * @throws NetworkSocketException SendDataException when the send function reported a failure
     *                                (false or a negative value); DisconnectException when nothing
     *                                was written for a non-empty payload and the remote side is gone
     */
    private function verifySendResult($data, $sendResult)
    {
        if ($sendResult === false || $sendResult < 0) {
            throw new SendDataException(
                $this->socket,
                trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
            );
        }

        // Compare with '' explicitly instead of using empty(): empty('0') is true, so a
        // payload consisting of the single character "0" would be mistaken for "no data"
        // and a lost connection would go unnoticed.
        $hasPayload = (string) $data !== '';
        if ($sendResult === 0 && $hasPayload && !$this->isConnected()) {
            throw DisconnectException::lostRemoteConnection($this->socket);
        }
    }
85
86 25
    /**
     * {@inheritdoc}
     *
     * The socket counts as connected while a remote address can still be resolved for it.
     */
    protected function isConnected()
    {
        $address = $this->resolveRemoteAddress();

        return null !== $address;
    }
91
92
    /**
     * {@inheritdoc}
     *
     * The address is resolved once and then served from the cached property; a
     * DisconnectException is thrown when it cannot be resolved.
     */
    protected function getRemoteAddress()
    {
        if ($this->remoteAddress !== null) {
            return $this->remoteAddress;
        }

        $address = $this->resolveRemoteAddress();
        if ($address === null) {
            throw DisconnectException::lostRemoteConnection($this->socket);
        }

        $this->remoteAddress = $address;

        return $address;
    }
104
105
    /**
     * {@inheritdoc}
     *
     * A frame is reachable while read attempts remain and the socket is still connected.
     */
    protected function canReachFrame()
    {
        if (!($this->readAttempts > 0)) {
            return false;
        }

        return $this->isConnected();
    }
110
111 1
    /**
     * Read OOB data from socket
     *
     * Performs a single stream_socket_recvfrom() call with the STREAM_OOB flag and feeds
     * whatever was received into the picker.
     *
     * @param FramePickerInterface $picker Picker to read data into
     *
     * @return string
     */
    private function readOobData(FramePickerInterface $picker)
    {
        $data = stream_socket_recvfrom(
            $this->socket->getStreamResource(),
            self::SOCKET_BUFFER_SIZE,
            STREAM_OOB
        );

        // stream_socket_recvfrom() returns false on failure. The regular read path never
        // lets false reach the picker, so hand over an empty chunk here as well.
        // NOTE(review): failure is deliberately not turned into an exception to keep the
        // existing best-effort behaviour - confirm whether RecvDataException is wanted.
        if ($data === false) {
            $data = '';
        }

        return $picker->pickUpData($data, $this->getRemoteAddress());
    }
128
129
    /**
     * Read regular data
     *
     * Reads buffer-sized chunks into the picker until the picker reports EOF or a read
     * yields an empty string, updating the remaining read attempts after every cycle.
     *
     * @param FramePickerInterface $picker Picker to read data into
     *
     * @return string
     * @throws RecvDataException When fread() reports a failure
     */
    private function readRegularData(FramePickerInterface $picker)
    {
        // work-around https://bugs.php.net/bug.php?id=52602
        $stream         = $this->socket->getStreamResource();
        $peekedBeforeIo = $this->getDataInSocket(); // must be taken before the first fread()
        $cycle          = 0;

        do {
            $chunk = fread($stream, self::SOCKET_BUFFER_SIZE);
            if ($chunk === false) {
                throw new RecvDataException(
                    $this->socket,
                    trim('Failed to read data. ' . $this->getLastPhpErrorMessage())
                );
            }

            $result = $picker->pickUpData($chunk, $this->getRemoteAddress());

            $cycle++;
            $context = [
                'countCycles'       => $cycle,
                'dataBeforeIo'      => $peekedBeforeIo,
                'isStreamDataEmpty' => $this->isReadDataActuallyEmpty($chunk),
            ];

            $this->readAttempts = $this->resolveReadAttempts($context, $this->readAttempts);
        } while (!$picker->isEof() && $chunk !== '');

        return $result;
    }
165
166
    /**
     * Peek at the first byte waiting in the socket buffer without consuming it
     *
     * @return string|false Result of stream_socket_recvfrom() called for one byte with STREAM_PEEK
     */
    private function getDataInSocket()
    {
        $stream = $this->socket->getStreamResource();

        return stream_socket_recvfrom($stream, 1, STREAM_PEEK);
    }
175
176 24
    /**
     * Tells whether an empty read really means that nothing is waiting in the socket
     *
     * Non-empty data is never considered empty; for an empty read the socket is peeked
     * and the result is true only when the peek yields '' or false.
     *
     * @param string $data Read data
     *
     * @return bool
     */
    private function isReadDataActuallyEmpty($data)
    {
        if ($data !== '') {
            return false;
        }

        $peeked = $this->getDataInSocket();

        return $peeked === '' || $peeked === false;
    }
193 24
194
    /**
     * Calculate attempts value
     *
     * An attempt is consumed when the first read cycle started with nothing waiting in the
     * socket, or when a later cycle found the stream empty; in every other case the counter
     * is reset to READ_ATTEMPTS.
     *
     * @param array $context Read context with keys countCycles, dataBeforeIo and isStreamDataEmpty
     * @param int   $currentAttempts Current attempts counter
     *
     * @return int
     */
    private function resolveReadAttempts(array $context, $currentAttempts)
    {
        // Explicit comparison instead of empty(): the peeked byte may be "0", which
        // empty() treats as "no data" and which would wrongly consume an attempt.
        $hadNoDataBeforeIo = !isset($context['dataBeforeIo']) ||
                             $context['dataBeforeIo'] === '' ||
                             $context['dataBeforeIo'] === false;

        $isFruitlessCycle = ($context['countCycles'] === 1 && $hadNoDataBeforeIo) ||
                            ($context['countCycles'] > 1 && $context['isStreamDataEmpty']);

        return $isFruitlessCycle ? $currentAttempts - 1 : self::READ_ATTEMPTS;
    }
210 1
211
    /**
     * Write out-of-band data
     *
     * Sends the payload one byte per stream_socket_sendto() call with the STREAM_OOB flag,
     * validating every call and stopping at the first byte that was not accepted.
     *
     * @param resource $socket Socket resource
     * @param string   $data Data to write
     *
     * @return int Amount of written bytes
     */
    private function writeOobData($socket, $data)
    {
        $total  = strlen($data);
        $sent   = 0;
        $offset = 0;

        while ($offset < $total) {
            $byte    = $data[$offset];
            $written = stream_socket_sendto($socket, $byte, STREAM_OOB);
            $this->verifySendResult($byte, $written);

            if ($written === 0) {
                break;
            }

            $sent += $written;
            $offset++;
        }

        return $sent;
    }
236
237
    /**
     * Return the remote address of the socket, or null when it cannot be obtained
     *
     * @return string|null
     */
    private function resolveRemoteAddress()
    {
        $name = stream_socket_get_name($this->socket->getStreamResource(), true);
        if ($name === false) {
            return null;
        }

        return $name;
    }
248
}
249