Completed
Push — master ( a425b0...5acc2f )
by Sergey
03:50
created

StreamIO::tuneConnection()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 7
cts 7
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 1
crap 2
1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
9
class StreamIO implements IOInterface
10
{
11
    const DEFAULT_CONNECTION_TIMEOUT = 30;
12
    const DEFAULT_READING_TIMEOUT = 1;
13
14
    /**
15
     * @var resource|null
16
     */
17
    private $stream;
18
19
    /**
20
     * @var string
21
     */
22
    private $buffer;
23
24
    /**
     * {@inheritdoc}
     *
     * When the connection is already open nothing happens. Otherwise the read
     * buffer is emptied, a new stream is connected and then tuned using the
     * same $parameters.
     */
    public function open($protocol, $host, $port, array $parameters = [])
    {
        if (!$this->isOpen()) {
            $this->buffer = '';
            $this->stream = $this->openConnection($protocol, $host, $port, $parameters);

            $this->tuneConnection($parameters);
        }

        return $this;
    }
46
47
    /**
     * Connects a client stream socket to "$protocol://$host:$port".
     *
     * @param string $protocol
     * @param string $host
     * @param int    $port
     * @param array  $parameters "connection_timeout" overrides DEFAULT_CONNECTION_TIMEOUT;
     *                           the whole array is also passed to createStreamContext()
     *
     * @return resource
     *
     * @throws IOException when stream_socket_client() does not return a stream
     */
    private function openConnection($protocol, $host, $port, array $parameters = [])
    {
        $timeout = self::DEFAULT_CONNECTION_TIMEOUT;
        if (isset($parameters['connection_timeout'])) {
            $timeout = $parameters['connection_timeout'];
        }

        $address = sprintf('%s://%s:%d', $protocol, $host, $port);
        $context = $this->createStreamContext($parameters);

        // Warnings are suppressed on purpose: failure is reported through the exception below.
        $stream = @stream_socket_client($address, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, $context);

        if ($stream) {
            return $stream;
        }

        throw new IOException(sprintf('An error occur while connecting to "%s:%d": %s', $host, $port, $errstr));
    }
77
78
    /**
     * Builds a stream context, copying the recognised connection parameters
     * into the context's "ssl" options under their PHP option names.
     *
     * Parameters whose name is not listed in the map below are ignored.
     *
     * @param array $parameters
     *
     * @return resource
     */
    private function createStreamContext(array $parameters)
    {
        // Connection parameter name => "ssl" context option name.
        static $sslOptionNames = [
            'certfile' => 'local_cert',
            'keyfile' => 'local_pk',
            'cacertfile' => 'cafile',
            'passphrase' => 'passphrase',
            'verify' => 'verify_peer',
            'allow_self_signed' => 'allow_self_signed',
        ];

        $context = stream_context_create();

        foreach ($parameters as $key => $setting) {
            if (isset($sslOptionNames[$key])) {
                stream_context_set_option($context, 'ssl', $sslOptionNames[$key], $setting);
            }
        }

        return $context;
    }
106
107
    /**
     * Sets the read timeout on the current stream and sets its read and write
     * buffer sizes to 0.
     *
     * @param array $parameters "timeout" (seconds, fractions allowed) overrides
     *                          DEFAULT_READING_TIMEOUT
     */
    private function tuneConnection(array $parameters)
    {
        if (isset($parameters['timeout'])) {
            $readTimeout = $parameters['timeout'];
        } else {
            $readTimeout = self::DEFAULT_READING_TIMEOUT;
        }

        // Format with exactly six decimals and "|" as the decimal mark, so the text
        // splits cleanly into whole seconds and microseconds.
        $formatted = number_format($readTimeout, 6, '|', '');
        list($seconds, $microseconds) = explode('|', $formatted);

        stream_set_timeout($this->stream, $seconds, $microseconds);
        stream_set_read_buffer($this->stream, 0);
        stream_set_write_buffer($this->stream, 0);
    }
120
121
    /**
     * Closes the underlying stream, if there is one, and forgets it.
     *
     * @return $this
     */
    public function close()
    {
        if ($this->stream) {
            fclose($this->stream);

            $this->stream = null;
        }

        return $this;
    }
136
137
    /**
     * Writes $data to the stream, retrying until all requested bytes are sent.
     *
     * @param string   $data
     * @param int|null $length number of leading bytes of $data to send; null sends all of it.
     *                         Values larger than the size of $data are capped to that size.
     *
     * @return $this
     *
     * @throws IOClosedException when the connection is not open while bytes remain
     * @throws IOException       when fwrite() reports a failure
     */
    public function write($data, $length = null)
    {
        // Never ask for more bytes than $data holds: once $data is exhausted fwrite()
        // writes nothing, $length would stay positive and the loop could not finish.
        $available = strlen($data);
        if ($length === null || $length > $available) {
            $length = $available;
        }

        while ($length > 0) {
            if (!$this->isOpen()) {
                throw new IOClosedException('Connection is closed');
            }

            $written = @fwrite($this->stream, $data, $length);
            if ($written === false) {
                throw new IOException('An error occur while writing to socket');
            }

            // A short write is normal on sockets: drop what was sent and go again.
            $length -= $written;
            $data = $length ? substr($data, $written, $length) : '';
        }

        return $this;
    }
167
168
    /**
     * {@inheritdoc}
     *
     * Returns the whole internal buffer once it holds at least $length bytes,
     * receiving the missing bytes first when necessary; returns null when the
     * buffer is still too short after that attempt. Nothing is consumed.
     */
    public function peek($length, $blocking = true)
    {
        $buffered = strlen($this->buffer);

        if ($buffered < $length) {
            $this->recv($length - $buffered, $blocking);
        }

        return strlen($this->buffer) >= $length ? $this->buffer : null;
    }
187
188
    /**
     * {@inheritdoc}
     *
     * Returns the first $length bytes of the buffer and removes them from it,
     * or null when peek() could not make that many bytes available.
     */
    public function read($length, $blocking = true)
    {
        if ($this->peek($length, $blocking) === null) {
            return null;
        }

        // On PHP < 8 substr() yields false rather than '' when nothing is left;
        // the casts keep both the returned data and the buffer real strings.
        $data = (string) substr($this->buffer, 0, $length);

        $this->buffer = (string) substr($this->buffer, $length);

        return $data;
    }
203
204
    /**
     * Reads up to $length bytes from the stream and appends them to the buffer.
     *
     * Reading stops when $length bytes were received, when the stream reports a
     * timeout, or - in non-blocking mode - as soon as a read returns no data.
     * The method returns nothing; callers inspect the buffer afterwards.
     *
     * @param int  $length
     * @param bool $blocking
     *
     * @throws IOClosedException when the connection is not open
     * @throws IOException       when the blocking mode cannot be set or reading fails
     */
    private function recv($length, $blocking)
    {
        // A missing stream is reported as a closed connection by the loop below, so the
        // mode switch is only attempted - and its result checked - on a real resource.
        if (is_resource($this->stream) && !@stream_set_blocking($this->stream, $blocking)) {
            throw new IOException('Unable to switch the blocking mode of the socket');
        }

        $pending = $length;

        do {
            if (!$this->isOpen()) {
                throw new IOClosedException('Connection is closed');
            }

            if (($received = fread($this->stream, $pending)) === false) {
                throw new IOException('An error occur while reading from the socket');
            }

            $pending -= strlen($received);
            $this->buffer .= $received;

            // A non-blocking read that returns nothing means no data is available right
            // now; it never sets "timed_out", so without this exit the loop would spin.
            if (!$blocking && $received === '') {
                break;
            }

            $meta = stream_get_meta_data($this->stream);

            if ($meta['timed_out']) {
                break;
            }
        } while ($pending > 0);
    }
237
238
    /**
     * Tells whether a stream resource is held and has not reached end-of-file.
     *
     * @return bool
     */
    public function isOpen()
    {
        if (!is_resource($this->stream)) {
            return false;
        }

        return !feof($this->stream);
    }
245
}
246