Passed
Push — master ( b80564...a1cca1 )
by Tobias
03:12
created

MultiCurl::proceed()   C

Complexity

Conditions 12
Paths 53

Size

Total Lines 65
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 12.1081

Importance

Changes 0
Metric Value
cc 12
eloc 34
nc 53
nop 0
dl 0
loc 65
ccs 30
cts 33
cp 0.9091
crap 12.1081
rs 6.9666
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Buzz\Client;
6
7
use Buzz\Configuration\ParameterBag;
8
use Buzz\Exception\ClientException;
9
use Buzz\Message\ResponseBuilder;
10
use Psr\Http\Message\RequestInterface;
11
use Psr\Http\Message\ResponseInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
15
{
16
    private $queue = [];
17
    private $curlm;
18
19
    /**
20
     * Raw responses that the server has pushed to us.
21
     *
22
     * @var array
23
     */
24
    private $pushedResponses = [];
25
26
    /**
27
     * Curl handles with unprocessed pushed responses.
28
     *
29
     * @var array
30
     */
31
    private $pushResponseHandles = [];
32
33
    /**
34
     * Callbacks that decide whether a pushed request should be accepted.
35
     *
36
     * @var array
37
     */
38
    private $pushFunctions = [];
39
40
    /**
41
     * @var bool
42
     */
43
    private $serverPushSupported = true;
44
45
    /**
46
     * To work around bugs in PHP and GC.
47
     *
48
     * @var array
49
     */
50
    private $pushCb = [];
51
52
    /**
53
     * {@inheritdoc}
54
     */
55 50
    public function __construct($responseFactory, array $options = [])
    {
        parent::__construct($responseFactory, $options);

        // HTTP/2 server push stays enabled only when the runtime can really do it.
        // The constants are probed with defined() first: on a curl extension built
        // without HTTP/2 or push support, referencing them directly would fail
        // instead of simply turning the feature off.
        // The PHP version checks skip releases where push is buggy,
        // see https://bugs.php.net/76675
        if (
            \PHP_VERSION_ID < 70215
            || 70300 === \PHP_VERSION_ID
            || 70301 === \PHP_VERSION_ID
            || !\defined('CURLMOPT_PUSHFUNCTION')
            || !\defined('CURL_VERSION_HTTP2')
            || !(CURL_VERSION_HTTP2 & curl_version()['features'])
        ) {
            $this->serverPushSupported = false;
        }
    }
64
65
    /**
66
     * Populates the supplied response with the response for the supplied request.
67
     *
68
     * If a "callback" option is supplied, its value will be called when the
69
     * request completes. It is ONLY in the callback you will see the response
70
     * or an exception.
71
     *
72
     * This is a non-blocking function call.
73
     *
74
     * The callable should have the following signature:
75
     *
76
     *     $callback = function($request, $response, $exception) {
77
     *         if (!$exception) {
78
     *             // success
79
     *         } else {
80
     *             // error ($exception describes the failure)
81
     *         }
82
     *     };
83
     */
84 13
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
    {
        // Nothing is sent from here: the request is only placed on the queue,
        // paired with its validated options.
        $validated = $this->validateOptions($options);

        $this->addToQueue($request, $validated);
    }
90
91
    /**
92
     * This is a blocking function call.
93
     */
94 59
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
    {
        $bag = $this->validateOptions($options);
        $userCallback = $bag->get('callback');

        // Filled in by the interceptor below once the transfer has completed.
        $captured = null;

        // Wraps the configured callback: remember the response, still notify the
        // caller's callback, then surface any failure as an exception.
        $interceptor = function (RequestInterface $request, ?ResponseInterface $response = null, ?ClientException $e = null) use (&$captured, $userCallback) {
            $captured = $response;
            $userCallback($request, $response, $e);

            if (null === $e) {
                return;
            }

            throw $e;
        };

        $this->addToQueue($request, $bag->add(['callback' => $interceptor]));

        // flush() drains the whole queue, not only the request queued above.
        $this->flush();

        return $captured;
    }
113
114 64
    protected function configureOptions(OptionsResolver $resolver): void
115
    {
116 50
        parent::configureOptions($resolver);
117
118 64
        $resolver->setDefault('callback', function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) {
119 64
        });
120 50
        $resolver->setAllowedTypes('callback', 'callable');
121
122 50
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, $headers) {
0 ignored issues
show
Unused Code introduced by
The parameter $pushed is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, /** @scrutinizer ignore-unused */ $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $headers is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, /** @scrutinizer ignore-unused */ $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $parent is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function (/** @scrutinizer ignore-unused */ $parent, $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
123
            return CURL_PUSH_OK;
124 50
        });
