Completed
Branch 0.4-dev (bb28f2)
by Evgenij
02:53
created

StreamedClientIo::canReachFrame()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 2
eloc 2
nc 2
nop 0
crap 2
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\Socket\Io;
11
12
use AsyncSockets\Exception\DisconnectException;
13
use AsyncSockets\Exception\RecvDataException;
14
use AsyncSockets\Exception\SendDataException;
15
use AsyncSockets\Frame\FramePickerInterface;
16
17
/**
18
 * Class StreamedClientIo
19
 */
20
class StreamedClientIo extends AbstractClientIo
21
{
22
    /**
23
     * Read attempts count
24
     */
25
    const READ_ATTEMPTS = 2;
26
27
    /**
28
     * Amount of read attempts
29
     *
30
     * @var int
31
     */
32
    private $readAttempts = self::READ_ATTEMPTS;
33
34
    /**
35
     * Remote socket address
36
     *
37
     * @var string
38
     */
39
    private $remoteAddress;
40
41
    /** {@inheritdoc} */
42 26
    protected function readRawDataIntoPicker(FramePickerInterface $picker, $isOutOfBand)
43
    {
44 26
        return $isOutOfBand ? $this->readOobData($picker) : $this->readRegularData($picker);
45
    }
46
47
    /** {@inheritdoc} */
48 5
    protected function writeRawData($data, $isOutOfBand)
49
    {
50 5
        $resource = $this->socket->getStreamResource();
51 5
        $test     = stream_socket_sendto($resource, '');
52 5
        if ($test !== 0) {
53 1
            throw new SendDataException(
54 1
                $this->socket,
55 1
                trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
56 1
            );
57
        }
58
59
        $written = $isOutOfBand ?
60 4
            $this->writeOobData($resource, $data) :
61 4
            fwrite($resource, $data, strlen($data));
62
63 4
        if ($written === false) {
64 1
            throw new SendDataException(
65 1
                $this->socket,
66 1
                trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
67 1
            );
68
        }
69
70 3
        if ($written === 0 && !$this->isConnected()) {
71 1
            throw new DisconnectException($this->socket, 'Remote connection has been lost.');
72
        }
73
74 2
        return $written;
75
    }
76
77
    /** {@inheritdoc} */
78 37
    protected function isConnected()
79
    {
80 37
        return $this->resolveRemoteAddress() !== false;
81
    }
82
83
    /** {@inheritdoc} */
84 25
    protected function getRemoteAddress()
85
    {
86 25
        if ($this->remoteAddress === null) {
87 25
            $this->remoteAddress = $this->resolveRemoteAddress();
88 25
        }
89
90 25
        return $this->remoteAddress;
91
    }
92
93
    /** {@inheritdoc} */
94 2
    protected function canReachFrame()
95
    {
96 2
        return $this->readAttempts > 0 && $this->isConnected();
97
    }
98
99
    /**
100
     * Read OOB data from socket
101
     *
102
     * @param FramePickerInterface $picker
103
     *
104
     * @return string
105
     */
106 1
    private function readOobData(FramePickerInterface $picker)
107
    {
108 1
        $data = stream_socket_recvfrom(
109 1
            $this->socket->getStreamResource(),
110 1
            self::SOCKET_BUFFER_SIZE,
111
            STREAM_OOB
112 1
        );
113
114 1
        return $picker->pickUpData($data, $this->getRemoteAddress());
115
    }
116
117
    /**
118
     * Read regular data
119
     *
120
     * @param FramePickerInterface $picker Picker to read data into
121
     *
122
     * @return string
123
     */
124 25
    private function readRegularData(FramePickerInterface $picker)
125
    {
126
        // work-around https://bugs.php.net/bug.php?id=52602
127 25
        $resource         = $this->socket->getStreamResource();
128
        $readContext      = [
129 25
            'countCycles'       => 0,
130 25
            'dataBeforeIo'      => $this->getDataInSocket(),
131 25
            'isStreamDataEmpty' => false,
132 25
        ];
133
134
        do {
135 25
            $data = fread($resource, self::SOCKET_BUFFER_SIZE);
136 25
            if ($data === false) {
137 1
                throw new RecvDataException(
138 1
                    $this->socket,
139 1
                    trim('Failed to read data. ' . $this->getLastPhpErrorMessage())
140 1
                );
141
            }
142
143 24
            $isDataEmpty = $data === '';
144 24
            $result      = $picker->pickUpData($data, $this->getRemoteAddress());
145
146 24
            $readContext['countCycles']      += 1;
147 24
            $readContext['isStreamDataEmpty'] = $this->isReadDataActuallyEmpty($data);
148 24
            $this->readAttempts               = $this->resolveReadAttempts($readContext, $this->readAttempts);
149 24
        } while (!$picker->isEof() && !$isDataEmpty);
150
151 24
        return $result;
152
    }
153
154
    /**
155
     * Return first byte from socket buffer
156
     *
157
     * @return string
158
     */
159 25
    private function getDataInSocket()
160
    {
161 25
        return stream_socket_recvfrom($this->socket->getStreamResource(), 1, STREAM_PEEK);
162
    }
163
164
    /**
165
     * Checks whether data read from stream buffer can be filled later
166
     *
167
     * @param string $data Read data
168
     *
169
     * @return bool
170
     */
171 24
    private function isReadDataActuallyEmpty($data)
172
    {
173 24
        $result = false;
174 24
        if ($data === '') {
175 5
            $dataInSocket = $this->getDataInSocket();
176 5
            $result       = $dataInSocket === '' || $dataInSocket === false;
177 5
        }
178
179 24
        return $result;
180
    }
181
182
    /**
183
     * Calculate attempts value
184
     *
185
     * @param array $context Read context
186
     * @param int   $currentAttempts Current attempts counter
187
     *
188
     * @return int
189
     */
190 24
    private function resolveReadAttempts(array $context, $currentAttempts)
191
    {
192 24
        return ($context['countCycles'] === 1 && empty($context['dataBeforeIo'])) ||
193 11
               ($context['countCycles'] > 1   && $context['isStreamDataEmpty']) ?
194 24
            $currentAttempts - 1 :
195 24
            self::READ_ATTEMPTS;
196
197
    }
198
199
    /**
200
     * Write out-of-band data
201
     *
202
     * @param resource $socket Socket resource
203
     * @param string   $data Data to write
204
     *
205
     * @return int Amount of written bytes
206
     */
207 1
    private function writeOobData($socket, $data)
208
    {
209 1
        $result     = 0;
210 1
        $dataLength = strlen($data);
211 1
        for ($i = 0; $i < $dataLength; $i++) {
212 1
            $written = stream_socket_sendto($socket, $data[$i], STREAM_OOB);
213 1
            if ($written < 0) {
214
                throw new SendDataException(
215
                    $this->socket,
216
                    trim('Failed to send data. ' . $this->getLastPhpErrorMessage())
217
                );
218
            }
219
220 1
            if ($written === 0) {
221
                break;
222
            }
223
224 1
            $result += $written;
225 1
        }
226
227 1
        return $result;
228
    }
229
230
    /**
231
     * Return remote address if we connected or false otherwise
232
     *
233
     * @return string|bool
234
     */
235 37
    private function resolveRemoteAddress()
236
    {
237 37
        return stream_socket_get_name($this->socket->getStreamResource(), true);
238
    }
239
}
240