Completed
Push — master ( dc4ce9...cc0810 )
by Sergey
05:48
created

StreamIO   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 214
Duplicated Lines 5.61 %

Coupling/Cohesion

Components 1
Dependencies 2

Test Coverage

Coverage 93.67%

Importance

Changes 0
Metric Value
wmc 28
lcom 1
cbo 2
dl 12
loc 214
ccs 74
cts 79
cp 0.9367
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
B open() 0 25 3
A setReadingTimeout() 0 5 1
A setReadAheadSize() 0 4 1
A createStreamContext() 0 23 3
A close() 0 12 2
B write() 0 22 6
A peek() 0 16 3
A read() 12 12 2
B recv() 0 22 5
A isOpen() 0 4 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
9
class StreamIO implements IOInterface
10
{
11
    /**
     * Socket stream resource assigned by open(); reset to null by close().
     *
     * @var resource|null
     */
    private $stream;

    /**
     * Bytes already received from the stream but not yet handed out by read().
     *
     * Starts as an empty string so its length can be measured safely even
     * before open() has been called.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * Cap on opportunistic read-ahead in recv(); a falsy value disables read-ahead.
     *
     * @var int
     */
    private $readAheadSize;
25
26
    /**
     * {@inheritdoc}
     *
     * Recognised $parameters keys (defaults in parentheses): "connection_timeout" (30),
     * "timeout" (1) and "read_ahead" (13000), plus the SSL keys understood by
     * createStreamContext(). Calling this while the connection is already open
     * does nothing and returns immediately.
     *
     * @throws IOException when the socket cannot be established
     */
    public function open($protocol, $host, $port, array $parameters = [])
    {
        if ($this->isOpen()) {
            return $this;
        }

        $parameters = array_merge(['connection_timeout' => 30, 'read_ahead' => 13000, 'timeout' => 1], $parameters);

        $remote = sprintf('%s://%s:%d', $protocol, $host, $port);
        $timeout = $parameters['connection_timeout'];
        $context = $this->createStreamContext($parameters);

        // Connect into a local variable first: stream_socket_client() returns false on
        // failure, and that value must never end up in the resource|null property.
        $stream = @stream_socket_client($remote, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, $context);

        if (!$stream) {
            $this->stream = null;

            throw new IOException(sprintf('An error occur while connecting to "%s:%d": %s', $host, $port, $errstr));
        }

        $this->stream = $stream;
        $this->buffer = '';

        $this->setReadingTimeout($parameters['timeout']);
        $this->setReadAheadSize($parameters['read_ahead']);

        return $this;
    }
54
55
    /**
     * Applies a read timeout to the current stream.
     *
     * @param float $timeout timeout in seconds; the fractional part is applied with microsecond precision
     */
    private function setReadingTimeout($timeout)
    {
        // number_format() rounds to six decimals and carries into the whole part,
        // so the microsecond component can never reach 1000000.
        list($seconds, $microseconds) = explode('|', number_format($timeout, 6, '|', ''));

        // stream_set_timeout() takes integers; cast explicitly instead of relying on
        // implicit numeric-string coercion.
        stream_set_timeout($this->stream, (int) $seconds, (int) $microseconds);
    }
63
64
    /**
     * Stores the read-ahead limit that recv() consults on every read.
     *
     * @param int $size maximum number of bytes to read ahead; a falsy value turns read-ahead off
     */
    private function setReadAheadSize($size)
    {
        $this->readAheadSize = $size;
    }
71
72
    /**
     * Builds a stream context, translating the connection parameters that have
     * an SSL counterpart into "ssl" context options. Unknown parameters are ignored.
     *
     * @param array $parameters connection parameters keyed by name
     *
     * @return resource stream context
     */
    private function createStreamContext(array $parameters)
    {
        // Connection parameter name => PHP "ssl" context option name.
        static $sslOptionNames = [
            'certfile' => 'local_cert',
            'keyfile' => 'local_pk',
            'cacertfile' => 'cafile',
            'passphrase' => 'passphrase',
            'verify' => 'verify_peer',
            'allow_self_signed' => 'allow_self_signed',
        ];

        $context = stream_context_create();

        // Keep only the parameters that map onto an SSL option.
        $sslParameters = array_intersect_key($parameters, $sslOptionNames);

        foreach ($sslParameters as $name => $value) {
            stream_context_set_option($context, 'ssl', $sslOptionNames[$name], $value);
        }

        return $context;
    }
100
101
    /**
     * Closes the stream if one is held and forgets it; does nothing otherwise.
     *
     * @return $this
     */
    public function close()
    {
        if ($this->stream) {
            fclose($this->stream);

            $this->stream = null;
        }

        return $this;
    }
116
117
    /**
     * Writes data to the stream, repeating partial writes until everything is sent.
     *
     * @param string   $data   bytes to send
     * @param int|null $length number of leading bytes of $data to send; null sends all of it.
     *                         Values larger than the size of $data are clamped to that size.
     *
     * @return $this
     *
     * @throws IOClosedException when the connection is not open
     * @throws IOException       when the underlying write fails
     */
    public function write($data, $length = null)
    {
        $available = strlen($data);

        // Never ask for more bytes than $data holds: otherwise the counter below could
        // not reach zero and the loop would spin forever on zero-byte writes.
        $length = $length === null ? $available : min($length, $available);

        while ($length > 0) {
            if (!$this->isOpen()) {
                throw new IOClosedException('Connection is closed');
            }

            $written = @fwrite($this->stream, $data, $length);
            if ($written === false) {
                throw new IOException('An error occur while writing to socket');
            }

            // Drop what was sent and keep only the part still pending.
            $length -= $written;
            $data = $length ? substr($data, $written, $length) : '';
        }

        return $this;
    }
147
148
    /**
     * {@inheritdoc}
     *
     * Makes at most one receive attempt when the buffer is too short. On success
     * the whole internal buffer is returned (it may hold more than $length bytes)
     * and nothing is consumed; null is returned when fewer than $length bytes
     * are buffered after that attempt.
     */
    public function peek($length, $blocking = true)
    {
        $missing = $length - strlen($this->buffer);

        if ($missing > 0) {
            $this->buffer .= $this->recv($missing, $blocking);
        }

        return strlen($this->buffer) >= $length ? $this->buffer : null;
    }
167
168
    /**
     * {@inheritdoc}
     *
     * Returns exactly $length bytes taken from the front of the internal buffer,
     * or null when peek() could not make that many bytes available.
     */
    public function read($length, $blocking = true)
    {
        if ($this->peek($length, $blocking) === null) {
            return null;
        }

        // On PHP < 8 substr() yields false rather than '' when nothing is left to
        // return; the casts keep both the result and the buffer real strings.
        $data = (string) substr($this->buffer, 0, $length);
        $this->buffer = (string) substr($this->buffer, $length);

        return $data;
    }
183
184
    /**
     * Performs a single read from the stream.
     *
     * When read-ahead is enabled and the stream reports more unread bytes than
     * requested, the read is enlarged up to the read-ahead limit, but never
     * reduced below the requested length.
     *
     * @param int  $length   number of bytes wanted
     * @param bool $blocking whether the stream is switched to blocking mode for this read
     *
     * @return string the bytes received, possibly fewer than requested
     *
     * @throws IOClosedException when the connection is not open
     * @throws IOException       when the underlying read fails
     */
    private function recv($length, $blocking)
    {
        if (!$this->isOpen()) {
            throw new IOClosedException('Connection is closed');
        }

        if ($this->readAheadSize) {
            $meta = stream_get_meta_data($this->stream);

            if ($length < $meta['unread_bytes']) {
                // Read ahead, capped by the configured limit. max() keeps a read-ahead
                // limit smaller than the request from shrinking the read.
                $length = max($length, min($this->readAheadSize, $meta['unread_bytes']));
            }
        }

        stream_set_blocking($this->stream, $blocking);

        if (($received = fread($this->stream, $length)) === false) {
            throw new IOException('An error occur while reading from the socket');
        }

        return $received;
    }
214
215
    /**
     * Tells whether a usable stream is held: it must be a live resource that
     * has not reached end-of-file.
     *
     * @return bool
     */
    public function isOpen()
    {
        if (!is_resource($this->stream)) {
            return false;
        }

        return !feof($this->stream);
    }
222
}
223