125 50
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);
126
127 50
        $resolver->setDefault('use_pushed_response', true);
128 50
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
129
130
        // There is a bug in PHP that disables the server push feature if you are using timeouts.
131
        // See https://bugs.php.net/bug.php?id=77497
132 50
        $resolver->setDefault('timeout', null);
133 50
        $resolver->setAllowedTypes('timeout', ['integer', 'float', 'null']);
134 50
    }
135
136
    /**
     * Number of requests currently sitting in the queue.
     */
    public function count(): int
    {
        $pending = $this->queue;

        return \count($pending);
    }
140
141
    /**
142
     * Wait for all requests to finish.
143
     *
144
     * This is a blocking function call.
145
     *
146
     * This will not throw any exceptions. All exceptions are handled in the callback.
147
     */
148 70
    public function flush(): void
    {
        while (!empty($this->queue)) {
            $this->proceed();

            if (empty($this->queue) || !$this->curlm) {
                continue;
            }

            // proceed() does not block, so calling it back to back would spin the
            // CPU while transfers are in flight. Wait for activity on the multi
            // handle instead; the short timeout keeps the loop responsive.
            if (-1 === curl_multi_select($this->curlm, 0.05)) {
                // select could not wait on anything (it returned immediately);
                // back off briefly so the loop does not turn into a busy wait.
                usleep(1000);
            }
        }
    }
154
155
    /**
156
     * See if any connection is ready to be processed.
157
     *
158
     * This is a non-blocking function call.
159
     *
160
     * @throws ClientException if we fail to initialize cURL
161
     */
162 71
    public function proceed(): void
    {
        if (empty($this->queue)) {
            return;
        }

        if (!$this->curlm) {
            $this->initMultiCurlHandle();
        }

        $this->initQueue();

        // Initialised up front because curl_multi_exec() fills it by reference.
        $stillRunning = 0;
        do {
            // Start processing each handler in the stack
            $mrc = curl_multi_exec($this->curlm, $stillRunning);
        } while (CURLM_CALL_MULTI_PERFORM === $mrc);

        while ($info = curl_multi_info_read($this->curlm)) {
            // Only completed transfers are of interest.
            if (CURLMSG_DONE !== $info['msg']) {
                continue;
            }

            if (!$this->completeQueuedTransfer($info)) {
                // No queued request owns this handle, so it must be a pushed response.
                $this->handlePushedResponse($info['handle']);
            }
        }

        $this->cleanup();
    }

    /**
     * Finish the queued request that owns the handle of a completed transfer.
     *
     * Builds the response (or records the failure), detaches the handle, removes
     * the request from the queue and invokes the request's callback.
     *
     * @param array $info message returned by curl_multi_info_read()
     *
     * @return bool true if a queued request owned the handle, false otherwise
     */
    private function completeQueuedTransfer(array $info): bool
    {
        foreach ($this->queue as $i => $queueItem) {
            // Items that initQueue() has not prepared yet hold only
            // [request, options]; a callback may have queued them while messages
            // are still being read. They cannot own a finished handle.
            if (!isset($queueItem[2]) || $queueItem[2] !== $info['handle']) {
                continue;
            }

            /** @var RequestInterface $request */
            /** @var ParameterBag $options */
            /** @var ResponseBuilder $responseBuilder */
            [$request, $options, $curl, $responseBuilder] = $queueItem;

            $response = null;
            $exception = null;

            try {
                $this->parseError($request, $info['result'], $curl);
                $response = $responseBuilder->getResponse();
                if ($options->get('expose_curl_info', false)) {
                    // json_encode() may return false; cast so the header value is always a string.
                    $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl)));
                }
            } catch (\Throwable $e) {
                // Failures are handed to the callback rather than thrown from here.
                $exception = $e;
            }

            // Detach and dequeue before calling back: the callback may throw, and
            // the request must not be left in the queue when it does.
            curl_multi_remove_handle($this->curlm, $curl);
            $this->releaseHandle($curl);
            unset($this->queue[$i]);

            \call_user_func($options->get('callback'), $request, $response, $exception);

            return true;
        }

        return false;
    }
