Completed
Push — master ( b2e4f3...d16f6a )
by Sergey
08:59
created

StreamIO::read()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 22
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5.1158

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 10
cts 12
cp 0.8333
rs 8.6737
c 0
b 0
f 0
cc 5
eloc 12
nc 4
nop 2
crap 5.1158
1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
9
/**
 * IOInterface implementation backed by a PHP stream socket.
 *
 * The connection is created with stream_socket_client(). Reads optionally wait
 * for data through stream_select() using the configured read timeout, and
 * writes loop until the requested number of bytes was handed to the stream.
 */
class StreamIO implements IOInterface
{
    /**
     * Connection timeout, in seconds, used when "connection_timeout" is not given.
     */
    const DEFAULT_CONNECTION_TIMEOUT = 30;

    /**
     * Read timeout, in seconds, used when "timeout" is not given.
     */
    const DEFAULT_READING_TIMEOUT = 1;

    /**
     * Underlying stream, or null when no connection is held.
     *
     * @var resource|null
     */
    private $stream;

    /**
     * Whole-second part of the read timeout.
     *
     * @var int
     */
    private $timeoutSec;

    /**
     * Microsecond part of the read timeout.
     *
     * @var int
     */
    private $timeoutUsec;

    /**
     * Opens the connection unless this IO is already open.
     *
     * {@inheritdoc}
     *
     * @param string $protocol   Stream transport, used as the scheme of the socket address
     * @param string $host
     * @param int    $port
     * @param array  $parameters Recognised keys: "connection_timeout", "timeout" and the
     *                           SSL-related keys listed in createStreamContext()
     *
     * @return $this
     *
     * @throws IOException When the connection can not be established
     */
    public function open($protocol, $host, $port, array $parameters = [])
    {
        if ($this->isOpen()) {
            return $this;
        }

        $this->stream = $this->openConnection(
            $protocol,
            $host,
            $port,
            $parameters
        );

        $this->tuneConnection($parameters);

        return $this;
    }

    /**
     * Creates the client socket stream.
     *
     * @param string $protocol
     * @param string $host
     * @param int    $port
     * @param array  $parameters
     *
     * @return resource
     *
     * @throws IOException When stream_socket_client() fails
     */
    private function openConnection($protocol, $host, $port, array $parameters = [])
    {
        $timeout = isset($parameters['connection_timeout']) ?
            $parameters['connection_timeout'] : self::DEFAULT_CONNECTION_TIMEOUT;

        // Warnings are suppressed on purpose: a failure is reported through the
        // exception below, built from the error string filled in by the call.
        $stream = @stream_socket_client(
            sprintf('%s://%s:%d', $protocol, $host, $port),
            $errno,
            $errstr,
            $timeout,
            STREAM_CLIENT_CONNECT,
            $this->createStreamContext($parameters)
        );

        if (!$stream) {
            throw new IOException(sprintf('An error occur while connecting to "%s:%d": %s', $host, $port, $errstr));
        }

        return $stream;
    }

    /**
     * Builds a stream context, copying the recognised parameters into "ssl" context options.
     *
     * Parameters whose name is not in the map below are ignored here.
     *
     * @param array $parameters
     *
     * @return resource
     */
    private function createStreamContext(array $parameters)
    {
        // Parameter name => name of the "ssl" context option it is stored under.
        static $options = [
            'certfile' => 'local_cert',
            'keyfile' => 'local_pk',
            'cacertfile' => 'cafile',
            'passphrase' => 'passphrase',
            'verify' => 'verify_peer',
            'allow_self_signed' => 'allow_self_signed',
        ];

        $context = stream_context_create();

        foreach ($parameters as $name => $value) {
            if (!isset($options[$name])) {
                continue;
            }

            stream_context_set_option($context, 'ssl', $options[$name], $value);
        }

        return $context;
    }

    /**
     * Applies the read timeout to the stream and disables its read and write buffers.
     *
     * @param array $parameters
     */
    private function tuneConnection(array $parameters)
    {
        $timeout = isset($parameters['timeout']) ? $parameters['timeout'] : self::DEFAULT_READING_TIMEOUT;

        // Render the timeout with exactly six decimals and split on the decimal
        // separator: the left part is whole seconds, the right part microseconds.
        list($sec, $usec) = explode('|', number_format($timeout, 6, '|', ''));

        // explode() yields strings; store real integers as the properties declare.
        $this->timeoutSec = (int) $sec;
        $this->timeoutUsec = (int) $usec;

        stream_set_timeout($this->stream, $this->timeoutSec, $this->timeoutUsec);
        stream_set_read_buffer($this->stream, 0);
        stream_set_write_buffer($this->stream, 0);
    }

    /**
     * Closes the stream if one is held; does nothing otherwise.
     *
     * @return $this
     */
    public function close()
    {
        if (!$this->stream) {
            return $this;
        }

        fclose($this->stream);

        $this->stream = null;

        return $this;
    }

    /**
     * Writes the first $length bytes of $data to the stream.
     *
     * @param string   $data
     * @param int|null $length Number of bytes to write; null, or a value larger than
     *                         the data itself, means "all of $data"
     *
     * @return $this
     *
     * @throws IOClosedException When the IO is not open while bytes are still pending
     * @throws IOException       When fwrite() reports a failure
     */
    public function write($data, $length = null)
    {
        $available = strlen($data);

        // Never ask for more bytes than $data holds: once the data ran out,
        // fwrite() would keep returning 0 and the loop below would never end.
        if ($length === null || $length > $available) {
            $length = $available;
        }

        while ($length > 0) {
            if (!$this->isOpen()) {
                throw new IOClosedException('Connection is closed');
            }

            $written = @fwrite($this->stream, $data, $length);
            if ($written === false) {
                throw new IOException('An error occur while writing to socket');
            }

            // A partial write only shrinks the remainder; the loop retries with the rest.
            $length -= $written;
            $data = $length ? substr($data, $written, $length) : '';
        }

        return $this;
    }

    /**
     * Reads up to $length bytes from the stream.
     *
     * {@inheritdoc}
     *
     * @param int  $length
     * @param bool $blocking When true, the stream is put in blocking mode and
     *                       stream_select() is used with the configured read
     *                       timeout before reading; when false, the stream is
     *                       put in non-blocking mode and read directly
     *
     * @return string Whatever fread() returned
     *
     * @throws IOClosedException When the IO is not open
     * @throws IOException       When selecting or reading fails
     */
    public function read($length, $blocking = true)
    {
        // Check the state before touching the stream: on an IO that was never
        // opened (or was closed) $this->stream is null and is not a valid
        // argument for any stream function.
        if (!$this->isOpen()) {
            throw new IOClosedException('Socket is closed or was not open');
        }

        stream_set_blocking($this->stream, $blocking);

        // stream_select() takes its arrays by reference, hence the variables.
        $r = [$this->stream];
        $w = null;
        $e = null;

        if ($blocking && @stream_select($r, $w, $e, $this->timeoutSec, $this->timeoutUsec) === false) {
            throw new IOException('An error occur while selecting stream');
        }

        if (($received = fread($this->stream, $length)) === false) {
            throw new IOException('An error occur while reading from the socket');
        }

        return $received;
    }

    /**
     * Tells whether a stream is held and has not reached end-of-file.
     *
     * @return bool
     */
    public function isOpen()
    {
        return is_resource($this->stream) && !feof($this->stream);
    }
}
208