Passed
Pull Request — master (#34)
by Marc
05:44
created

Socket::write()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 28
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 9.5384

Importance

Changes 0
Metric Value
dl 0
loc 28
rs 8.439
c 0
b 0
f 0
ccs 7
cts 13
cp 0.5385
cc 6
eloc 18
nc 5
nop 1
crap 9.5384
1
<?php
2
declare(strict_types = 1);
3
4
namespace TBolier\RethinkQL\Connection\Socket;
5
6
use Psr\Http\Message\StreamInterface;
7
use TBolier\RethinkQL\Connection\OptionsInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
10
/**
 * PSR-7 stream backed by a client socket opened with stream_socket_client().
 *
 * The stream is not seekable and reports no size. Calling read(-1) consumes
 * bytes up to the next NUL byte (or the end of the stream) instead of a fixed
 * number of bytes.
 */
class Socket implements StreamInterface
{
    /**
     * Underlying socket resource; null once the stream has been closed.
     *
     * @var resource|null
     */
    private $stream;

    /**
     * Bytes still outstanding for the most recent read()/write() call.
     * It is 0 once that call has transferred everything it was asked to.
     *
     * @var int
     */
    private $tellPos = 0;

    /**
     * Set by a read(-1) call when it stops at a terminator; reset by getContents().
     *
     * @var bool
     */
    private $nullTerminated = false;

    /**
     * Opens an "ssl://" or "tcp://" socket to the hostname and port taken from the options.
     *
     * @param OptionsInterface $options
     * @throws Exception When the socket cannot be opened.
     */
    public function __construct(OptionsInterface $options)
    {
        $scheme = $options->isSsl() ? 'ssl' : 'tcp';

        $this->openStream(
            $scheme.'://'.$options->getHostname().':'.$options->getPort(),
            $options->getTimeout(),
            $options->getTimeoutStream()
        );
    }

    /**
     * Connects to the remote socket and applies the read/write timeout.
     *
     * @param string $remote_socket Address in "scheme://host:port" form.
     * @param float $timeout Connect timeout passed to stream_socket_client().
     * @param int $timeoutStream Timeout in seconds passed to stream_set_timeout().
     * @throws Exception When the connection cannot be established.
     */
    private function openStream(string $remote_socket, float $timeout, int $timeoutStream): void
    {
        // Pre-initialise the by-reference outputs so they are always defined.
        $errno = 0;
        $errstr = '';

        $stream = stream_socket_client(
            $remote_socket,
            $errno,
            $errstr,
            $timeout,
            STREAM_CLIENT_CONNECT
        );

        if (!$stream) {
            // Keep the original message as a prefix and append the reported
            // error so that connection failures can actually be diagnosed.
            $message = 'Failed to create a socket stream.';
            if ((string) $errstr !== '') {
                $message .= sprintf(' %s (error %d).', $errstr, $errno);
            }

            throw new Exception($message);
        }

        $this->stream = $stream;
        stream_set_timeout($this->stream, $timeoutStream);
    }

    /**
     * Returns the next null-terminated message, or an empty string on any failure.
     *
     * @return string
     */
    public function __toString()
    {
        try {
            return $this->getContents();
        } catch (\Throwable $e) {
            // PSR-7 forbids __toString() from raising, so errors are swallowed here on purpose.
            return '';
        }
    }

    /**
     * Closes the socket. Calling it again on a closed stream is a no-op.
     *
     * @return void
     */
    public function close()
    {
        if (\is_resource($this->stream)) {
            fclose($this->stream);
        }

        $this->stream = null;
    }

    /**
     * Closes the socket and always returns null.
     *
     * Note: the underlying resource is closed rather than handed back to the caller.
     *
     * @return null
     */
    public function detach()
    {
        $this->close();

        return null;
    }

    /**
     * The size of a socket stream is unknown.
     *
     * @return null
     */
    public function getSize()
    {
        return null;
    }

    /**
     * Returns the internal byte counter maintained by read() and write().
     *
     * Note: this is the number of bytes still outstanding for the last
     * operation, not an absolute offset within the stream.
     *
     * @return int
     */
    public function tell()
    {
        return $this->tellPos;
    }

    /**
     * Reports whether the stream is closed or feof() is true for the socket.
     *
     * @return bool
     */
    public function eof()
    {
        return !\is_resource($this->stream) || feof($this->stream);
    }

    /**
     * Socket streams cannot be repositioned.
     *
     * @return bool Always false.
     */
    public function isSeekable()
    {
        return false;
    }

    /**
     * Always fails: a socket stream cannot seek.
     *
     * @param int $offset Unused.
     * @param int $whence Unused.
     * @throws Exception Always.
     */
    public function seek($offset, $whence = SEEK_SET)
    {
        throw new Exception('Cannot seek a socket stream');
    }

    /**
     * Always fails, because it delegates to seek().
     *
     * @throws Exception Always.
     */
    public function rewind()
    {
        $this->seek(0);
    }

    /**
     * Returns whether or not the stream is writable.
     *
     * @return bool True while the underlying socket resource is open.
     */
    public function isWritable()
    {
        return \is_resource($this->stream);
    }

    /**
     * Writes the whole string to the socket, retrying after partial writes.
     *
     * @param string $string The string that is to be written.
     * @return int Returns the number of bytes written to the stream.
     * @throws \RuntimeException When the stream has already been closed.
     * @throws Exception When a write fails or times out; the socket is closed first.
     */
    public function write($string)
    {
        if (!$this->isWritable()) {
            throw new \RuntimeException('The stream is not writable.');
        }

        $writeLength = \strlen($string);
        $this->tellPos = $writeLength;

        $bytesWritten = 0;
        while ($bytesWritten < $writeLength) {
            $result = fwrite($this->stream, substr($string, $bytesWritten));

            if ($result === false || $result === 0) {
                // Read the timeout flag BEFORE detaching: detach() drops the
                // resource, after which its metadata can no longer be queried.
                $timedOut = (bool) $this->getMetadata('timed_out');
                $this->detach();

                if ($timedOut) {
                    throw new Exception(
                        'Timed out while writing to socket. Disconnected. '
                        . 'Call setTimeout(seconds) on the connection to change '
                        . 'the timeout.'
                    );
                }

                throw new Exception('Unable to write to socket. Disconnected.');
            }

            $bytesWritten += $result;
            // Subtract only what this call wrote, not the running total.
            $this->tellPos -= $result;
        }

        return $bytesWritten;
    }

    /**
     * Returns whether or not the stream is readable.
     *
     * @return bool True while the underlying socket resource is open.
     */
    public function isReadable()
    {
        return \is_resource($this->stream);
    }

    /**
     * Reads from the socket.
     *
     * With a length of -1 the bytes up to the next NUL byte (or the end of the
     * stream) are returned. Otherwise up to $length bytes are returned; the
     * result is shorter only when the stream ends first.
     *
     * @param int $length Number of bytes to read, or -1 for a null-terminated read.
     * @return string
     * @throws \RuntimeException When the stream has already been closed.
     * @throws Exception When the read fails or times out.
     */
    public function read($length)
    {
        if (!$this->isReadable()) {
            throw new \RuntimeException('The stream is not readable.');
        }

        $this->tellPos = 0;

        if ($length === -1) {
            return $this->readUntilNull();
        }

        $s = '';
        while (\strlen($s) < $length) {
            $chunk = $this->getContent($length - \strlen($s));

            if ($chunk === '') {
                // No data and no timeout means the stream has ended; stop
                // instead of spinning forever on a closed connection.
                break;
            }

            $s .= $chunk;
        }

        $this->tellPos = max(0, $length - \strlen($s));

        return $s;
    }

    /**
     * Reads one byte at a time until a NUL terminator or the end of the stream.
     *
     * @return string The bytes read, without the terminator.
     * @throws Exception When the read fails or times out.
     */
    private function readUntilNull(): string
    {
        $nul = \chr(0);
        $s = '';

        while (true) {
            $char = $this->getContent(1);

            // Leading NUL bytes carry no payload; skip them.
            if ($s === '' && $char === $nul) {
                continue;
            }

            // A NUL byte after data, or an empty read, ends the message.
            if ($char === '' || $char === $nul) {
                $this->nullTerminated = true;
                break;
            }

            $s .= $char;
        }

        return $s;
    }

    /**
     * Reads up to $length bytes from the socket in a single call.
     *
     * @param int $length Maximum number of bytes to read.
     * @return string The bytes read; may be empty at the end of the stream.
     * @throws Exception When the read fails or times out.
     */
    private function getContent($length): string
    {
        $string = stream_get_contents($this->stream, $length);

        // The timeout flag is only inspected when the read failed or feof() is
        // true; per the PHP manual feof() is also true after a socket timeout.
        if (($string === false || $this->eof()) && $this->getMetadata('timed_out')) {
            throw new Exception(
                'Timed out while reading from socket. Disconnected. '
                . 'Call setTimeout(seconds) on the connection to change '
                . 'the timeout.'
            );
        }

        if ($string === false) {
            // Fail with the package exception instead of violating the string return type.
            throw new Exception('Unable to read from socket.');
        }

        return $string;
    }

    /**
     * Returns the next null-terminated message from the socket.
     *
     * Returns an empty string when the stream is closed or already at its end.
     *
     * @return string
     * @throws Exception When the read fails or times out.
     */
    public function getContents()
    {
        $result = '';
        while (!$this->eof() && !$this->nullTerminated) {
            $result .= $this->read(-1);
        }

        // Re-arm for the next message.
        $this->nullTerminated = false;

        return $result;
    }

    /**
     * Returns the socket metadata reported by stream_get_meta_data().
     *
     * @param string|null $key Metadata key to look up, or null for everything.
     * @return array|mixed|null The full metadata array when no key is given
     *                          (empty once the stream is closed); otherwise the
     *                          value for the key, or null when it is not present.
     */
    public function getMetadata($key = null)
    {
        if (!\is_resource($this->stream)) {
            return $key === null ? [] : null;
        }

        $meta = stream_get_meta_data($this->stream);

        if ($key === null) {
            return $meta;
        }

        return $meta[$key] ?? null;
    }
}
284