Passed
Push — master ( a7780f...563711 )
by Sergey
03:41
created

StreamIO   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 231
Duplicated Lines 5.19 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 81.52%

Importance

Changes 0
Metric Value
wmc 33
lcom 1
cbo 6
dl 12
loc 231
ccs 75
cts 92
cp 0.8152
rs 9.3999
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
C open() 0 39 7
A createStreamContext() 0 23 3
A close() 0 12 2
C write() 0 28 7
A peek() 0 16 3
A read() 12 12 2
B recv() 0 26 6
A isOpen() 0 4 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace ButterAMQP\IO;
4
5
use ButterAMQP\Exception\IOClosedException;
6
use ButterAMQP\Exception\IOException;
7
use ButterAMQP\IOInterface;
8
use ButterAMQP\Debug\ReadableBinaryData;
9
use Psr\Log\LoggerAwareInterface;
10
use Psr\Log\LoggerAwareTrait;
11
use Psr\Log\NullLogger;
12
13
/**
 * IOInterface implementation backed by a PHP stream socket.
 *
 * Bytes received from the socket are accumulated in an internal buffer so
 * that peek() can expose pending data without consuming it and read() can
 * consume exactly the requested number of bytes.
 */
class StreamIO implements IOInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * Underlying socket stream; null until open() succeeds and after close().
     *
     * @var resource|null
     */
    private $stream;

    /**
     * Bytes received from the stream but not yet consumed by read().
     *
     * Initialised to an empty string so peek()/read() can safely measure it
     * even when open() has not been called yet.
     *
     * @var string
     */
    private $buffer = '';

    /**
     * Optional upper bound for opportunistic read-ahead in recv(); unset or
     * falsy disables read-ahead.
     *
     * @var int|null
     */
    private $readAheadSize;

    /**
     * Initialize default logger.
     */
    public function __construct()
    {
        $this->logger = new NullLogger();
    }

    /**
     * {@inheritdoc}
     *
     * Recognised $parameters keys: "connection_timeout" (seconds, default 30),
     * "timeout" (read/write timeout in seconds, fractions allowed),
     * "read_ahead" (read-ahead size in bytes) and the SSL keys handled by
     * createStreamContext().
     *
     * @throws IOException when the socket cannot be connected
     */
    public function open($protocol, $host, $port, array $parameters = [])
    {
        // Already connected and not at EOF: nothing to do.
        if ($this->isOpen()) {
            return $this;
        }

        // Release a stale handle (e.g. peer hung up) so it is not leaked
        // when the property is overwritten below. No-op when never opened.
        $this->close();

        $context = $this->createStreamContext($parameters);

        // Warnings are suppressed on purpose: failure is reported through
        // the exception below, using $errstr filled in by the call.
        $this->stream = @stream_socket_client(
            sprintf('%s://%s:%d', $protocol, $host, $port),
            $errno,
            $errstr,
            isset($parameters['connection_timeout']) ? $parameters['connection_timeout'] : 30,
            STREAM_CLIENT_CONNECT,
            $context
        );

        if (!$this->stream) {
            // Normalise the failed result so the "not open" checks elsewhere
            // (strict null comparisons) keep working.
            $this->stream = null;

            throw new IOException(sprintf(
                'Unable to connect to "%s:%d" using stream socket: %s',
                $host,
                $port,
                $errstr
            ));
        }

        $this->buffer = '';

        if (isset($parameters['timeout'])) {
            // Split a fractional timeout into whole seconds and microseconds,
            // e.g. 1.5 -> "1|500000".
            list($sec, $usec) = explode('|', number_format($parameters['timeout'], 6, '|', ''));
            stream_set_timeout($this->stream, (int) $sec, (int) $usec);
        }

        if (isset($parameters['read_ahead'])) {
            $this->readAheadSize = $parameters['read_ahead'];
        }

        return $this;
    }

    /**
     * Build a stream context, translating the connection parameters that have
     * an SSL context counterpart; all other parameters are ignored here.
     *
     * @param array $parameters
     *
     * @return resource
     */
    private function createStreamContext(array $parameters)
    {
        // Connection parameter name => "ssl" stream context option name.
        static $options = [
            'certfile' => 'local_cert',
            'keyfile' => 'local_pk',
            'cacertfile' => 'cafile',
            'passphrase' => 'passphrase',
            'verify' => 'verify_peer',
            'allow_self_signed' => 'allow_self_signed',
        ];

        $context = stream_context_create();

        foreach ($parameters as $name => $value) {
            if (!isset($options[$name])) {
                continue;
            }

            stream_context_set_option($context, 'ssl', $options[$name], $value);
        }

        return $context;
    }

    /**
     * Close the underlying stream, if any. Safe to call repeatedly.
     *
     * @return $this
     */
    public function close()
    {
        if (!$this->stream) {
            return $this;
        }

        fclose($this->stream);

        $this->stream = null;

        return $this;
    }

    /**
     * Write $data to the stream, looping until everything has been written.
     *
     * @param string   $data
     * @param int|null $length number of bytes to send; defaults to (and is
     *                         capped at) the byte length of $data
     *
     * @return $this
     *
     * @throws IOClosedException when the connection was never opened or has
     *                           been closed
     * @throws IOException       when writing to the socket fails
     */
    public function write($data, $length = null)
    {
        if ($this->stream === null) {
            throw new IOClosedException('Connection is not open');
        }

        // Never try to send more than $data holds: fwrite() stops at the end
        // of the string, so a larger $length would leave the loop below
        // spinning on an empty remainder forever.
        $available = strlen($data);
        if ($length === null || $length > $available) {
            $length = $available;
        }

        $this->logger->debug(new ReadableBinaryData('Sending', $data));

        while ($length > 0) {
            if (!$this->isOpen()) {
                throw new IOClosedException('Connection is closed');
            }

            $written = @fwrite($this->stream, $data, $length);
            if ($written === false) {
                throw new IOException('An error occur while writing to socket');
            }

            // Partial write: keep only the unsent tail for the next pass.
            $length -= $written;
            $data = $length ? substr($data, $written, $length) : '';
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the whole internal buffer (which may hold more than $length
     * bytes) once at least $length bytes are available, or null otherwise.
     * Nothing is consumed from the buffer.
     */
    public function peek($length, $blocking = true)
    {
        $received = strlen($this->buffer);

        if ($received >= $length) {
            return $this->buffer;
        }

        // Only ask the socket for the bytes that are still missing.
        $this->buffer .= $this->recv($length - $received, $blocking);

        if (strlen($this->buffer) >= $length) {
            return $this->buffer;
        }

        return null;
    }

    /**
     * {@inheritdoc}
     *
     * Returns exactly $length bytes and removes them from the internal
     * buffer, or null when that many bytes are not available yet.
     */
    public function read($length, $blocking = true)
    {
        if ($this->peek($length, $blocking) === null) {
            return null;
        }

        $data = substr($this->buffer, 0, $length);

        // Cast keeps the buffer a string on PHP versions where substr()
        // yields false for an empty remainder.
        $this->buffer = (string) substr($this->buffer, $length);

        return $data;
    }

    /**
     * Perform a single read from the stream.
     *
     * @param int  $length   number of bytes wanted by the caller
     * @param bool $blocking whether the stream is switched to blocking mode
     *                       for this read
     *
     * @return string bytes received; may be shorter than $length
     *
     * @throws IOClosedException when the connection was never opened or has
     *                           been closed
     * @throws IOException       when reading from the socket fails
     */
    private function recv($length, $blocking)
    {
        if ($this->stream === null) {
            throw new IOClosedException('Connection is not open');
        }

        if (!$this->isOpen()) {
            throw new IOClosedException('Connection is closed');
        }

        if ($this->readAheadSize) {
            $meta = stream_get_meta_data($this->stream);

            if ($length < $meta['unread_bytes']) {
                // Opportunistically pull more of what PHP already buffered,
                // bounded by the read-ahead size, but never ask for fewer
                // bytes than the caller requested.
                $length = max($length, min($this->readAheadSize, $meta['unread_bytes']));
            }
        }

        stream_set_blocking($this->stream, $blocking);

        if (($received = fread($this->stream, $length)) === false) {
            throw new IOException('An error occur while reading from the socket');
        }

        return $received;
    }

    /**
     * Tell whether there is a usable connection: a stream resource exists
     * and it has not reached end-of-file.
     *
     * @return bool
     */
    public function isOpen()
    {
        return is_resource($this->stream) && !feof($this->stream);
    }
}
244