MultiCurl::proceed()   C
last analyzed

Complexity

Conditions 12
Paths 53

Size

Total Lines 65
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 12.1081

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 12
eloc 34
c 2
b 0
f 0
nc 53
nop 0
dl 0
loc 65
ccs 30
cts 33
cp 0.9091
crap 12.1081
rs 6.9666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Buzz\Client;
6
7
use Buzz\Configuration\ParameterBag;
8
use Buzz\Exception\ClientException;
9
use Buzz\Message\ResponseBuilder;
10
use Psr\Http\Message\RequestInterface;
11
use Psr\Http\Message\ResponseInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
15
{
16
    private $queue = [];
17
    private $curlm;
18
19
    /**
20
     * Raw responses that the server has pushed to us.
21
     *
22
     * @var array
23
     */
24
    private $pushedResponses = [];
25
26
    /**
27
     * Curl handlers with unprocessed pushed responses.
28
     *
29
     * @var array
30
     */
31
    private $pushResponseHandles = [];
32
33
    /**
34
     * Callbacks that decides if a pushed request should be accepted or not.
35
     *
36
     * @var array
37
     */
38
    private $pushFunctions = [];
39
40
    /**
41
     * @var bool
42
     */
43
    private $serverPushSupported = true;
44
45
    /**
46
     * To work around bugs in PHP and GC.
47
     *
48
     * @var array
49
     */
50
    private $pushCb = [];
51
52
    /**
53
     * {@inheritdoc}
54
     */
55 50
    public function __construct($responseFactory, array $options = [])
56
    {
57 50
        parent::__construct($responseFactory, $options);
58
59
        if (
60
            \PHP_VERSION_ID < 70215 ||
61
            \PHP_VERSION_ID === 70300 ||
62
            \PHP_VERSION_ID === 70301 ||
63
            \PHP_VERSION_ID >= 80000 ||
64 49
            !(CURL_VERSION_HTTP2 & curl_version()['features'])
65
        ) {
66
            // Dont use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675
67
            $this->serverPushSupported = false;
68
        }
69 49
    }
70
71
    /**
72
     * Populates the supplied response with the response for the supplied request.
73
     *
74
     * If a "callback" option is supplied, its value will be called when the
75
     * request completes. It is ONLY in the callback you will see the response
76
     * or an exception.
77
     *
78
     * This is a non-blocking function call.
79
     *
80
     * The callable should have the following signature:
81
     *
82
     *     $callback = function($request, $response, $exception) {
83
     *         if (!$exception) {
84
     *             // success
85
     *         } else {
86
     *             // error ($error is one of the CURLE_* constants)
87
     *         }
88
     *     };
89
     */
90 13
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
91
    {
92 13
        $options = $this->validateOptions($options);
93
94 13
        $this->addToQueue($request, $options);
95 13
    }
96
97
    /**
98
     * This is a blocking function call.
99
     */
100 59
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
101
    {
102 59
        $options = $this->validateOptions($options);
103 58
        $originalCallback = $options->get('callback');
104 58
        $responseToReturn = null;
105 58
        $options = $options->add(['callback' => function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) use (&$responseToReturn, $originalCallback) {
106 58
            $responseToReturn = $response;
107 58
            $originalCallback($request, $response, $e);
108
109 58
            if (null !== $e) {
110 5
                throw $e;
111
            }
112 58
        }]);
113
114 58
        $this->addToQueue($request, $options);
115 58
        $this->flush();
116
117 53
        return $responseToReturn;
118
    }
119
120 64
    protected function configureOptions(OptionsResolver $resolver): void
121
    {
122 50
        parent::configureOptions($resolver);
123
124 50
        $resolver->setDefault('callback', function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) {
125 64
        });
126 50
        $resolver->setAllowedTypes('callback', 'callable');
