Passed
Push — master ( a1ca1b...704457 )
by Tobias
19:10 queued 17:29
created

MultiCurl::proceed()   C

Complexity

Conditions 12
Paths 53

Size

Total Lines 65
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 12.1081

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 12
eloc 34
c 2
b 0
f 0
nc 53
nop 0
dl 0
loc 65
ccs 30
cts 33
cp 0.9091
crap 12.1081
rs 6.9666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Buzz\Client;
6
7
use Buzz\Configuration\ParameterBag;
8
use Buzz\Exception\ClientException;
9
use Buzz\Message\ResponseBuilder;
10
use Psr\Http\Message\RequestInterface;
11
use Psr\Http\Message\ResponseInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
15
{
16
    private $queue = [];
17
    private $curlm;
18
19
    /**
20
     * Raw responses that the server has pushed to us.
21
     *
22
     * @var array
23
     */
24
    private $pushedResponses = [];
25
26
    /**
27
     * Curl handlers with unprocessed pushed responses.
28
     *
29
     * @var array
30
     */
31
    private $pushResponseHandles = [];
32
33
    /**
34
     * Callbacks that decides if a pushed request should be accepted or not.
35
     *
36
     * @var array
37
     */
38
    private $pushFunctions = [];
39
40
    /**
41
     * @var bool
42
     */
43
    private $serverPushSupported = true;
44
45
    /**
46
     * To work around bugs in PHP and GC.
47
     *
48
     * @var array
49
     */
50
    private $pushCb = [];
51
52
    /**
53
     * {@inheritdoc}
54
     */
55 50
    public function __construct($responseFactory, array $options = [])
56
    {
57 50
        parent::__construct($responseFactory, $options);
58
59 49
        if (\PHP_VERSION_ID < 70215 || \PHP_VERSION_ID === 70300 || \PHP_VERSION_ID === 70301 || !(CURL_VERSION_HTTP2 & curl_version()['features'])) {
60
            // Dont use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675
61
            $this->serverPushSupported = false;
62
        }
63 49
    }
64
65
    /**
66
     * Populates the supplied response with the response for the supplied request.
67
     *
68
     * If a "callback" option is supplied, its value will be called when the
69
     * request completes. It is ONLY in the callback you will see the response
70
     * or an exception.
71
     *
72
     * This is a non-blocking function call.
73
     *
74
     * The callable should have the following signature:
75
     *
76
     *     $callback = function($request, $response, $exception) {
77
     *         if (!$exception) {
78
     *             // success
79
     *         } else {
80
     *             // error ($error is one of the CURLE_* constants)
81
     *         }
82
     *     };
83
     */
84 13
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
85
    {
86 13
        $options = $this->validateOptions($options);
87
88 13
        $this->addToQueue($request, $options);
89 13
    }
90
91
    /**
92
     * This is a blocking function call.
93
     */
94 59
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
95
    {
96 59
        $options = $this->validateOptions($options);
97 58
        $originalCallback = $options->get('callback');
98 58
        $responseToReturn = null;
99
        $options = $options->add(['callback' => function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) use (&$responseToReturn, $originalCallback) {
100 58
            $responseToReturn = $response;
101 58
            $originalCallback($request, $response, $e);
102
103 58
            if (null !== $e) {
104 5
                throw $e;
105
            }
106 58
        }]);
107
108 58
        $this->addToQueue($request, $options);
109 58
        $this->flush();
110
111 53
        return $responseToReturn;
112
    }
113
114 64
    protected function configureOptions(OptionsResolver $resolver): void
115
    {
116 50
        parent::configureOptions($resolver);
117
118
        $resolver->setDefault('callback', function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) {
119 64
        });
120 50
        $resolver->setAllowedTypes('callback', 'callable');
121
122
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, $headers) {
0 ignored issues
show
Unused Code introduced by
The parameter $pushed is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, /** @scrutinizer ignore-unused */ $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $headers is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, /** @scrutinizer ignore-unused */ $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $parent is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function (/** @scrutinizer ignore-unused */ $parent, $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
123
            return CURL_PUSH_OK;
124 50
        });
