Passed
Push — master ( ab28ee...e2645a )
by Tobias
02:45
created

MultiCurl::proceed()   F

Complexity

Conditions 22
Paths 626

Size

Total Lines 124
Code Lines 68

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 48
CRAP Score 34.312

Importance

Changes 0
Metric Value
cc 22
eloc 68
nc 626
nop 0
dl 0
loc 124
ccs 48
cts 68
cp 0.7059
crap 34.312
rs 0.5194
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Buzz\Client;
6
7
use Buzz\Configuration\ParameterBag;
8
use Buzz\Exception\ClientException;
9
use Buzz\Message\ResponseBuilder;
10
use Psr\Http\Message\RequestInterface;
11
use Psr\Http\Message\ResponseInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
15
{
16
    private $queue = [];
17
    private $curlm;
18
19
    /**
20
     * Raw responses that the server has pushed to us.
21
     *
22
     * @var array
23
     */
24
    private $pushedResponses = [];
25
26
    /**
27
     * Curl handlers with unprocessed pushed responses.
28
     *
29
     * @var array
30
     */
31
    private $pushResponseHandles = [];
32
33
    /**
34
     * Callbacks that decides if a pushed request should be accepted or not.
35
     *
36
     * @var array
37
     */
38
    private $pushFunctions = [];
39
40
    /**
41
     * @var bool
42
     */
43
    private $serverPushSupported = true;
44
45
    /**
46
     * To work around bugs in PHP and GC.
47
     *
48
     * @var array
49
     */
50
    private $pushCb = [];
51
52
    /**
53
     * {@inheritdoc}
54
     */
55 50
    public function __construct($responseFactory, array $options = [])
56
    {
57 50
        parent::__construct($responseFactory, $options);
58
59 49
        if (\PHP_VERSION_ID < 70215 || \PHP_VERSION_ID === 70300 || \PHP_VERSION_ID === 70301 || !(CURL_VERSION_HTTP2 & curl_version()['features'])) {
60
            // Dont use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675
61 49
            $this->serverPushSupported = false;
62
        }
63 49
    }
64
65
    /**
66
     * Populates the supplied response with the response for the supplied request.
67
     *
68
     * If a "callback" option is supplied, its value will be called when the
69
     * request completes. It is ONLY in the callback you will see the response
70
     * or an exception.
71
     *
72
     * This is a non-blocking function call.
73
     *
74
     * The callable should have the following signature:
75
     *
76
     *     $callback = function($request, $response, $exception) {
77
     *         if (!$exception) {
78
     *             // success
79
     *         } else {
80
     *             // error ($error is one of the CURLE_* constants)
81
     *         }
82
     *     };
83
     */
84 13
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
85
    {
86 13
        $options = $this->validateOptions($options);
87
88 13
        $this->addToQueue($request, $options);
89 13
    }
90
91
    /**
92
     * This is a blocking function call.
93
     */
94 59
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
95
    {
96 59
        $options = $this->validateOptions($options);
97 58
        $originalCallback = $options->get('callback');
98 58
        $responseToReturn = null;
99 58
        $options = $options->add(['callback' => function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) use (&$responseToReturn, $originalCallback) {
100 58
            $responseToReturn = $response;
101 58
            $originalCallback($request, $response, $e);
102
103 58
            if (null !== $e) {
104 5
                throw $e;
105
            }
106 58
        }]);
107
108 58
        $this->addToQueue($request, $options);
109 58
        $this->flush();
110
111 53
        return $responseToReturn;
112
    }
113
114 64
    protected function configureOptions(OptionsResolver $resolver): void
115
    {
116 50
        parent::configureOptions($resolver);
117
118 64
        $resolver->setDefault('callback', function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) {
119 64
        });
120 50
        $resolver->setAllowedTypes('callback', 'callable');