127
128 50
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, $headers) {
0 ignored issues
show
Unused Code introduced by
The parameter $headers is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

128
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, /** @scrutinizer ignore-unused */ $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $pushed is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

128
        $resolver->setDefault('push_function_callback', function ($parent, /** @scrutinizer ignore-unused */ $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $parent is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

128
        $resolver->setDefault('push_function_callback', function (/** @scrutinizer ignore-unused */ $parent, $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
129
            return CURL_PUSH_OK;
130 50
        });
131 50
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);
132
133 50
        $resolver->setDefault('use_pushed_response', true);
134 50
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
135 50
    }
136
137
    public function count(): int
138
    {
139
        return \count($this->queue);
140
    }
141
142
    /**
143
     * Wait for all requests to finish.
144
     *
145
     * This is a blocking function call.
146
     *
147
     * This will not throw any exceptions. All exceptions are handled in the callback.
148
     */
149 70
    public function flush(): void
150
    {
151 70
        while (!empty($this->queue)) {
152 70
            $this->proceed();
153
        }
154 65
    }
155
156
    /**
157
     * See if any connection is ready to be processed.
158
     *
159
     * This is a non-blocking function call.
160
     *
161
     * @throws ClientException if we fail to initialized cUrl
162
     */
163 71
    public function proceed(): void
164
    {
165 71
        if (empty($this->queue)) {
166
            return;
167
        }
168
169 71
        if (!$this->curlm) {
170 71
            $this->initMultiCurlHandle();
171
        }
172
173 71
        $this->initQueue();
174 71
        $exception = null;
175
        do {
176
            // Start processing each handler in the stack
177 71
            $mrc = curl_multi_exec($this->curlm, $stillRunning);
178 71
        } while (CURLM_CALL_MULTI_PERFORM === $mrc);
179
180 71
        while ($info = curl_multi_info_read($this->curlm)) {
181
            // handle any completed requests
182 71
            if (CURLMSG_DONE !== $info['msg']) {
183
                continue;
184
            }
185
186 71
            $handled = false;
187
188
            /** @var RequestInterface $request */
189
            /** @var ParameterBag $options */
190
            /** @var ResponseBuilder $responseBuilder */
191 71
            foreach ($this->queue as $i => list($request, $options, $curl, $responseBuilder)) {
192
                // Try to find the correct handle from the queue.
193 71
                if ($curl !== $info['handle']) {
194 1
                    continue;
195
                }
196
197 71
                $handled = true;
198 71
                $response = null;
199
                try {
200 71
                    $this->parseError($request, $info['result'], $curl);
201 64
                    $response = $responseBuilder->getResponse();
202 64
                    if ($options->get('expose_curl_info', false)) {
203 64
                        $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl)));
204
                    }
205 7
                } catch (\Throwable $e) {
206 7
                    if (null === $exception) {
207 7
                        $exception = $e;
208
                    }
209
                }
210
211
                // remove from queue
212 71
                curl_multi_remove_handle($this->curlm, $curl);
213 71
                $this->releaseHandle($curl);
214 71
                unset($this->queue[$i]);
215
216
                // callback
217 71
                \call_user_func($options->get('callback'), $request, $response, $exception);
218 66
                $exception = null;
219
            }
220
221 66
            if (!$handled) {
222
                // It must be a pushed response.
223
                $this->handlePushedResponse($info['handle']);
224
            }
225
        }
226
227 70
        $this->cleanup();
228 70
    }
229
230
    private function addPushHandle($headers, $handle)
231
    {
232
        foreach ($headers as $header) {
233
            if (0 === strpos($header, ':path:')) {
234
                $path = substr($header, 6);
235
                $url = (string) curl_getinfo($handle)['url'];
236
                $url = str_replace((string) parse_url($url, PHP_URL_PATH), $path, $url);
237
                $this->pushResponseHandles[$url] = $handle;
238
                break;
239
            }
240
        }
241
    }
242
243
    private function handlePushedResponse($handle)