125 50
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);
126
127 50
        $resolver->setDefault('use_pushed_response', true);
128 50
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
129 50
    }
130
131
    public function count(): int
132
    {
133
        return \count($this->queue);
134
    }
135
136
    /**
137
     * Wait for all requests to finish.
138
     *
139
     * This is a blocking function call.
140
     *
141
     * This will not throw any exceptions. All exceptions are handled in the callback.
142
     */
143 70
    public function flush(): void
144
    {
145 70
        while (!empty($this->queue)) {
146 70
            $this->proceed();
147
        }
148 65
    }
149
150
    /**
151
     * See if any connection is ready to be processed.
152
     *
153
     * This is a non-blocking function call.
154
     *
155
     * @throws ClientException if we fail to initialized cUrl
156
     */
157 71
    public function proceed(): void
158
    {
159 71
        if (empty($this->queue)) {
160
            return;
161
        }
162
163 71
        if (!$this->curlm) {
164 71
            $this->initMultiCurlHandle();
165
        }
166
167 71
        $this->initQueue();
168 71
        $exception = null;
169
        do {
170
            // Start processing each handler in the stack
171 71
            $mrc = curl_multi_exec($this->curlm, $stillRunning);
172 71
        } while (CURLM_CALL_MULTI_PERFORM === $mrc);
173
174 71
        while ($info = curl_multi_info_read($this->curlm)) {
175
            // handle any completed requests
176 71
            if (CURLMSG_DONE !== $info['msg']) {
177
                continue;
178
            }
179
180 71
            $handled = false;
181
182
            /** @var RequestInterface $request */
183
            /** @var ParameterBag $options */
184
            /** @var ResponseBuilder $responseBuilder */
185 71
            foreach ($this->queue as $i => list($request, $options, $curl, $responseBuilder)) {
186
                // Try to find the correct handle from the queue.
187 71
                if ($curl !== $info['handle']) {
188 1
                    continue;
189
                }
190
191 71
                $handled = true;
192 71
                $response = null;
193
                try {
194 71
                    $this->parseError($request, $info['result'], $curl);
195 64
                    $response = $responseBuilder->getResponse();
196 64
                    if ($options->get('expose_curl_info', false)) {
197 64
                        $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl)));
198
                    }
199 7
                } catch (\Throwable $e) {
200 7
                    if (null === $exception) {
201 7
                        $exception = $e;
202
                    }
203
                }
204
205
                // remove from queue
206 71
                curl_multi_remove_handle($this->curlm, $curl);
207 71
                $this->releaseHandle($curl);
208 71
                unset($this->queue[$i]);
209
210
                // callback
211 71
                \call_user_func($options->get('callback'), $request, $response, $exception);
212 66
                $exception = null;
213
            }
214
215 66
            if (!$handled) {
216
                // It must be a pushed response.
217
                $this->handlePushedResponse($info['handle']);
218
            }
219
        }
220
221 70
        $this->cleanup();
222 70
    }
223
224
    private function addPushHandle($headers, $handle)
225
    {
226
        foreach ($headers as $header) {
227
            if (0 === strpos($header, ':path:')) {
228
                $path = substr($header, 6);
229
                $url = (string) curl_getinfo($handle)['url'];
230
                $url = str_replace((string) parse_url($url, PHP_URL_PATH), $path, $url);
231
                $this->pushResponseHandles[$url] = $handle;
232
                break;
233
            }
234
        }
235
    }
236
237
    private function handlePushedResponse($handle)
238
    {
239
        $found = false;
240
        foreach ($this->pushResponseHandles as $url => $h) {
241
            // Weak comparison
242
            if ($handle == $h) {
243
                $found = $url;
244
            }
245
        }
246
247
        if (!$found) {
248
            $found = curl_getinfo($handle)['url'];
249
        }
250
251
        $content = curl_multi_getcontent($handle);
252
        // Check if we got some headers, if not, we do not bother to store it.
253
        if (0 !== $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE)) {
254
            $this->pushedResponses[$found] = ['content' => $content, 'headerSize' => $headerSize];
255
            unset($this->pushResponseHandles[$found]);
256
        }