121
122 50
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, $headers) {
0 ignored issues
show
Unused Code introduced by
The parameter $pushed is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, /** @scrutinizer ignore-unused */ $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $headers is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, /** @scrutinizer ignore-unused */ $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $parent is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function (/** @scrutinizer ignore-unused */ $parent, $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
123
            return CURL_PUSH_OK;
124 50
        });
125 50
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);
126
127 50
        $resolver->setDefault('use_pushed_response', true);
128 50
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
129
130
        // There is a bug in PHP that disables the server push feature if you are using timeouts.
131
        // See https://bugs.php.net/bug.php?id=77497
132 50
        $resolver->setDefault('timeout', null);
133 50
        $resolver->setAllowedTypes('timeout', ['integer', 'float', 'null']);
134 50
    }
135
136
    public function count(): int
137
    {
138
        return \count($this->queue);
139
    }
140
141
    /**
142
     * Wait for all requests to finish.
143
     *
144
     * This is a blocking function call.
145
     *
146
     * This will not throw any exceptions. All exceptions are handled in the callback.
147
     */
148 70
    public function flush(): void
149
    {
150 70
        while (!empty($this->queue)) {
151 70
            $this->proceed();
152
        }
153 65
    }
154
155
    /**
156
     * See if any connection is ready to be processed.
157
     *
158
     * This is a non-blocking function call.
159
     *
160
     * @throws ClientException if we fail to initialized cUrl
161
     */
162 71
    public function proceed(): void
163
    {
164 71
        if (empty($this->queue)) {
165
            return;
166
        }
167
168 71
        if (!$this->curlm) {
169 71
            $this->curlm = curl_multi_init();
170 71
            if (false === $this->curlm) {
171
                throw new ClientException('Unable to create a new cURL multi handle');
172
            }
173
174 71
            if ($this->serverPushSupported) {
175
                $userCallbacks = $this->pushFunctions;
176
177
                curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
178
                // We need to use $this->pushCb[] because of a bug in PHP
179
                curl_multi_setopt($this->curlm, CURLMOPT_PUSHFUNCTION, $this->pushCb[] = function ($parent, $pushed, $headers) use ($userCallbacks) {
180
                    // If any callback say no, then do not accept.
181
                    foreach ($userCallbacks as $callback) {
182
                        if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) {
183
                            return CURL_PUSH_DENY;
184
                        }
185
                    }
186
187
                    curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
188
                    curl_setopt($pushed, CURLOPT_HEADER, true);
189
                    $this->addPushHandle($headers, $pushed);
190
191
                    return CURL_PUSH_OK;
192
                });
193
            }
194
        }
195
196 71
        foreach ($this->queue as $i => $queueItem) {
197 71
            if (2 !== \count($queueItem)) {
198
                // We have already prepared this curl
199 69
                continue;
200
            }
201
            // prepare curl handle
202
            /** @var RequestInterface $request */
203
            /** @var ParameterBag $options */
204 71
            list($request, $options) = $queueItem;
205
206
            // Check if we have the response in cache already.
207 71
            if ($this->serverPushSupported
208 71
                && $options->get('use_pushed_response')
209 71
                && $this->hasPushResponse($request->getUri()->__toString())
210
            ) {
211
                $data = $this->getPushedResponse($request->getUri()->__toString());
212
                $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput($data['content'], $data['headerSize']);
213
                \call_user_func($options->get('callback'), $request, $response, null);
214
                unset($this->queue[$i]);
215
216
                continue;
217
            }
218
219 71
            $curl = $this->createHandle();
220 71
            $responseBuilder = $this->prepare($curl, $request, $options);
221 71
            $this->queue[$i][] = $curl;
222 71
            $this->queue[$i][] = $responseBuilder;
223 71
            curl_multi_add_handle($this->curlm, $curl);
224
        }
225
226 71
        $exception = null;