228
229
    /**
     * Remember which URL a pushed handle is going to deliver.
     *
     * Looks for the ":path" pseudo-header among the promised request headers and
     * registers the handle under the absolute URL derived from it. Only the first
     * ":path" header is considered.
     *
     * @param iterable $headers header lines of the promised request (presumably "name:value" strings)
     * @param resource $handle  curl handle of the pushed transfer
     */
    private function addPushHandle($headers, $handle)
    {
        foreach ($headers as $header) {
            if (0 !== strpos($header, ':path:')) {
                continue;
            }

            $url = $this->resolvePushedUrl(
                (string) curl_getinfo($handle)['url'],
                trim(substr($header, 6))
            );
            if (null !== $url) {
                $this->pushResponseHandles[$url] = $handle;
            }

            break;
        }
    }

    /**
     * Build the absolute URL of a pushed resource from a base URL and a ":path" value.
     *
     * Scheme and authority are taken from the base URL; its path and query are
     * dropped and replaced by $path. A plain search-and-replace of the old path is
     * not safe: "/" or an empty path, or a path that also occurs in the host name,
     * would corrupt the result.
     *
     * @return string|null the absolute URL, or null when the base URL has no scheme/host
     */
    private function resolvePushedUrl(string $baseUrl, string $path): ?string
    {
        $parts = parse_url($baseUrl);
        if (false === $parts || !isset($parts['scheme'], $parts['host'])) {
            return null;
        }

        $authority = $parts['host'];
        if (isset($parts['user'])) {
            $credentials = $parts['user'].(isset($parts['pass']) ? ':'.$parts['pass'] : '');
            $authority = $credentials.'@'.$authority;
        }
        if (isset($parts['port'])) {
            $authority .= ':'.$parts['port'];
        }

        if ('' === $path || '/' !== $path[0]) {
            $path = '/'.$path;
        }

        return $parts['scheme'].'://'.$authority.$path;
    }
241
242
    /**
     * Store the result of a finished pushed transfer so a later request can use it.
     *
     * The response is kept under the URL registered for the handle, or under the
     * handle's own URL when nothing was registered. Transfers that produced no
     * headers are not stored.
     *
     * @param resource $handle curl handle of the completed pushed transfer
     */
    private function handlePushedResponse($handle)
    {
        $registeredUrl = null;
        foreach ($this->pushResponseHandles as $url => $h) {
            // Weak comparison
            if ($handle == $h) {
                $registeredUrl = $url;
            }
        }

        // The transfer is over either way, so drop the bookkeeping entry even when
        // the response is not stored below; otherwise the handle would linger here.
        if (null !== $registeredUrl) {
            unset($this->pushResponseHandles[$registeredUrl]);
        }

        $found = $registeredUrl ?: curl_getinfo($handle)['url'];

        $content = curl_multi_getcontent($handle);
        $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);

        // Check if we got some headers, if not, we do not bother to store it.
        if (0 !== $headerSize) {
            $this->pushedResponses[$found] = ['content' => $content, 'headerSize' => $headerSize];
        }
    }
263
264
    /**
     * Tell whether a pushed response is stored for the given URL.
     *
     * @param string $url key to look up among the stored pushed responses
     *
     * @return bool
     */
    private function hasPushResponse($url)
    {
        $stored = isset($this->pushedResponses[$url]);

        return $stored;
    }
