Completed
Push — master ( 035469...dc4ce9 )
by Sergey
04:34
created

StreamIO::setReadAheadSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
9
class StreamIO implements IOInterface
10
{
11
    /**
12
     * @var resource|null
13
     */
14
    private $stream;
15
16
    /**
17
     * @var string
18
     */
19
    private $buffer;
20
21
    /**
22
     * @var int
23
     */
24
    private $readAheadSize;
25
26
    /**
27
     * {@inheritdoc}
28
     */
29 25
    public function open($protocol, $host, $port, array $parameters = [])
30
    {
31 25
        if ($this->isOpen()) {
32
            return $this;
33
        }
34
35 25
        $parameters = array_merge(['connection_timeout' => 30, 'read_ahead' => 13000, 'timeout' => 1], $parameters);
36
37 25
        $this->stream = @stream_socket_client(
38 25
            sprintf('%s://%s:%d', $protocol, $host, $port),
39 25
            $errno,
40 25
            $errstr,
41 25
            $parameters['connection_timeout'],
42 25
            STREAM_CLIENT_CONNECT,
43 25
            $this->createStreamContext($parameters)
44 25
        );
45
46 25
        if (!$this->stream) {
47
            throw new IOException(sprintf('An error occur while connecting to "%s:%d": %s', $host, $port, $errstr));
48
        }
49
50 25
        $this->buffer = '';
51
52 25
        $this->setReadingTimeout($parameters['timeout']);
53 25
        $this->setReadAheadSize($parameters['read_ahead']);
54
55 25
        return $this;
56
    }
57
58
    /**
59
     * @param float $timeout
60
     */
61 25
    private function setReadingTimeout($timeout)
62
    {
63 25
        list($sec, $usec) = explode('|', number_format($timeout, 6, '|', ''));
64 25
        stream_set_timeout($this->stream, $sec, $usec);
65 25
    }
66
67
    /**
68
     * @param int $size
69
     */
70 25
    private function setReadAheadSize($size)
71
    {
72 25
        $this->readAheadSize = $size;
73 25
    }
74
75
    /**
76
     * @param array $parameters
77
     *
78
     * @return resource
79
     */
80 25
    private function createStreamContext(array $parameters)
81
    {
82
        static $options = [
83
            'certfile' => 'local_cert',
84
            'keyfile' => 'local_pk',
85
            'cacertfile' => 'cafile',
86
            'passphrase' => 'passphrase',
87
            'verify' => 'verify_peer',
88
            'allow_self_signed' => 'allow_self_signed',
89 25
        ];
90
91 25
        $context = stream_context_create();
92
93 25
        foreach ($parameters as $name => $value) {
94 25
            if (!isset($options[$name])) {
95 25
                continue;
96
            }
97
98 1
            stream_context_set_option($context, 'ssl', $options[$name], $value);
99 25
        }
100
101 25
        return $context;
102
    }
103
104
    /**
105
     * @return $this
106
     */
107 23
    public function close()
108
    {
109 23
        if (!$this->stream) {
110
            return $this;
111
        }
112
113 23
        fclose($this->stream);
114
115 23
        $this->stream = null;
116
117 23
        return $this;
118
    }
119
120
    /**
121
     * @param string   $data
122
     * @param int|null $length
123
     *
124
     * @return $this
125
     *
126
     * @throws IOException
127
     */
128 22
    public function write($data, $length = null)
129
    {
130 22
        if ($length === null) {
131 22
            $length = strlen($data);
132 22
        }
133
134 22
        while ($length > 0) {
135 22
            if (!$this->isOpen()) {
136 1
                throw new IOClosedException('Connection is closed');
137
            }
138
139 22
            $written = @fwrite($this->stream, $data, $length);
140 22
            if ($written === false) {
141
                throw new IOException('An error occur while writing to socket');
142
            }
143
144 22
            $length -= $written;
145 22
            $data = $length ? substr($data, $written, $length) : '';
146 22
        }
147
148 22
        return $this;
149
    }
150
151
    /**
152
     * {@inheritdoc}
153
     */
154 23
    public function peek($length, $blocking = true)
155
    {
156 23
        $received = strlen($this->buffer);
157
158 23
        if ($received >= $length) {
159 12
            return $this->buffer;
160
        }
161
162 23
        $this->buffer .= $this->recv($length - $received, $blocking);
163
164 23
        if (strlen($this->buffer) >= $length) {
165 22
            return $this->buffer;
166
        }
167
168 4
        return null;
169
    }
170
171
    /**
172
     * {@inheritdoc}
173
     */
174 22 View Code Duplication
    public function read($length, $blocking = true)
175
    {
176 22
        if ($this->peek($length, $blocking) === null) {
177 3
            return null;
178
        }
179
180 21
        $data = substr($this->buffer, 0, $length);
181
182 21
        $this->buffer = substr($this->buffer, $length, strlen($this->buffer) - $length);
183
184 21
        return $data;
185
    }
186
187
    /**
188
     * @param int  $length
189
     * @param bool $blocking
190
     *
191
     * @return string
192
     *
193
     * @throws IOException
194
     */
195 23
    private function recv($length, $blocking)
196
    {
197 23
        if (!$this->isOpen()) {
198 1
            throw new IOClosedException('Connection is closed');
199
        }
200
201 23
        if ($this->readAheadSize) {
202 23
            $meta = stream_get_meta_data($this->stream);
203
204 23
            if ($length < $meta['unread_bytes']) {
205 11
                $length = min($this->readAheadSize, $meta['unread_bytes']);
206 11
            }
207 23
        }
208
209 23
        stream_set_blocking($this->stream, $blocking);
210
211 23
        if (($received = fread($this->stream, $length)) === false) {
212
            throw new IOException('An error occur while reading from the socket');
213
        }
214
215 23
        return $received;
216
    }
217
218
    /**
219
     * @return bool
220
     */
221 25
    public function isOpen()
222
    {
223 25
        return is_resource($this->stream) && !feof($this->stream);
224
    }
225
}
226