Completed
Push — master ( e17409...9d6c1e )
by Sergey
04:06
created

StreamIO::read()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 6

Duplication

Lines 12
Ratio 100 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 12
loc 12
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 2
crap 2
1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
use ButterAMQP\Debug\ReadableBinaryData;
9
use Psr\Log\LoggerAwareInterface;
10
use Psr\Log\LoggerAwareTrait;
11
use Psr\Log\NullLogger;
12
13
class StreamIO implements IOInterface, LoggerAwareInterface
14
{
15
    use LoggerAwareTrait;
16
17
    /**
18
     * @var resource|null
19
     */
20
    private $stream;
21
22
    /**
23
     * @var string
24
     */
25
    private $buffer;
26
27
    /**
28
     * @var int
29
     */
30
    private $readAheadSize;
31
32
    /**
33
     * Initialize default logger.
34
     */
35 25
    public function __construct()
36
    {
37 25
        $this->logger = new NullLogger();
38 25
    }
39
40
    /**
41
     * {@inheritdoc}
42
     */
43 25
    public function open($protocol, $host, $port, array $parameters = [])
44
    {
45 25
        if ($this->stream && $this->isOpen()) {
46
            return $this;
47
        }
48
49 25
        $context = $this->createStreamContext($parameters);
50
51 25
        $this->stream = @stream_socket_client(
52 25
            sprintf('%s://%s:%d', $protocol, $host, $port),
53 25
            $errno,
54 25
            $errstr,
55 25
            isset($parameters['connection_timeout']) ? $parameters['connection_timeout'] : 30,
56 25
            STREAM_CLIENT_CONNECT,
57
            $context
58 25
        );
59
60 25
        if (!$this->stream) {
61
            throw new IOException(sprintf(
62
                'Unable to connect to "%s:%d" using stream socket: %s',
63
                $host,
64
                $port,
65
                $errstr
66
            ));
67
        }
68
69 25
        $this->buffer = '';
70
71 25
        if (isset($parameters['timeout'])) {
72 7
            list($sec, $usec) = explode('|', number_format($parameters['timeout'], 6, '|', ''));
73 7
            stream_set_timeout($this->stream, $sec, $usec);
74 7
        }
75
76 25
        if (isset($parameters['read_ahead'])) {
77
            $this->readAheadSize = $parameters['read_ahead'];
78
        }
79
80 25
        return $this;
81
    }
82
83
    /**
84
     * @param array $parameters
85
     *
86
     * @return resource
87
     */
88 25
    private function createStreamContext(array $parameters)
89
    {
90 25
        $context = stream_context_create();
91
92 25
        if (isset($parameters['certfile'])) {
93 1
            stream_context_set_option($context, 'ssl', 'local_cert', $parameters['certfile']);
94 1
        }
95
96 25
        if (isset($parameters['keyfile'])) {
97
            stream_context_set_option($context, 'ssl', 'local_pk', $parameters['keyfile']);
98
        }
99
100 25
        if (isset($parameters['cacertfile'])) {
101
            stream_context_set_option($context, 'ssl', 'cafile', $parameters['cacertfile']);
102
        }
103
104 25
        if (isset($parameters['passphrase'])) {
105
            stream_context_set_option($context, 'ssl', 'passphrase', $parameters['passphrase']);
106
        }
107
108 25
        if (isset($parameters['verify'])) {
109 1
            stream_context_set_option($context, 'ssl', 'verify_peer', (bool) $parameters['verify']);
110 1
        }
111
112 25
        if (isset($parameters['allow_self_signed'])) {
113 1
            stream_context_set_option($context, 'ssl', 'allow_self_signed', (bool) $parameters['allow_self_signed']);
114 1
        }
115
116 25
        return $context;
117
    }
118
119
    /**
120
     * @return $this
121
     */
122 12
    public function close()
123
    {
124 12
        if (!$this->stream) {
125
            return $this;
126
        }
127
128 12
        fclose($this->stream);
129
130 12
        $this->stream = null;
131
132 12
        return $this;
133
    }
134
135
    /**
136
     * @param string   $data
137
     * @param int|null $length
138
     *
139
     * @return $this
140
     *
141
     * @throws IOException
142
     */
143 22
    public function write($data, $length = null)
144
    {
145 22
        if ($this->stream === null) {
146
            throw new IOClosedException('Connection is not open');
147
        }
148
149 22
        if ($length === null) {
150 22
            $length = strlen($data);
151 22
        }
152
153 22
        $this->logger->debug(new ReadableBinaryData('Sending', $data));
154
155 22
        while ($length > 0) {
156 22
            if ($this->isOpen()) {
157 1
                throw new IOClosedException('Connection is closed');
158
            }
159
160 22
            $written = @fwrite($this->stream, $data, $length);
161 22
            if ($written === false) {
162
                throw new IOException('An error occur while writing to socket');
163
            }
164
165 22
            $length -= $written;
166 22
            $data = $length ? substr($data, $written, $length) : '';
167 22
        }
168
169 22
        return $this;
170
    }
171
172
    /**
173
     * {@inheritdoc}
174
     */
175 23
    public function peek($length, $blocking = true)
176
    {
177 23
        $received = strlen($this->buffer);
178
179 23
        if ($received >= $length) {
180 2
            return $this->buffer;
181
        }
182
183 23
        $this->buffer .= $this->recv($length - $received, $blocking);
184
185 23
        if (strlen($this->buffer) >= $length) {
186 22
            return $this->buffer;
187
        }
188
189 4
        return null;
190
    }
191
192
    /**
193
     * {@inheritdoc}
194
     */
195 22 View Code Duplication
    public function read($length, $blocking = true)
1 ignored issue
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
196
    {
197 22
        if (!$this->peek($length, $blocking)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->peek($length, $blocking) of type string|null is loosely compared to false; this is ambiguous if the string can be empty. You might want to explicitly use === null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
198 3
            return null;
199
        }
200
201 21
        $data = substr($this->buffer, 0, $length);
202
203 21
        $this->buffer = substr($this->buffer, $length, strlen($this->buffer) - $length);
204
205 21
        return $data;
206
    }
207
208
    /**
209
     * @param int  $length
210
     * @param bool $blocking
211
     *
212
     * @return string
213
     *
214
     * @throws IOException
215
     */
216 23
    private function recv($length, $blocking)
217
    {
218 23
        if ($this->stream === null) {
219
            throw new IOClosedException('Connection is not open');
220
        }
221
222 23
        if ($this->isOpen()) {
223 1
            throw new IOClosedException('Connection is closed');
224
        }
225
226 23
        if ($this->readAheadSize) {
227
            $meta = stream_get_meta_data($this->stream);
228
229
            if ($length < $meta['unread_bytes']) {
230
                $length = min($this->readAheadSize, $meta['unread_bytes']);
231
            }
232
        }
233
234 23
        stream_set_blocking($this->stream, $blocking);
235
236 23
        if (($received = fread($this->stream, $length)) === false) {
237
            throw new IOException('An error occur while reading from the socket');
238
        }
239
240 23
        return $received;
241
    }
242
243
    /**
244
     * @return bool
245
     */
246 25
    public function isOpen()
247
    {
248 25
        return is_resource($this->stream) && feof($this->stream);
249
    }
250
}
251