268
269
    /**
     * Take the stored pushed response for a URL; the entry is removed once read.
     *
     * @param string $url key of the stored pushed response
     *
     * @return array the stored entry ('content' and 'headerSize')
     */
    private function getPushedResponse($url)
    {
        $entry = $this->pushedResponses[$url];

        // Each pushed response is handed out only once.
        unset($this->pushedResponses[$url]);

        return $entry;
    }
276
277 71
    /**
     * Append a request and its options to the queue.
     *
     * A non-null "push_function_callback" option is also recorded in the list
     * of push callbacks.
     *
     * @return array the queue item that was added: [request, options]
     */
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
    {
        $pushCallback = $options->get('push_function_callback');
        if (null !== $pushCallback) {
            $this->pushFunctions[] = $pushCallback;
        }

        $item = [$request, $options];
        $this->queue[] = $item;

        return $item;
    }
285
286
    /**
287
     * Create a multi curl handle and add some properties to it.
288
     */
289 71
    private function initMultiCurlHandle(): void
    {
        $this->curlm = curl_multi_init();
        if (false === $this->curlm) {
            throw new ClientException('Unable to create a new cURL multi handle');
        }

        if (!$this->serverPushSupported) {
            return;
        }

        curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
        // We need to use $this->pushCb[] because of a bug in PHP
        curl_multi_setopt(
            $this->curlm,
            CURLMOPT_PUSHFUNCTION,
            $this->pushCb[] = function ($parent, $pushed, $headers) {
                // Read the callbacks when the push arrives, not when this handle
                // was created: requests queued later register theirs too, and a
                // snapshot taken here would silently ignore them.
                // If any callback say no, then do not accept.
                foreach ($this->pushFunctions as $callback) {
                    if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) {
                        return CURL_PUSH_DENY;
                    }
                }

                // Accepted: collect body and headers on the pushed handle and
                // remember which URL it belongs to.
                curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
                curl_setopt($pushed, CURLOPT_HEADER, true);
                $this->addPushHandle($headers, $pushed);

                return CURL_PUSH_OK;
            }
        );
    }
321
322
    /**
323
     * Loop over the queue and make sure every item (request) is initialized (ie, got a handle).
324
     */
325 71
    private function initQueue(): void
    {
        foreach ($this->queue as $i => $queueItem) {
            // A prepared item has the curl handle and response builder appended,
            // so only items that still hold exactly [request, options] need work.
            if (2 !== \count($queueItem)) {
                continue;
            }

            /** @var RequestInterface $request */
            /** @var ParameterBag $options */
            [$request, $options] = $queueItem;

            // Check if we have the response in cache already.
            if ($this->serverPushSupported && $options->get('use_pushed_response')) {
                $uri = $request->getUri()->__toString();

                if ($this->hasPushResponse($uri)) {
                    $data = $this->getPushedResponse($uri);
                    $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput(
                        $data['content'],
                        $data['headerSize']
                    );

                    // Dequeue before calling back, as proceed() does: the pushed
                    // response has already been consumed, so if the callback
                    // throws the request must not stay queued.
                    unset($this->queue[$i]);
                    \call_user_func($options->get('callback'), $request, $response, null);

                    continue;
                }
            }

            // No cached response: create a handle and attach it to the multi handle.
            $curl = $this->createHandle();
            $responseBuilder = $this->prepare($curl, $request, $options);
            $this->queue[$i][] = $curl;
            $this->queue[$i][] = $responseBuilder;
            curl_multi_add_handle($this->curlm, $curl);
        }
    }
360
361
    /**
362
     * If we got no requests in the queue, do a clean up to save some memory.
363
     */
364 70
    private function cleanup(): void
    {
        // Requests are still pending: keep the multi handle and callbacks alive.
        if (!empty($this->queue)) {
            return;
        }

        curl_multi_close($this->curlm);
        $this->curlm = null;

        // Drop the registered push callbacks together with the closed handle.
        $this->pushCb = [];
        $this->pushFunctions = [];
    }
373
}
374