227
        do {
228
            // Start processing each handler in the stack
229 71
            $mrc = curl_multi_exec($this->curlm, $stillRunning);
230 71
        } while (CURLM_CALL_MULTI_PERFORM === $mrc);
231
232 71
        while ($info = curl_multi_info_read($this->curlm)) {
233
            // handle any completed requests
234 71
            if (CURLMSG_DONE !== $info['msg']) {
235
                continue;
236
            }
237
238 71
            $handled = false;
239 71
            foreach (array_keys($this->queue) as $i) {
240
                /** @var RequestInterface $request */
241
                /** @var ParameterBag $options */
242
                /** @var ResponseBuilder $responseBuilder */
243 71
                list($request, $options, $curl, $responseBuilder) = $this->queue[$i];
244
245
                // Try to find the correct handle from the queue.
246 71
                if ($curl !== $info['handle']) {
247 1
                    continue;
248
                }
249
250 71
                $handled = true;
251 71
                $response = null;
252
                try {
253 71
                    $this->parseError($request, $info['result'], $curl);
254 64
                    $response = $responseBuilder->getResponse();
255 64
                    if ($options->get('expose_curl_info', false)) {
256 64
                        $response = $response->withHeader('__curl_info', json_encode(curl_getinfo($curl)));
257
                    }
258 7
                } catch (\Throwable $e) {
259 7
                    if (null === $exception) {
260 7
                        $exception = $e;
261
                    }
262
                }
263
264
                // remove from queue
265 71
                curl_multi_remove_handle($this->curlm, $curl);
266 71
                $this->releaseHandle($curl);
267 71
                unset($this->queue[$i]);
268
269
                // callback
270 71
                \call_user_func($options->get('callback'), $request, $response, $exception);
271 66
                $exception = null;
272
            }
273
274 66
            if (!$handled) {
275
                // It must be a pushed response.
276
                $this->handlePushedResponse($info['handle']);
277
            }
278
        }
279
280
        // cleanup
281 70
        if (empty($this->queue)) {
282 66
            curl_multi_close($this->curlm);
283 66
            $this->curlm = null;
284 66
            $this->pushFunctions = [];
285 66
            $this->pushCb = [];
286
        }
287 70
    }
288
289
    private function addPushHandle($headers, $handle)
290
    {
291
        foreach ($headers as $header) {
292
            if (0 === strpos($header, ':path:')) {
293
                $path = substr($header, 6);
294
                $url = (string) curl_getinfo($handle)['url'];
295
                $url = str_replace(parse_url($url, PHP_URL_PATH) ?? '', $path, $url);
296
                $this->pushResponseHandles[$url] = $handle;
297
                break;
298
            }
299
        }
300
    }
301
302
    private function handlePushedResponse($handle)
303
    {
304
        $found = false;
305
        foreach ($this->pushResponseHandles as $url => $h) {
306
            // Weak comparison
307
            if ($handle == $h) {
308
                $found = $url;
309
            }
310
        }
311
312
        if (!$found) {
313
            $found = curl_getinfo($handle)['url'];
314
        }
315
316
        $content = curl_multi_getcontent($handle);
317
        // Check if we got some headers, if not, we do not bother to store it.
318
        if (0 !== $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE)) {
319
            $this->pushedResponses[$found] = ['content' => $content, 'headerSize' => $headerSize];
320
            unset($this->pushResponseHandles[$found]);
321
        }
322
    }
323
324
    private function hasPushResponse($url)
325
    {
326
        return isset($this->pushedResponses[$url]);
327
    }
328
329
    private function getPushedResponse($url)
330
    {
331
        $response = $this->pushedResponses[$url];
332
        unset($this->pushedResponses[$url]);
333
334
        return $response;
335
    }
336
337 71
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
338
    {
339 71
        if (null !== $callback = $options->get('push_function_callback')) {
340 71
            $this->pushFunctions[] = $callback;
341
        }
342
343 71
        return $this->queue[] = [$request, $options];
344
    }
345
}
346