StreamIO   A
last analyzed

Complexity

Total Complexity 24

Size/Duplication

Total Lines 195
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Test Coverage

Coverage 91.89%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 24
lcom 1
cbo 2
dl 0
loc 195
ccs 68
cts 74
cp 0.9189
rs 10
c 1
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A openConnection() 0 20 3
A createStreamContext() 0 23 3
A tuneConnection() 0 13 2
A isOpen() 0 4 2
A open() 0 17 2
A close() 0 12 2
B write() 0 18 5
B read() 0 22 5
1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
9
class StreamIO implements IOInterface
10
{
11
    const DEFAULT_CONNECTION_TIMEOUT = 30;
12
    const DEFAULT_READING_TIMEOUT = 1;
13
14
    /**
15
     * @var resource|null
16
     */
17
    private $stream;
18
19
    /**
20
     * @var int
21
     */
22
    private $timeoutSec;
23
24
    /**
25
     * @var int
26
     */
27
    private $timeoutUsec;
28
29
    /**
30
     * {@inheritdoc}
31
     */
32 23
    public function open($protocol, $host, $port, array $parameters = [])
33
    {
34 23
        if ($this->isOpen()) {
35
            return $this;
36
        }
37
38 23
        $this->stream = $this->openConnection(
39 23
            $protocol,
40 23
            $host,
41 23
            $port,
42
            $parameters
43 23
        );
44
45 23
        $this->tuneConnection($parameters);
46
47 23
        return $this;
48
    }
49
50
    /**
51
     * @param string $protocol
52
     * @param string $host
53
     * @param int    $port
54
     * @param array  $parameters
55
     *
56
     * @return resource
57
     *
58
     * @throws IOException
59
     */
60 23
    private function openConnection($protocol, $host, $port, array $parameters = [])
61
    {
62 23
        $timeout = isset($parameters['connection_timeout']) ?
63 23
            $parameters['connection_timeout'] : self::DEFAULT_CONNECTION_TIMEOUT;
64
65 23
        $stream = @stream_socket_client(
66 23
            sprintf('%s://%s:%d', $protocol, $host, $port),
67 23
            $errno,
68 23
            $errstr,
69 23
            $timeout,
70 23
            STREAM_CLIENT_CONNECT,
71 23
            $this->createStreamContext($parameters)
72 23
        );
73
74 23
        if (!$stream) {
75
            throw new IOException(sprintf('An error occur while connecting to "%s:%d": %s', $host, $port, $errstr));
76
        }
77
78 23
        return $stream;
79
    }
80
81
    /**
82
     * @param array $parameters
83
     *
84
     * @return resource
85
     */
86 23
    private function createStreamContext(array $parameters)
87
    {
88
        static $options = [
89
            'certfile' => 'local_cert',
90
            'keyfile' => 'local_pk',
91
            'cacertfile' => 'cafile',
92
            'passphrase' => 'passphrase',
93
            'verify' => 'verify_peer',
94
            'allow_self_signed' => 'allow_self_signed',
95 23
        ];
96
97 23
        $context = stream_context_create();
98
99 23
        foreach ($parameters as $name => $value) {
100 5
            if (!isset($options[$name])) {
101 5
                continue;
102
            }
103
104 1
            stream_context_set_option($context, 'ssl', $options[$name], $value);
105 23
        }
106
107 23
        return $context;
108
    }
109
110
    /**
111
     * @param array $parameters
112
     */
113 23
    private function tuneConnection(array $parameters)
114
    {
115 23
        $timeout = isset($parameters['timeout']) ? $parameters['timeout'] : self::DEFAULT_READING_TIMEOUT;
116
117 23
        list($sec, $usec) = explode('|', number_format($timeout, 6, '|', ''));
118
119 23
        $this->timeoutSec = $sec;
120 23
        $this->timeoutUsec = $usec;
121
122 23
        stream_set_timeout($this->stream, $this->timeoutSec, $this->timeoutUsec);
123 23
        stream_set_read_buffer($this->stream, 0);
124 23
        stream_set_write_buffer($this->stream, 0);
125 23
    }
126
127
    /**
128
     * @return $this
129
     */
130 22
    public function close()
131
    {
132 22
        if (!$this->stream) {
133
            return $this;
134
        }
135
136 22
        fclose($this->stream);
137
138 22
        $this->stream = null;
139
140 22
        return $this;
141
    }
142
143
    /**
144
     * @param string   $data
145
     * @param int|null $length
146
     *
147
     * @return $this
148
     *
149
     * @throws IOException
150
     */
151 21
    public function write($data, $length = null)
152
    {
153 21
        if ($length === null) {
154 21
            $length = strlen($data);
155 21
        }
156
157 21
        while ($length > 0) {
158 21
            $written = @fwrite($this->stream, $data, $length);
159 21
            if (!$written) {
160
                throw new IOException('An error occur while writing to socket');
161
            }
162
163 21
            $length -= $written;
164 21
            $data = $length ? substr($data, $written, $length) : '';
165 21
        }
166
167 21
        return $this;
168
    }
169
170
    /**
171
     * {@inheritdoc}
172
     */
173 22
    public function read($length, $blocking = true)
174
    {
175 22
        stream_set_blocking($this->stream, $blocking);
176
177 22
        if (!$this->isOpen()) {
178 1
            throw new IOClosedException('Socket is closed or was not open');
179
        }
180
181 22
        $r = [$this->stream];
182 22
        $w = null;
183 22
        $e = null;
184
185 22
        if ($blocking && @stream_select($r, $w, $e, $this->timeoutSec, $this->timeoutUsec) === false) {
186
            throw new IOException('An error occur while selecting stream');
187
        }
188
189 22
        if (($received = fread($this->stream, $length)) === false) {
190
            throw new IOException('An error occur while reading from the socket');
191
        }
192
193 22
        return $received;
194
    }
195
196
    /**
197
     * @return bool
198
     */
199 23
    public function isOpen()
200
    {
201 23
        return is_resource($this->stream) && !feof($this->stream);
202
    }
203
}
204