257
    }
258
259 71
    private function hasPushResponse($url)
260
    {
261 71
        return isset($this->pushedResponses[$url]);
262
    }
263
264
    private function getPushedResponse($url)
265
    {
266
        $response = $this->pushedResponses[$url];
267
        unset($this->pushedResponses[$url]);
268
269
        return $response;
270
    }
271
272 71
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
273
    {
274 71
        if (null !== $callback = $options->get('push_function_callback')) {
275 71
            $this->pushFunctions[] = $callback;
276
        }
277
278 71
        return $this->queue[] = [$request, $options];
279
    }
280
281
    /**
282
     * Create a multi curl handle and add some properties to it.
283
     */
284 71
    private function initMultiCurlHandle(): void
285
    {
286 71
        $this->curlm = curl_multi_init();
287 71
        if (false === $this->curlm) {
288
            throw new ClientException('Unable to create a new cURL multi handle');
289
        }
290
291 71
        if ($this->serverPushSupported) {
292 71
            $userCallbacks = $this->pushFunctions;
293
294 71
            curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
295
            // We need to use $this->pushCb[] because of a bug in PHP
296 71
            curl_multi_setopt(
297 71
                $this->curlm,
298 71
                CURLMOPT_PUSHFUNCTION,
299
                $this->pushCb[] = function ($parent, $pushed, $headers) use ($userCallbacks) {
300
                    // If any callback say no, then do not accept.
301
                    foreach ($userCallbacks as $callback) {
302
                        if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) {
303
                            return CURL_PUSH_DENY;
304
                        }
305
                    }
306
307
                    curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
308
                    curl_setopt($pushed, CURLOPT_HEADER, true);
309
                    $this->addPushHandle($headers, $pushed);
310
311
                    return CURL_PUSH_OK;
312 71
                }
313
            );
314
        }
315 71
    }
316
317
    /**
318
     * Loop over the queue and make sure every item (request) is initialized (ie, got a handle).
319
     */
320 71
    private function initQueue(): void
321
    {
322 71
        foreach ($this->queue as $i => $queueItem) {
323 71
            if (2 !== \count($queueItem)) {
324
                // We have already prepared this curl
325 69
                continue;
326
            }
327
            // prepare curl handle
328
            /** @var RequestInterface $request */
329
            /** @var ParameterBag $options */
330 71
            list($request, $options) = $queueItem;
331
332
            // Check if we have the response in cache already.
333 71
            if ($this->serverPushSupported
334 71
                && $options->get('use_pushed_response')
335 71
                && $this->hasPushResponse($request->getUri()->__toString())
336
            ) {
337
                $data = $this->getPushedResponse($request->getUri()->__toString());
338
                $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput(
339
                    $data['content'],
340
                    $data['headerSize']
341
                );
342
                \call_user_func($options->get('callback'), $request, $response, null);
343
                unset($this->queue[$i]);
344
345
                continue;
346
            }
347
348 71
            $curl = $this->createHandle();
349 71
            $responseBuilder = $this->prepare($curl, $request, $options);
350 71
            $this->queue[$i][] = $curl;
351 71
            $this->queue[$i][] = $responseBuilder;
352 71
            curl_multi_add_handle($this->curlm, $curl);
353
        }
354 71
    }
355
356
    /**
357
     * If we got no requests in the queue, do a clean up to save some memory.
358
     */
359 70
    private function cleanup(): void
360
    {
361 70
        if (empty($this->queue)) {
362 66
            curl_multi_close($this->curlm);
363 66
            $this->curlm = null;
364 66
            $this->pushFunctions = [];
365 66
            $this->pushCb = [];
366
        }
367 70
    }
368
}
369