1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ButterAMQP\IO; |
4
|
|
|
|
5
|
|
|
use ButterAMQP\Exception\IOClosedException; |
6
|
|
|
use ButterAMQP\Exception\IOException; |
7
|
|
|
use ButterAMQP\IOInterface; |
8
|
|
|
use ButterAMQP\Debug\ReadableBinaryData; |
9
|
|
|
use Psr\Log\LoggerAwareInterface; |
10
|
|
|
use Psr\Log\LoggerAwareTrait; |
11
|
|
|
use Psr\Log\NullLogger; |
12
|
|
|
|
13
|
|
|
class StreamIO implements IOInterface, LoggerAwareInterface |
14
|
|
|
{ |
15
|
|
|
use LoggerAwareTrait; |
16
|
|
|
|
17
|
|
|
/** |
18
|
|
|
* @var resource|null |
19
|
|
|
*/ |
20
|
|
|
private $stream; |
21
|
|
|
|
22
|
|
|
/** |
23
|
|
|
* @var string |
24
|
|
|
*/ |
25
|
|
|
private $buffer; |
26
|
|
|
|
27
|
|
|
/** |
28
|
|
|
* @var int |
29
|
|
|
*/ |
30
|
|
|
private $readAheadSize; |
31
|
|
|
|
32
|
|
|
/** |
33
|
|
|
* Initialize default logger. |
34
|
|
|
*/ |
35
|
25 |
|
public function __construct() |
36
|
|
|
{ |
37
|
25 |
|
$this->logger = new NullLogger(); |
38
|
25 |
|
} |
39
|
|
|
|
40
|
|
|
/** |
41
|
|
|
* {@inheritdoc} |
42
|
|
|
*/ |
43
|
25 |
|
public function open($protocol, $host, $port, array $parameters = []) |
44
|
|
|
{ |
45
|
25 |
|
if ($this->stream && $this->isOpen()) { |
46
|
|
|
return $this; |
47
|
|
|
} |
48
|
|
|
|
49
|
25 |
|
$context = $this->createStreamContext($parameters); |
50
|
|
|
|
51
|
25 |
|
$this->stream = @stream_socket_client( |
52
|
25 |
|
sprintf('%s://%s:%d', $protocol, $host, $port), |
53
|
25 |
|
$errno, |
54
|
25 |
|
$errstr, |
55
|
25 |
|
isset($parameters['connection_timeout']) ? $parameters['connection_timeout'] : 30, |
56
|
25 |
|
STREAM_CLIENT_CONNECT, |
57
|
|
|
$context |
58
|
25 |
|
); |
59
|
|
|
|
60
|
25 |
|
if (!$this->stream) { |
61
|
|
|
throw new IOException(sprintf( |
62
|
|
|
'Unable to connect to "%s:%d" using stream socket: %s', |
63
|
|
|
$host, |
64
|
|
|
$port, |
65
|
|
|
$errstr |
66
|
|
|
)); |
67
|
|
|
} |
68
|
|
|
|
69
|
25 |
|
$this->buffer = ''; |
70
|
|
|
|
71
|
25 |
|
if (isset($parameters['timeout'])) { |
72
|
7 |
|
list($sec, $usec) = explode('|', number_format($parameters['timeout'], 6, '|', '')); |
73
|
7 |
|
stream_set_timeout($this->stream, $sec, $usec); |
74
|
7 |
|
} |
75
|
|
|
|
76
|
25 |
|
if (isset($parameters['read_ahead'])) { |
77
|
|
|
$this->readAheadSize = $parameters['read_ahead']; |
78
|
|
|
} |
79
|
|
|
|
80
|
25 |
|
return $this; |
81
|
|
|
} |
82
|
|
|
|
83
|
|
|
/** |
84
|
|
|
* @param array $parameters |
85
|
|
|
* |
86
|
|
|
* @return resource |
87
|
|
|
*/ |
88
|
25 |
|
private function createStreamContext(array $parameters) |
89
|
|
|
{ |
90
|
25 |
|
$context = stream_context_create(); |
91
|
|
|
|
92
|
25 |
|
if (isset($parameters['certfile'])) { |
93
|
1 |
|
stream_context_set_option($context, 'ssl', 'local_cert', $parameters['certfile']); |
94
|
1 |
|
} |
95
|
|
|
|
96
|
25 |
|
if (isset($parameters['keyfile'])) { |
97
|
|
|
stream_context_set_option($context, 'ssl', 'local_pk', $parameters['keyfile']); |
98
|
|
|
} |
99
|
|
|
|
100
|
25 |
|
if (isset($parameters['cacertfile'])) { |
101
|
|
|
stream_context_set_option($context, 'ssl', 'cafile', $parameters['cacertfile']); |
102
|
|
|
} |
103
|
|
|
|
104
|
25 |
|
if (isset($parameters['passphrase'])) { |
105
|
|
|
stream_context_set_option($context, 'ssl', 'passphrase', $parameters['passphrase']); |
106
|
|
|
} |
107
|
|
|
|
108
|
25 |
|
if (isset($parameters['verify'])) { |
109
|
1 |
|
stream_context_set_option($context, 'ssl', 'verify_peer', (bool) $parameters['verify']); |
110
|
1 |
|
} |
111
|
|
|
|
112
|
25 |
|
if (isset($parameters['allow_self_signed'])) { |
113
|
1 |
|
stream_context_set_option($context, 'ssl', 'allow_self_signed', (bool) $parameters['allow_self_signed']); |
114
|
1 |
|
} |
115
|
|
|
|
116
|
25 |
|
return $context; |
117
|
|
|
} |
118
|
|
|
|
119
|
|
|
/** |
120
|
|
|
* @return $this |
121
|
|
|
*/ |
122
|
12 |
|
public function close() |
123
|
|
|
{ |
124
|
12 |
|
if (!$this->stream) { |
125
|
|
|
return $this; |
126
|
|
|
} |
127
|
|
|
|
128
|
12 |
|
fclose($this->stream); |
129
|
|
|
|
130
|
12 |
|
$this->stream = null; |
131
|
|
|
|
132
|
12 |
|
return $this; |
133
|
|
|
} |
134
|
|
|
|
135
|
|
|
/** |
136
|
|
|
* @param string $data |
137
|
|
|
* @param int|null $length |
138
|
|
|
* |
139
|
|
|
* @return $this |
140
|
|
|
* |
141
|
|
|
* @throws IOException |
142
|
|
|
*/ |
143
|
22 |
|
public function write($data, $length = null) |
144
|
|
|
{ |
145
|
22 |
|
if ($this->stream === null) { |
146
|
|
|
throw new IOClosedException('Connection is not open'); |
147
|
|
|
} |
148
|
|
|
|
149
|
22 |
|
if ($length === null) { |
150
|
22 |
|
$length = strlen($data); |
151
|
22 |
|
} |
152
|
|
|
|
153
|
22 |
|
$this->logger->debug(new ReadableBinaryData('Sending', $data)); |
154
|
|
|
|
155
|
22 |
|
while ($length > 0) { |
156
|
22 |
|
if ($this->isOpen()) { |
157
|
1 |
|
throw new IOClosedException('Connection is closed'); |
158
|
|
|
} |
159
|
|
|
|
160
|
22 |
|
$written = @fwrite($this->stream, $data, $length); |
161
|
22 |
|
if ($written === false) { |
162
|
|
|
throw new IOException('An error occur while writing to socket'); |
163
|
|
|
} |
164
|
|
|
|
165
|
22 |
|
$length -= $written; |
166
|
22 |
|
$data = $length ? substr($data, $written, $length) : ''; |
167
|
22 |
|
} |
168
|
|
|
|
169
|
22 |
|
return $this; |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
/** |
173
|
|
|
* {@inheritdoc} |
174
|
|
|
*/ |
175
|
23 |
|
public function peek($length, $blocking = true) |
176
|
|
|
{ |
177
|
23 |
|
$received = strlen($this->buffer); |
178
|
|
|
|
179
|
23 |
|
if ($received >= $length) { |
180
|
2 |
|
return $this->buffer; |
181
|
|
|
} |
182
|
|
|
|
183
|
23 |
|
$this->buffer .= $this->recv($length - $received, $blocking); |
184
|
|
|
|
185
|
23 |
|
if (strlen($this->buffer) >= $length) { |
186
|
22 |
|
return $this->buffer; |
187
|
|
|
} |
188
|
|
|
|
189
|
4 |
|
return null; |
190
|
|
|
} |
191
|
|
|
|
192
|
|
|
/** |
193
|
|
|
* {@inheritdoc} |
194
|
|
|
*/ |
195
|
22 |
View Code Duplication |
public function read($length, $blocking = true) |
|
|
|
|
196
|
|
|
{ |
197
|
22 |
|
if (!$this->peek($length, $blocking)) { |
|
|
|
|
198
|
3 |
|
return null; |
199
|
|
|
} |
200
|
|
|
|
201
|
21 |
|
$data = substr($this->buffer, 0, $length); |
202
|
|
|
|
203
|
21 |
|
$this->buffer = substr($this->buffer, $length, strlen($this->buffer) - $length); |
204
|
|
|
|
205
|
21 |
|
return $data; |
206
|
|
|
} |
207
|
|
|
|
208
|
|
|
/** |
209
|
|
|
* @param int $length |
210
|
|
|
* @param bool $blocking |
211
|
|
|
* |
212
|
|
|
* @return string |
213
|
|
|
* |
214
|
|
|
* @throws IOException |
215
|
|
|
*/ |
216
|
23 |
|
private function recv($length, $blocking) |
217
|
|
|
{ |
218
|
23 |
|
if ($this->stream === null) { |
219
|
|
|
throw new IOClosedException('Connection is not open'); |
220
|
|
|
} |
221
|
|
|
|
222
|
23 |
|
if ($this->isOpen()) { |
223
|
1 |
|
throw new IOClosedException('Connection is closed'); |
224
|
|
|
} |
225
|
|
|
|
226
|
23 |
|
if ($this->readAheadSize) { |
227
|
|
|
$meta = stream_get_meta_data($this->stream); |
228
|
|
|
|
229
|
|
|
if ($length < $meta['unread_bytes']) { |
230
|
|
|
$length = min($this->readAheadSize, $meta['unread_bytes']); |
231
|
|
|
} |
232
|
|
|
} |
233
|
|
|
|
234
|
23 |
|
stream_set_blocking($this->stream, $blocking); |
235
|
|
|
|
236
|
23 |
|
if (($received = fread($this->stream, $length)) === false) { |
237
|
|
|
throw new IOException('An error occur while reading from the socket'); |
238
|
|
|
} |
239
|
|
|
|
240
|
23 |
|
return $received; |
241
|
|
|
} |
242
|
|
|
|
243
|
|
|
/** |
244
|
|
|
* @return bool |
245
|
|
|
*/ |
246
|
25 |
|
public function isOpen() |
247
|
|
|
{ |
248
|
25 |
|
return is_resource($this->stream) && feof($this->stream); |
249
|
|
|
} |
250
|
|
|
} |
251
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.