Completed
Pull Request — master (#138)
by
unknown
02:47
created

DockerHandler::setResponseStream()   B

Complexity

Conditions 5
Paths 8

Size

Total Lines 21
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 21
rs 8.7624
cc 5
eloc 11
nc 8
nop 2
1
<?php
2
3
namespace Docker\Http\Handler;
4
5
use Docker\Http\Stream\Filter\OutputEvent;
6
use GuzzleHttp\Event\EmitterInterface;
7
use GuzzleHttp\Ring\Core;
8
use GuzzleHttp\Ring\Exception\ConnectException;
9
use GuzzleHttp\Ring\Exception\RingException;
10
use GuzzleHttp\Ring\Future\CompletedFutureArray;
11
use GuzzleHttp\Stream\Stream;
12
13
class DockerHandler
14
{
15
    const CHUNK_SIZE = 8192;
16
17
    /** @var string */
18
    private $entrypoint;
19
20
    /** @var resource A stream context resource */
21
    private $context;
22
23
    /** @var boolean Whether stream is encrypted with TLS */
24
    private $useTls;
25
26
    /** @var EmitterInterface */
27
    protected $emitter;
28
29
    /**
     * Configure the handler and register the custom stream filters.
     *
     * Recognised keys in $options (all optional):
     *  - entrypoint: address later handed to stream_socket_client()
     *  - useTls:     when truthy, TLS is enabled on the socket before the request is written
     *  - emitter:    used for 'response.output' listeners and passed to the 'event' filter
     *  - context:    stream context resource; a fresh one is created when absent
     *
     * @param array $options
     */
    public function __construct(array $options = [])
    {
        // These three options map one-to-one onto properties of the same name.
        foreach (['entrypoint', 'useTls', 'emitter'] as $name) {
            if (isset($options[$name])) {
                $this->{$name} = $options[$name];
            }
        }

        if (isset($options['context'])) {
            $this->context = $options['context'];
        } else {
            $this->context = stream_context_create();
        }

        stream_filter_register('chunk', '\Docker\Http\Stream\Filter\Chunk');
        stream_filter_register('event', '\Docker\Http\Stream\Filter\Event');
    }
49
50
    /**
     * Execute a Ring request against the configured entrypoint.
     *
     * A RingException raised while opening the stream or exchanging the request
     * is converted into an error response instead of propagating to the caller.
     *
     * @param array $request
     * @return CompletedFutureArray
     */
    public function __invoke(array $request)
    {
        $url = Core::url($request);
        Core::doSleep($request);

        try {
            // The Expect header is not supported, so strip it before sending.
            $request = Core::removeHeader($request, 'Expect');

            // createStream() receives $request by reference and may add headers
            // and client defaults, so it must run before the request is sent.
            $socket = $this->createStream($url, $request);

            return $this->createResponse($request, $socket);
        } catch (RingException $failure) {
            return $this->createErrorResponse($url, $failure);
        }
    }
69
70
    /**
     * Handle the 'callback' client option.
     *
     * Attaches a 'response.output' listener to the handler's emitter; the
     * listener forwards the content and type of each OutputEvent to $value.
     *
     * @param array    $request Ring request; not modified here (by-reference only to match
     *                          the add*() dispatch call made from createStream())
     * @param callable $value   Invoked as $value($content, $type)
     *
     * @throws RingException when the handler was constructed without an emitter
     */
    public function addCallback(array &$request, $value)
    {
        if (null === $this->emitter) {
            // Fail with a catchable exception instead of a fatal "call on null".
            throw new RingException('Cannot register an output callback: no emitter was configured');
        }

        // The request is not needed inside the listener, so it is not captured.
        $this->emitter->on('response.output', function (OutputEvent $event) use ($value) {
            $value($event->getContent(), $event->getType());
        });
    }
80
81
    /**
     * Handle the 'timeout' client option by storing it under
     * $request['client']['timeout'].
     *
     * @param array $request Ring request, updated in place
     * @param mixed $value   Timeout value to store
     */
    private function addTimeout(array &$request, $value)
    {
        $client = isset($request['client']) ? $request['client'] : [];
        $client['timeout'] = $value;

        $request['client'] = $client;
    }
89
90
    /**
     * Apply request defaults and client options, then open the socket.
     *
     * The request is updated in place: a "Connection: close" header is added
     * for HTTP/1.1 requests that have none, "Transfer-Encoding: chunked" is set
     * when the 'stream' client option is truthy, and 'verify' defaults to true.
     * Each client option that has a matching add<Option>() method on this class
     * is then passed to that method.
     *
     * @param string $url
     * @param array $request
     * @return resource
     */
    private function createStream($url, array &$request)
    {
        // Method names of this class, flipped once for cheap isset() lookups.
        static $knownMethods;
        if (!$knownMethods) {
            $knownMethods = array_flip(get_class_methods(__CLASS__));
        }

        // HTTP/1.1 streams using the PHP stream wrapper require a
        // Connection: close header
        $isHttp11 = !isset($request['version']) || $request['version'] == '1.1';
        if ($isHttp11 && !Core::hasHeader($request, 'Connection')) {
            $request['headers']['Connection'] = ['close'];
        }

        if (!empty($request['client']['stream'])) {
            $request = Core::setHeader($request, 'Transfer-Encoding', ['chunked']);
        }

        // Ensure SSL is verified by default
        if (!isset($request['client']['verify'])) {
            $request['client']['verify'] = true;
        }

        if (isset($request['client'])) {
            foreach ($request['client'] as $option => $optionValue) {
                $handler = "add" . ucfirst($option);
                if (!isset($knownMethods[$handler])) {
                    continue;
                }

                $this->{$handler}($request, $optionValue);
            }
        }

        // No stream options are collected here, so an empty list is passed on.
        return $this->createStreamResource($url, $request, [], $this->context);
    }
137
138
    /**
     * Open a client socket to the configured entrypoint.
     *
     * The connection attempt is wrapped in createResource() so that a failed
     * attempt becomes an exception. $url and $options are accepted but are not
     * used when connecting.
     *
     * @param $url
     * @param array $request
     * @param array $options
     * @param resource $context
     * @return resource
     */
    private function createStreamResource(
        $url,
        array $request,
        array $options,
        $context
    ) {
        $connect = function () use ($context, $request) {
            // Filled in by reference by stream_socket_client(); not read afterwards.
            $errorNo = null;
            $errorMsg = null;

            return @stream_socket_client(
                $this->entrypoint,
                $errorNo,
                $errorMsg,
                $this->getDefaultTimeout($request),
                STREAM_CLIENT_CONNECT,
                $context
            );
        };

        /** @noinspection PhpMethodParametersCountMismatchInspection */
        return $this->createResource($connect);
    }
172
173
    /**
     * Create a resource and check to ensure it was created successfully.
     *
     * While the callback runs, a temporary error handler collects every PHP
     * error it raises; the collected messages are included in the exception
     * thrown when the callback returns a falsy value. The previous error
     * handler is restored on every exit path, including when the callback
     * itself throws.
     *
     * @param callable $callback Callable that returns stream resource
     *
     * @return resource
     * @throws RingException when the callback does not return a resource
     */
    private function createResource(callable $callback)
    {
        $errors = [];
        set_error_handler(function ($_, $msg, $file, $line) use (&$errors) {
            $errors[] = [
                'message' => $msg,
                'file' => $file,
                'line' => $line
            ];

            return true;
        });

        try {
            $resource = $callback();
        } catch (\Exception $e) {
            // Do not leave the temporary handler installed when the callback throws.
            restore_error_handler();
            throw $e;
        } catch (\Throwable $e) {
            // PHP 7+ engine errors; on PHP 5 this clause simply never matches.
            restore_error_handler();
            throw $e;
        }
        restore_error_handler();

        if (!$resource) {
            $message = 'Error creating resource: ';
            foreach ($errors as $err) {
                foreach ($err as $key => $value) {
                    $message .= "[$key] $value" . PHP_EOL;
                }
            }

            throw new RingException(trim($message));
        }

        return $resource;
    }
211
212
    /**
     * Creates an error response for the given stream.
     *
     * When the exception message indicates a networking failure (DNS lookup
     * failure or refused connection) the exception is wrapped in a
     * ConnectException, keeping the original as its previous exception.
     *
     * @param string $url
     * @param RingException $e
     * @return CompletedFutureArray
     */
    private function createErrorResponse($url, RingException $e)
    {
        // Determine if the error was a networking error.
        $message = $e->getMessage();

        // This list can probably get more comprehensive.
        // strpos() returns 0 for a match at the very start of the message, so
        // the result must be compared against false rather than used as a boolean.
        $isConnectFailure = strpos($message, 'getaddrinfo') !== false // DNS lookup failed
            || strpos($message, 'Connection refused') !== false;

        if ($isConnectFailure) {
            $e = new ConnectException($message, 0, $e);
        }

        return new CompletedFutureArray([
            'status' => null,
            'body' => null,
            'headers' => [],
            'effective_url' => $url,
            'error' => $e
        ]);
    }
239
240
    /**
     * Send the request over the socket and build the response.
     *
     * Steps: optionally enable TLS, write the request head, write the body
     * (through the 'chunk' write filter when Transfer-Encoding is chunked),
     * read response heads until one is not a "100" interim response, then
     * attach the socket as the response body.
     *
     * @param array $request
     * @param resource $socket
     * @return CompletedFutureArray
     *
     * @throws RingException when TLS cannot be enabled, the socket read timed
     *                       out, the write failed, or no response could be parsed
     */
    private function createResponse(array $request, $socket)
    {
        // Check if tls is needed
        if ($this->useTls) {
            if (!@stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_TLS_CLIENT)) {
                throw new RingException(sprintf('Cannot enable tls: %s', error_get_last()['message']));
            }
        }

        // Write headers
        $isWrite = $this->fwrite($socket, $this->getRequestHeaderAsString($request));

        // Write body if set (isset() also covers a request without a 'body' key)
        if (isset($request['body']) && $isWrite !== false) {
            $stream = Stream::factory($request['body']);
            $isChunked = Core::header($request, 'Transfer-Encoding') == 'chunked';
            $filter = $isChunked
                ? stream_filter_prepend($socket, 'chunk', STREAM_FILTER_WRITE)
                : null;

            // NOTE(review): the loop also stops when a write returns 0, not only
            // on false, so a zero-length write ends the body early - confirm
            // whether that is intended.
            while (!$stream->eof() && $isWrite) {
                $isWrite = $this->fwrite($socket, $stream->read(self::CHUNK_SIZE));
            }

            if ($isChunked) {
                // stream_filter_prepend() returns false on failure; only remove a real filter.
                if (is_resource($filter)) {
                    stream_filter_remove($filter);
                }

                if (false !== $isWrite) {
                    // Terminating zero-length chunk.
                    $isWrite = $this->fwrite($socket, "0\r\n\r\n");
                }
            }
        }

        stream_set_timeout($socket, $this->getDefaultTimeout($request));

        // Response should be available, extract headers
        do {
            $response = $this->getResponseWithHeaders($socket, $request);
        } while ($response !== null && $response['status'] == 100);

        // Check timeout
        $metadata = stream_get_meta_data($socket);

        if ($metadata['timed_out']) {
            throw new RingException('Timed out while reading socket');
        }

        // The exception constructor takes (message, int code, previous exception);
        // the request/response arrays must not be passed in those positions.
        if (false === $isWrite) {
            // When an error happens and there is no response, it is most probably due to TLS configuration
            if ($response === null) {
                throw new RingException(
                    'Error while sending request (Broken Pipe), check your TLS configuration ' .
                    'and logs in docker daemon for more information '
                );
            }

            throw new RingException('Error while sending request (Broken Pipe)');
        }

        if (null === $response) {
            throw new RingException('No response could be parsed: check server log');
        }

        $this->setResponseStream($response, $socket);

        if (isset($request['client']['wait']) && $request['client']['wait']) {
            Core::body($response);
        }

        return new CompletedFutureArray($response);
    }
318
319
    /**
     * Attach the socket to the response as its body stream.
     *
     * A 'dechunk' filter is appended for chunked responses, and the 'event'
     * filter is appended when the 'callback' client option is truthy. For
     * error statuses (400 and above) the whole stream is read and stored as
     * the response reason.
     *
     * @param array $response Response array, updated in place ('body', possibly 'reason')
     * @param resource $socket
     */
    private function setResponseStream(array &$response, $socket)
    {
        if (Core::header($response, 'Transfer-Encoding') == "chunked") {
            stream_filter_append($socket, 'dechunk');
        }

        if (isset($response['client']['callback']) && $response['client']['callback']) {
            stream_filter_append($socket, 'event', STREAM_FILTER_READ, [
                'emitter' => $this->emitter,
                'content_type' => Core::header($response, 'Content-Type')
            ]);
        }

        $stream = new Stream($socket);

        // The previous test "status / 100 > 4" is a float division: it matched
        // 401 and above but skipped exactly 400. Compare the status directly so
        // every client and server error is treated the same way.
        if ((int) $response['status'] >= 400) {
            $response['reason'] = (string) $stream;
        }

        $response['body'] = $stream;
    }
344
345
    /**
     * Replace fwrite behavior as api is broken in PHP
     *
     * A plain fwrite() that returns 0 does not tell a temporarily blocked
     * stream apart from a broken pipe; this wrapper probes the stream with
     * stream_select() and retries once to make that distinction.
     *
     * @see https://secure.phabricator.com/rPHU69490c53c9c2ef2002bc2dd4cecfe9a4b080b497
     *
     * @param resource $stream The stream resource
     * @param string|bool $bytes Bytes to write to the stream
     *
     * @return bool|int false if pipe is broken, number of bytes written otherwise
     */
    private function fwrite($stream, $bytes)
    {
        if (!strlen($bytes)) {
            return 0;
        }

        $result = @fwrite($stream, $bytes);
        if ($result !== 0) {
            // In cases where some bytes are written (`$result > 0`) or
            // an error occurs (`$result === false`), the behavior of fwrite() is
            // correct. We can return the value as-is.
            return $result;
        }

        // If we make it here, we performed a 0-length write. Try to distinguish
        // between EAGAIN and EPIPE. To do this, we're going to `stream_select()`
        // the stream, write to it again if PHP claims that it's writable, and
        // consider the pipe broken if the write fails.

        $read = [];
        $write = [$stream];
        $except = [];

        $ready = @stream_select($read, $write, $except, 0);
        if (false === $ready) {
            // The select call itself failed, so the state of the stream cannot
            // be determined. Report a failure instead of silently retrying.
            return false;
        }

        if (empty($write)) {
            // The stream isn't writable, so we conclude that it probably really is
            // blocked and the underlying error was EAGAIN. Return 0 to indicate that
            // no data could be written yet.
            return 0;
        }

        // If we make it here, PHP **just** claimed that this stream is writable, so
        // perform a write. If the write also fails, conclude that these failures are
        // EPIPE or some other permanent failure.
        $result = @fwrite($stream, $bytes);
        if ($result !== 0) {
            // The write worked or failed explicitly. This value is fine to return.
            return $result;
        }

        // We performed a 0-length write, were told that the stream was writable, and
        // then immediately performed another 0-length write. Conclude that the pipe
        // is broken and return `false`.
        return false;
    }
401
402
    /**
     * Serialise the request line and headers into the text sent on the wire.
     *
     * The result is "<METHOD> <url> HTTP/<version>" followed by one
     * "Name: value, value" line per header and a terminating blank line, all
     * separated by CRLF. A request without a 'version' key is sent as
     * HTTP/1.1, matching how createStream() treats a missing version.
     *
     * @param array $request Ring request; reads 'http_method', 'url', 'version' and 'headers'
     * @return string
     */
    private function getRequestHeaderAsString(array $request)
    {
        $version = isset($request['version']) ? $request['version'] : '1.1';
        $headers = isset($request['headers']) ? $request['headers'] : [];

        $message = sprintf(
            "%s %s HTTP/%s\r\n",
            strtoupper($request['http_method']),
            $request['url'],
            $version
        );

        foreach ($headers as $name => $values) {
            // Accept a single scalar value as well as the usual list of values.
            $message .= $name . ': ' . implode(', ', (array) $values) . "\r\n";
        }

        // Blank line terminating the header section.
        return $message . "\r\n";
    }
422
423
    /**
     * Resolve the timeout to use for a request.
     *
     * Precedence: $request['timeout'], then $request['client']['timeout'],
     * then the 'default_socket_timeout' ini setting. A selected value that
     * loosely equals null (for example 0) also falls back to the ini setting.
     *
     * @param array $request
     * @return mixed The configured timeout, or the ini value as a string
     */
    private function getDefaultTimeout(array $request)
    {
        if (isset($request['timeout'])) {
            $chosen = $request['timeout'];
        } elseif (isset($request['client']['timeout'])) {
            $chosen = $request['client']['timeout'];
        } else {
            $chosen = null;
        }

        return null == $chosen
            ? ini_get('default_socket_timeout')
            : $chosen;
    }
447
448
    /**
     * Read one response head (status line plus headers) from the stream.
     *
     * Lines are read until a blank line or the end of the stream. The first
     * line is split into protocol, status and optional reason phrase.
     *
     * @param resource $stream  Stream to read from
     * @param array    $request Ring request; its 'client' options are copied into the response
     *
     * @return array|null Response array with 'version', 'status', 'reason',
     *                    'headers' and 'client' keys, or null when no status
     *                    line could be parsed
     */
    private function getResponseWithHeaders($stream, array $request)
    {
        $lines = [];

        while (($line = fgets($stream)) !== false) {
            if (rtrim($line) === '') {
                break;
            }

            $lines[] = trim($line);
        }

        // Nothing was read at all: there is no status line to parse.
        if (empty($lines)) {
            return null;
        }

        $parts = explode(' ', array_shift($lines), 3);

        if (count($parts) <= 1) {
            return null;
        }

        return [
            // Strip the leading "HTTP/" from the protocol token.
            'version' => substr($parts[0], 5),
            'status' => $parts[1],
            'reason' => isset($parts[2]) ? $parts[2] : null,
            'headers' => Core::headersFromLines($lines),
            'client' => isset($request['client']) ? $request['client'] : [],
        ];
    }
490
}
491