Completed
Push — master ( 2b1385...7c6a84 )
by Thomas
07:21
created

StreamHandler   C

Complexity

Total Complexity 64

Size/Duplication

Total Lines 399
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 64
c 1
b 0
f 0
lcom 1
cbo 8
dl 0
loc 399
rs 5.8364

How to fix   Complexity   

Complex Class

Complex classes like StreamHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use StreamHandler, and based on these observations, apply Extract Interface, too.

1
<?php
2
namespace GuzzleHttp\Ring\Client;
3
4
use GuzzleHttp\Ring\Core;
5
use GuzzleHttp\Ring\Exception\ConnectException;
6
use GuzzleHttp\Ring\Exception\RingException;
7
use GuzzleHttp\Ring\Future\CompletedFutureArray;
8
use GuzzleHttp\Stream\InflateStream;
9
use GuzzleHttp\Stream\StreamInterface;
10
use GuzzleHttp\Stream\Stream;
11
use GuzzleHttp\Stream\Utils;
12
13
/**
14
 * RingPHP client handler that uses PHP's HTTP stream wrapper.
15
 */
16
class StreamHandler
17
{
18
    private $options;
19
    private $lastHeaders;
20
21
    /**
     * Stores the handler-wide options exactly as given.
     *
     * @param array $options Handler options; defaults to an empty array.
     */
    public function __construct(array $options = [])
    {
        $this->options = $options;
    }
25
26
    /**
     * Sends a Ring request through PHP's HTTP stream wrapper.
     *
     * Any RingException raised while opening the stream or building the
     * response is converted into an error response instead of propagating.
     *
     * @param array $request Ring request array.
     *
     * @return CompletedFutureArray
     */
    public function __invoke(array $request)
    {
        $effectiveUrl = Core::url($request);
        Core::doSleep($request);

        try {
            // The stream wrapper does not support the Expect header.
            $prepared = Core::removeHeader($request, 'Expect');

            return $this->createResponse(
                $prepared,
                $effectiveUrl,
                $this->createStream($effectiveUrl, $prepared)
            );
        } catch (RingException $failure) {
            return $this->createErrorResponse($effectiveUrl, $failure);
        }
    }
40
41
    /**
     * Builds the completed response from the headers captured by the last
     * stream open and from the opened body stream.
     *
     * @param array    $request Ring request; the "client" options
     *                          "decode_content", "stream" and "save_to"
     *                          are consulted.
     * @param string   $url     Effective URL reported in the response.
     * @param resource $stream  Opened response stream.
     *
     * @return CompletedFutureArray
     * @throws RingException when no response headers were captured.
     */
    private function createResponse(array $request, $url, $stream)
    {
        $hdrs = $this->lastHeaders;
        $this->lastHeaders = null;

        // Without a status line there is nothing to parse, and array_shift()
        // on a non-array would fail outright. Release the stream and report
        // the problem through the regular RingException error path.
        if (!is_array($hdrs) || !$hdrs) {
            if (is_resource($stream)) {
                fclose($stream);
            }
            throw new RingException('No response headers were received');
        }

        $parts = explode(' ', array_shift($hdrs), 3);
        $response = [
            'version'        => substr($parts[0], 5),
            // Guard the status the same way the reason phrase is guarded so
            // that a short status line does not raise an undefined offset.
            'status'         => isset($parts[1]) ? $parts[1] : null,
            'reason'         => isset($parts[2]) ? $parts[2] : null,
            'headers'        => Core::headersFromLines($hdrs),
            'effective_url'  => $url,
        ];

        $stream = $this->checkDecode($request, $response, $stream);

        // If not streaming, then drain the response into a stream.
        if (empty($request['client']['stream'])) {
            $dest = isset($request['client']['save_to'])
                ? $request['client']['save_to']
                : fopen('php://temp', 'r+');
            $stream = $this->drain($stream, $dest);
        }

        $response['body'] = $stream;

        return new CompletedFutureArray($response);
    }
68
69
    /**
     * Wraps the body in an inflating stream when the "decode_content" client
     * option is set and the response Content-Encoding is gzip or deflate.
     *
     * @param array $request  Ring request.
     * @param array $response Response array built so far (headers are read).
     * @param mixed $stream   Response body stream.
     *
     * @return mixed The original stream, or an InflateStream wrapping it.
     */
    private function checkDecode(array $request, array $response, $stream)
    {
        if (empty($request['client']['decode_content'])) {
            return $stream;
        }

        $encoding = Core::firstHeader($response, 'Content-Encoding', true);
        if ($encoding == 'gzip' || $encoding == 'deflate') {
            return new InflateStream(Stream::factory($stream));
        }

        return $stream;
    }
83
84
    /**
     * Copies the whole response body into the destination and returns the
     * destination rewound to its start. The source is closed afterwards.
     *
     * When both arguments are PHP resources the copy is done natively and
     * the destination resource itself is returned; otherwise both sides are
     * wrapped in stream objects first.
     *
     * @param resource|StreamInterface        $stream Source body.
     * @param string|resource|StreamInterface $dest   Destination; a string is
     *                                                opened as a path in "r+" mode.
     *
     * @return Stream|resource
     * @throws \RuntimeException when the save_to option is invalid.
     */
    private function drain($stream, $dest)
    {
        $sourceIsResource = is_resource($stream);

        // Resource-to-resource: let PHP copy the bytes directly.
        if ($sourceIsResource && is_resource($dest)) {
            stream_copy_to_stream($stream, $dest);
            fclose($stream);
            rewind($dest);

            return $dest;
        }

        if ($sourceIsResource) {
            $stream = Stream::factory($stream);
        }

        if (is_string($dest)) {
            $target = new Stream(Utils::open($dest, 'r+'));
        } else {
            $target = Stream::factory($dest);
        }

        Utils::copyToStream($stream, $target);
        $target->seek(0);
        $stream->close();

        return $target;
    }
117
118
    /**
     * Creates an error response for a failed request.
     *
     * If the exception message mentions "getaddrinfo" or "Connection refused"
     * the error is treated as a networking failure and wrapped in a
     * ConnectException (the original exception is kept as "previous").
     *
     * @param string        $url Effective URL of the failed request.
     * @param RingException $e   Exception raised while handling the request.
     *
     * @return CompletedFutureArray
     */
    private function createErrorResponse($url, RingException $e)
    {
        // Determine if the error was a networking error.
        $message = $e->getMessage();

        // This list can probably get more comprehensive.
        // strpos() returns 0 for a match at the very start of the message,
        // so compare against false explicitly instead of relying on
        // truthiness.
        if (strpos($message, 'getaddrinfo') !== false // DNS lookup failed
            || strpos($message, 'Connection refused') !== false
        ) {
            $e = new ConnectException($message, 0, $e);
        }

        return new CompletedFutureArray([
            'status'        => null,
            'body'          => null,
            'headers'       => [],
            'effective_url' => $url,
            'error'         => $e
        ]);
    }
146
147
    /**
     * Runs a callback that creates a resource, collecting any PHP errors it
     * triggers, and fails with a descriptive exception when the callback
     * returns a falsy value.
     *
     * @param callable $callback Callable that returns the resource.
     *
     * @return resource
     * @throws RingException when the callback does not return a resource;
     *                       the message lists every captured PHP error.
     */
    private function createResource(callable $callback)
    {
        // Start with an array so the loop below is safe even when the
        // callback fails without triggering any PHP error.
        $errors = [];
        set_error_handler(function ($_, $msg, $file, $line) use (&$errors) {
            $errors[] = [
                'message' => $msg,
                'file'    => $file,
                'line'    => $line
            ];
            return true;
        });

        // Always put the previous error handler back, even if the callback
        // throws; otherwise the temporary handler would stay installed.
        // (try/catch + rethrow instead of finally keeps PHP 5.4 support.)
        try {
            $resource = $callback();
        } catch (\Exception $thrown) {
            restore_error_handler();
            throw $thrown;
        }
        restore_error_handler();

        if (!$resource) {
            $message = 'Error creating resource: ';
            foreach ($errors as $err) {
                foreach ($err as $key => $value) {
                    $message .= "[$key] $value" . PHP_EOL;
                }
            }
            throw new RingException(trim($message));
        }

        return $resource;
    }
182
183
    /**
     * Prepares the request, applies every supported "client" option and
     * opens the HTTP stream.
     *
     * Client options are dispatched to the method named "add_<option>" when
     * this class declares one; unknown options are ignored.
     *
     * @param string $url     URL to open.
     * @param array  $request Ring request.
     *
     * @return resource
     */
    private function createStream($url, array $request)
    {
        // Method-name lookup table, built once and reused across calls.
        static $knownMethods;
        if (!$knownMethods) {
            $knownMethods = array_flip(get_class_methods(__CLASS__));
        }

        // HTTP/1.1 streams using the PHP stream wrapper require a
        // Connection: close header
        $isHttp11 = !isset($request['version']) || $request['version'] == '1.1';
        if ($isHttp11 && !Core::hasHeader($request, 'Connection')) {
            $request['headers']['Connection'] = ['close'];
        }

        // Ensure SSL is verified by default
        if (!isset($request['client']['verify'])) {
            $request['client']['verify'] = true;
        }

        $params = [];
        $options = $this->getDefaultOptions($request);

        if (isset($request['client'])) {
            foreach ($request['client'] as $optionName => $optionValue) {
                $handler = "add_{$optionName}";
                if (!isset($knownMethods[$handler])) {
                    continue;
                }
                $this->{$handler}($request, $options, $optionValue, $params);
            }
        }

        $context = $this->createContext($request, $options, $params);

        return $this->createStreamResource($url, $request, $options, $context);
    }
222
223
    /**
     * Builds the base "http" stream context options for a request.
     *
     * @param array $request Ring request; "http_method", "headers" and the
     *                       optional "version" keys are read.
     *
     * @return array Context options with a single "http" section.
     */
    private function getDefaultOptions(array $request)
    {
        // Flatten the header map into "Name: value" lines.
        $headerLines = '';
        foreach ($request['headers'] as $name => $values) {
            foreach ((array) $values as $single) {
                $headerLines .= "$name: $single\r\n";
            }
        }

        $http = [
            'method'           => $request['http_method'],
            'header'           => $headerLines,
            'protocol_version' => isset($request['version']) ? $request['version'] : 1.1,
            'ignore_errors'    => true,
            'follow_location'  => 0,
        ];

        $body = Core::body($request);
        if ($body !== null) {
            $http['content'] = $body;
            // Prevent the HTTP handler from adding a Content-Type header.
            if (!Core::hasHeader($request, 'Content-Type')) {
                $http['header'] .= "Content-Type:\r\n";
            }
        }

        // Drop the trailing line break left by the header lines.
        $http['header'] = rtrim($http['header']);

        return ['http' => $http];
    }
255
256
    /**
     * Handles the "proxy" client option.
     *
     * A non-array value is used as the proxy directly. An array is treated
     * as a map keyed by scheme; the entry for the request scheme ("http"
     * when the request has none) is used if present.
     *
     * @param array $request Ring request.
     * @param array $options Stream context options, modified in place.
     * @param mixed $value   Proxy setting.
     * @param array $params  Stream context params (unused).
     */
    private function add_proxy(array $request, &$options, $value, &$params)
    {
        if (is_array($value)) {
            $scheme = isset($request['scheme']) ? $request['scheme'] : 'http';
            if (isset($value[$scheme])) {
                $options['http']['proxy'] = $value[$scheme];
            }

            return;
        }

        $options['http']['proxy'] = $value;
    }
267
268
    /**
     * Handles the "timeout" client option by copying it to the "http"
     * context options.
     *
     * @param array $request Ring request (unused).
     * @param array $options Stream context options, modified in place.
     * @param mixed $value   Timeout value, passed through unchanged.
     * @param array $params  Stream context params (unused).
     */
    private function add_timeout(array $request, &$options, $value, &$params)
    {
        $options['http']['timeout'] = $value;
    }
272
273
    /**
     * Handles the "verify" client option (SSL peer verification).
     *
     * - false:  verification is disabled and self-signed certificates are
     *           allowed.
     * - true:   verification is enabled; on PHP < 5.6 the default CA bundle
     *           from ClientUtils is used.
     * - string: verification is enabled with the given CA bundle path.
     *
     * @param array $request Ring request (unused).
     * @param array $options Stream context options, modified in place.
     * @param mixed $value   Verify setting.
     * @param array $params  Stream context params (unused).
     *
     * @throws RingException when the CA bundle path does not exist or the
     *                       value is neither a boolean nor a string.
     */
    private function add_verify(array $request, &$options, $value, &$params)
    {
        if ($value === false) {
            $options['ssl']['verify_peer'] = false;
            $options['ssl']['allow_self_signed'] = true;

            return;
        }

        if ($value === true) {
            // PHP 5.6 or greater will find the system cert by default. When
            // < 5.6, use the Guzzle bundled cacert.
            if (PHP_VERSION_ID < 50600) {
                $options['ssl']['cafile'] = ClientUtils::getDefaultCaBundle();
            }
        } elseif (is_string($value)) {
            $options['ssl']['cafile'] = $value;
            if (!file_exists($value)) {
                throw new RingException("SSL CA bundle not found: $value");
            }
        } else {
            throw new RingException('Invalid verify request option');
        }

        $options['ssl']['verify_peer'] = true;
        $options['ssl']['allow_self_signed'] = false;
    }
297
298
    /**
     * Handles the "cert" client option (client-side SSL certificate).
     *
     * @param array        $request Ring request (unused).
     * @param array        $options Stream context options, modified in place.
     * @param string|array $value   Certificate path, or a [path, passphrase]
     *                              pair.
     * @param array        $params  Stream context params (unused).
     *
     * @throws RingException when the certificate file does not exist.
     */
    private function add_cert(array $request, &$options, $value, &$params)
    {
        $certPath = $value;
        if (is_array($value)) {
            $options['ssl']['passphrase'] = $value[1];
            $certPath = $value[0];
        }

        if (!file_exists($certPath)) {
            throw new RingException("SSL certificate not found: {$certPath}");
        }

        $options['ssl']['local_cert'] = $certPath;
    }
311
312
    /**
     * Handles the "progress" client option by installing a stream
     * notification callback that forwards progress events.
     *
     * The user callback is invoked as ($total, $transferred, null, null).
     *
     * @param array    $request Ring request (unused).
     * @param array    $options Stream context options (unused).
     * @param callable $value   Progress callback.
     * @param array    $params  Stream context params; "notification" is set,
     *                          or chained with an existing callback.
     */
    private function add_progress(array $request, &$options, $value, &$params)
    {
        $notify = function ($code, $_1, $_2, $_3, $transferred, $total) use ($value) {
            if ($code == STREAM_NOTIFY_PROGRESS) {
                $value($total, $transferred, null, null);
            }
        };

        // Wrap the existing function if needed.
        if (isset($params['notification'])) {
            $params['notification'] = Core::callArray([$params['notification'], $notify]);
        } else {
            $params['notification'] = $notify;
        }
    }
325
326
    /**
     * Handles the "debug" client option by installing a stream notification
     * callback that writes one line per notification to the debug resource.
     *
     * @param array $request Ring request; its method and URL label each line.
     * @param array $options Stream context options (unused).
     * @param mixed $value   Debug setting; false disables debugging, any
     *                       other value is resolved through
     *                       Core::getDebugResource().
     * @param array $params  Stream context params; "notification" is set,
     *                       or chained with an existing callback.
     */
    private function add_debug(array $request, &$options, $value, &$params)
    {
        if ($value === false) {
            return;
        }

        // Human-readable names for the notification codes.
        static $map = [
            STREAM_NOTIFY_CONNECT       => 'CONNECT',
            STREAM_NOTIFY_AUTH_REQUIRED => 'AUTH_REQUIRED',
            STREAM_NOTIFY_AUTH_RESULT   => 'AUTH_RESULT',
            STREAM_NOTIFY_MIME_TYPE_IS  => 'MIME_TYPE_IS',
            STREAM_NOTIFY_FILE_SIZE_IS  => 'FILE_SIZE_IS',
            STREAM_NOTIFY_REDIRECTED    => 'REDIRECTED',
            STREAM_NOTIFY_PROGRESS      => 'PROGRESS',
            STREAM_NOTIFY_FAILURE       => 'FAILURE',
            STREAM_NOTIFY_COMPLETED     => 'COMPLETED',
            STREAM_NOTIFY_RESOLVE       => 'RESOLVE',
        ];

        // Names of the callback arguments that follow the notification code.
        static $args = ['severity', 'message', 'message_code',
            'bytes_transferred', 'bytes_max'];

        $out = Core::getDebugResource($value);
        $label = $request['http_method'] . ' ' . Core::url($request);
        $logger = function () use ($label, $out, $map, $args) {
            $received = func_get_args();
            $code = array_shift($received);
            fwrite($out, sprintf('<%s> [%s] ', $label, $map[$code]));
            // array_filter() preserves keys, so each index still lines up
            // with its name in $args.
            foreach (array_filter($received) as $i => $v) {
                fwrite($out, $args[$i] . ': "' . $v . '" ');
            }
            fwrite($out, "\n");
        };

        // Wrap the existing function if needed.
        if (isset($params['notification'])) {
            $params['notification'] = Core::callArray([$params['notification'], $logger]);
        } else {
            $params['notification'] = $logger;
        }
    }
365
366
    /**
     * Merges the "stream_context" client option, when present, over the
     * context options built so far.
     *
     * @param array $request Ring request.
     * @param array $options Stream context options, replaced in place.
     *
     * @throws RingException when "stream_context" is set but is not an array.
     */
    private function applyCustomOptions(array $request, array &$options)
    {
        if (isset($request['client']['stream_context'])) {
            $custom = $request['client']['stream_context'];

            if (!is_array($custom)) {
                throw new RingException('stream_context must be an array');
            }

            $options = array_replace_recursive($options, $custom);
        }
    }
381
382
    /**
     * Creates the stream context from the final options and params.
     *
     * @param array $request Ring request (source of custom context options).
     * @param array $options Stream context options.
     * @param array $params  Stream context params.
     *
     * @return resource
     */
    private function createContext(array $request, array $options, array $params)
    {
        $this->applyCustomOptions($request, $options);

        $factory = function () use ($options, $params) {
            return stream_context_create($options, $params);
        };

        return $this->createResource($factory);
    }
393
394
    /**
     * Opens the URL for reading with the given stream context and records
     * the response headers for later use when the response is built.
     *
     * @param string   $url     URL to open; must start with http:// or https://.
     * @param array    $request Ring request (unused; kept for signature
     *                          compatibility).
     * @param array    $options Stream context options (unused; kept for
     *                          signature compatibility).
     * @param resource $context Stream context to open the URL with.
     *
     * @return resource
     */
    private function createStreamResource(
        $url,
        array $request,
        array $options,
        $context
    ) {
        return $this->createResource(
            function () use ($url, $context) {
                // Require an HTTP(S) scheme at the start of the URL. Looking
                // for "http" anywhere in the string would also let through
                // other wrappers whose path merely contains "http".
                if (strpos($url, 'http://') !== 0
                    && strpos($url, 'https://') !== 0
                ) {
                    trigger_error("URL is invalid: {$url}", E_USER_WARNING);
                    return null;
                }
                $resource = fopen($url, 'r', null, $context);
                // $http_response_header is only populated once the wrapper
                // has received headers; when the connection fails it is
                // undefined, and reading it would add an unrelated
                // "undefined variable" error to the failure message.
                $this->lastHeaders = isset($http_response_header)
                    ? $http_response_header
                    : null;
                return $resource;
            }
        );
    }
414
}
415