Completed
Branch 0.4-dev (d01bc8)
by Evgenij
02:48
created

StreamedClientIo::readRegularData()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 23
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 23
ccs 12
cts 12
cp 1
rs 9.0856
cc 3
eloc 16
nc 1
nop 1
crap 3
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\Socket\Io;
11
12
use AsyncSockets\Frame\FramePickerInterface;
13
14
/**
15
 * Class StreamedClientIo
16
 */
17
class StreamedClientIo extends AbstractClientIo
18
{
19
    /**
20
     * Read attempts count
21
     */
22
    const READ_ATTEMPTS = 2;
23
24
    /**
25
     * Amount of read attempts
26
     *
27
     * @var int
28
     */
29
    private $readAttempts = self::READ_ATTEMPTS;
30
31
    /**
32
     * Remote socket address
33
     *
34
     * @var string
35
     */
36
    private $remoteAddress;
37
38
    /** {@inheritdoc} */
39 25
    protected function readRawDataIntoPicker(FramePickerInterface $picker, $isOutOfBand)
40
    {
41
        return $isOutOfBand ? $this->readOobData($picker) : $this->readRegularData($picker);
42 25
    }
43
44 25
    /**
45 25
     * Read OOB data from socket
46 25
     *
47 25
     * @param FramePickerInterface $picker
48
     *
49
     * @return string
50 25
     */
51 25
    private function readOobData(FramePickerInterface $picker)
52 24
    {
53 24
        $data = stream_socket_recvfrom(
54
            $this->socket->getStreamResource(),
55 24
            self::SOCKET_BUFFER_SIZE,
56 24
            STREAM_OOB
57 24
        );
58 24
59
        return $picker->pickUpData($data, $this->getRemoteAddress());
60 24
    }
61
62
    /**
63
     * Read regular data
64
     *
65
     * @param FramePickerInterface $picker Picker to read data into
66
     *
67
     * @return string
68
     */
69
    private function readRegularData(FramePickerInterface $picker)
70
    {
71 24
        // work-around https://bugs.php.net/bug.php?id=52602
72
        $resource         = $this->socket->getStreamResource();
73 24
        $readContext      = [
74 11
            'countCycles'       => 0,
75 24
            'dataBeforeIo'      => $this->getDataInSocket(),
76 24
            'isStreamDataEmpty' => false,
77
        ];
78
79
        do {
80
            $data = fread($resource, self::SOCKET_BUFFER_SIZE);
81 5
            $this->throwNetworkSocketExceptionIf($data === false, 'Failed to read data.', true);
82
            $isDataEmpty = $data === '';
83 5
            $result      = $picker->pickUpData($data, $this->getRemoteAddress());
84 5
85 5
            $readContext['countCycles']      += 1;
86
            $readContext['isStreamDataEmpty'] = $this->isReadDataActuallyEmpty($data);
87
            $this->readAttempts               = $this->resolveReadAttempts($readContext, $this->readAttempts);
88 4
        } while (!$picker->isEof() && !$isDataEmpty);
89 4
90
        return $result;
91 4
    }
92
93 3
    /**
94 2
     * Calculate attempts value
95 1
     *
96
     * @param array $context Read context
97 2
     * @param int   $currentAttempts Current attempts counter
98
     *
99
     * @return int
100
     */
101 34
    private function resolveReadAttempts(array $context, $currentAttempts)
102
    {
103 34
        return ($context['countCycles'] === 1 && empty($context['dataBeforeIo'])) ||
104
               ($context['countCycles'] > 1   && $context['isStreamDataEmpty']) ?
105
            $currentAttempts - 1 :
106
            self::READ_ATTEMPTS;
107 24
108
    }
109 24
110 24
    /** {@inheritdoc} */
111 24
    protected function writeRawData($data, $isOutOfBand)
112
    {
113 24
        $resource = $this->socket->getStreamResource();
114
        $test     = stream_socket_sendto($resource, '');
115
        $this->throwNetworkSocketExceptionIf($test !== 0, 'Failed to send data.', true);
116
117
        $written = $isOutOfBand ?
118
            $this->writeOobData($resource, $data) :
119
            fwrite($resource, $data, strlen($data));
120
121
        $this->throwNetworkSocketExceptionIf($written === false, 'Failed to send data.', true);
122
123 24
        if ($written === 0) {
124
            $this->throwExceptionIfNotConnected('Remote connection has been lost.');
125 24
        }
126 24
127 5
        return $written;
128 5
    }
129 5
130
    /** {@inheritdoc} */
131 24
    protected function isConnected()
132
    {
133
        return $this->resolveRemoteAddress() !== false;
134
    }
135 2
136
    /** {@inheritdoc} */
137 2
    protected function getRemoteAddress()
138
    {
139
        if ($this->remoteAddress === null) {
140
            $this->remoteAddress = $this->resolveRemoteAddress();
141
        }
142
143
        return $this->remoteAddress;
144
    }
145 25
146
    /**
147 25
     * Checks whether data read from stream buffer can be filled later
148
     *
149
     * @param string $data Read data
150
     *
151
     * @return bool
152
     */
153
    private function isReadDataActuallyEmpty($data)
154
    {
155 34
        $result = false;
156
        if ($data === '') {
157 34
            $dataInSocket = $this->getDataInSocket();
158
            $result       = $dataInSocket === '' || $dataInSocket === false;
159
        }
160
161
        return $result;
162
    }
163
164
    /** {@inheritdoc} */
165
    protected function canReachFrame()
166
    {
167
        return $this->readAttempts > 0 && $this->isConnected();
168 1
    }
169
170 1
    /**
171 1
     * Return first byte from socket buffer
172 1
     *
173 1
     * @return string
174 1
     */
175 1
    private function getDataInSocket()
176
    {
177
        return stream_socket_recvfrom($this->socket->getStreamResource(), 1, STREAM_PEEK);
178
    }
179 1
180 1
    /**
181
     * Return remote address if we connected or false otherwise
182 1
     *
183
     * @return string|bool
184
     */
185
    private function resolveRemoteAddress()
186
    {
187
        return stream_socket_get_name($this->socket->getStreamResource(), true);
188
    }
189
190
    /**
191
     * Write out-of-band data
192
     *
193
     * @param resource $socket Socket resource
194
     * @param string   $data Data to write
195
     *
196
     * @return int Amount of written bytes
197
     */
198
    private function writeOobData($socket, $data)
199
    {
200
        $result     = 0;
201
        $dataLength = strlen($data);
202
        for ($i = 0; $i < $dataLength; $i++) {
203
            $written = stream_socket_sendto($socket, $data[$i], STREAM_OOB);
204
            $this->throwNetworkSocketExceptionIf($written < 0, 'Failed to send data.', true);
205
            if ($written === 0) {
206
                break;
207
            }
208
209
            $result += $written;
210
        }
211
212
        return $result;
213
    }
214
}
215