244
    {
245
        $found = false;
246
        foreach ($this->pushResponseHandles as $url => $h) {
247
            // Weak comparison
248
            if ($handle == $h) {
249
                $found = $url;
250
            }
251
        }
252
253
        if (!$found) {
254
            $found = curl_getinfo($handle)['url'];
255
        }
256
257
        $content = curl_multi_getcontent($handle);
258
        // Check if we got some headers, if not, we do not bother to store it.
259
        if (0 !== $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE)) {
260
            $this->pushedResponses[$found] = ['content' => $content, 'headerSize' => $headerSize];
261
            unset($this->pushResponseHandles[$found]);
262
        }
263
    }
264
265 71
    private function hasPushResponse($url)
266
    {
267 71
        return isset($this->pushedResponses[$url]);
268
    }
269
270
    private function getPushedResponse($url)
271
    {
272
        $response = $this->pushedResponses[$url];
273
        unset($this->pushedResponses[$url]);
274
275
        return $response;
276
    }
277
278 71
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
279
    {
280 71
        if (null !== $callback = $options->get('push_function_callback')) {
281 71
            $this->pushFunctions[] = $callback;
282
        }
283
284 71
        return $this->queue[] = [$request, $options];
285
    }
286
287
    /**
288
     * Create a multi curl handle and add some properties to it.
289
     */
290 71
    private function initMultiCurlHandle(): void
291
    {
292 71
        $this->curlm = curl_multi_init();
293 71
        if (false === $this->curlm) {
294
            throw new ClientException('Unable to create a new cURL multi handle');
295
        }
296
297 71
        if ($this->serverPushSupported) {
298 71
            $userCallbacks = $this->pushFunctions;
299
300 71
            curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
301
            // We need to use $this->pushCb[] because of a bug in PHP
302 71
            curl_multi_setopt(
303 71
                $this->curlm,
304 71
                CURLMOPT_PUSHFUNCTION,
305 71
                $this->pushCb[] = function ($parent, $pushed, $headers) use ($userCallbacks) {
306
                    // If any callback say no, then do not accept.
307
                    foreach ($userCallbacks as $callback) {
308
                        if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) {
309
                            return CURL_PUSH_DENY;
310
                        }
311
                    }
312
313
                    curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
314
                    curl_setopt($pushed, CURLOPT_HEADER, true);
315
                    $this->addPushHandle($headers, $pushed);
316
317
                    return CURL_PUSH_OK;
318 71
                }
319
            );
320
        }
321 71
    }
322
323
    /**
324
     * Loop over the queue and make sure every item (request) is initialized (ie, got a handle).
325
     */
326 71
    private function initQueue(): void
327
    {
328 71
        foreach ($this->queue as $i => $queueItem) {
329 71
            if (2 !== \count($queueItem)) {
330
                // We have already prepared this curl
331 69
                continue;
332
            }
333
            // prepare curl handle
334
            /** @var RequestInterface $request */
335
            /** @var ParameterBag $options */
336 71
            list($request, $options) = $queueItem;
337
338
            // Check if we have the response in cache already.
339 71
            if ($this->serverPushSupported
340 71
                && $options->get('use_pushed_response')
341 71
                && $this->hasPushResponse($request->getUri()->__toString())
342
            ) {
343
                $data = $this->getPushedResponse($request->getUri()->__toString());
344
                $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput(
345
                    $data['content'],
346
                    $data['headerSize']
347
                );
348
                \call_user_func($options->get('callback'), $request, $response, null);
349
                unset($this->queue[$i]);
350
351
                continue;
352
            }
353
354 71
            $curl = $this->createHandle();
355 71
            $responseBuilder = $this->prepare($curl, $request, $options);
356 71
            $this->queue[$i][] = $curl;
357 71
            $this->queue[$i][] = $responseBuilder;
358 71
            curl_multi_add_handle($this->curlm, $curl);
359
        }
360 71
    }
361
362
    /**
363
     * If we got no requests in the queue, do a clean up to save some memory.
364
     */
365 70
    private function cleanup(): void
366
    {
367 70
        if (empty($this->queue)) {
368 66
            curl_multi_close($this->curlm);
369 66
            $this->curlm = null;
370 66
            $this->pushFunctions = [];
371 66
            $this->pushCb = [];
372
        }
373 70
    }
374
}
375