Passed
Push — master ( a1ca1b...704457 )
by Tobias
19:10 queued 17:29
created

MultiCurl::addToQueue()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 3
c 1
b 0
f 0
nc 2
nop 2
dl 0
loc 7
ccs 4
cts 4
cp 1
crap 2
rs 10
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Buzz\Client;
6
7
use Buzz\Configuration\ParameterBag;
8
use Buzz\Exception\ClientException;
9
use Buzz\Message\ResponseBuilder;
10
use Psr\Http\Message\RequestInterface;
11
use Psr\Http\Message\ResponseInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
15
{
16
    private $queue = [];
17
    private $curlm;
18
19
    /**
20
     * Raw responses that the server has pushed to us.
21
     *
22
     * @var array
23
     */
24
    private $pushedResponses = [];
25
26
    /**
27
     * Curl handlers with unprocessed pushed responses.
28
     *
29
     * @var array
30
     */
31
    private $pushResponseHandles = [];
32
33
    /**
34
     * Callbacks that decides if a pushed request should be accepted or not.
35
     *
36
     * @var array
37
     */
38
    private $pushFunctions = [];
39
40
    /**
41
     * @var bool
42
     */
43
    private $serverPushSupported = true;
44
45
    /**
46
     * To work around bugs in PHP and GC.
47
     *
48
     * @var array
49
     */
50
    private $pushCb = [];
51
52
    /**
53
     * {@inheritdoc}
54
     */
55 50
    public function __construct($responseFactory, array $options = [])
56
    {
57 50
        parent::__construct($responseFactory, $options);
58
59 49
        if (\PHP_VERSION_ID < 70215 || \PHP_VERSION_ID === 70300 || \PHP_VERSION_ID === 70301 || !(CURL_VERSION_HTTP2 & curl_version()['features'])) {
60
            // Dont use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675
61
            $this->serverPushSupported = false;
62
        }
63 49
    }
64
65
    /**
66
     * Populates the supplied response with the response for the supplied request.
67
     *
68
     * If a "callback" option is supplied, its value will be called when the
69
     * request completes. It is ONLY in the callback you will see the response
70
     * or an exception.
71
     *
72
     * This is a non-blocking function call.
73
     *
74
     * The callable should have the following signature:
75
     *
76
     *     $callback = function($request, $response, $exception) {
77
     *         if (!$exception) {
78
     *             // success
79
     *         } else {
80
     *             // error ($error is one of the CURLE_* constants)
81
     *         }
82
     *     };
83
     */
84 13
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
85
    {
86 13
        $options = $this->validateOptions($options);
87
88 13
        $this->addToQueue($request, $options);
89 13
    }
90
91
    /**
92
     * This is a blocking function call.
93
     */
94 59
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
95
    {
96 59
        $options = $this->validateOptions($options);
97 58
        $originalCallback = $options->get('callback');
98 58
        $responseToReturn = null;
99
        $options = $options->add(['callback' => function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) use (&$responseToReturn, $originalCallback) {
100 58
            $responseToReturn = $response;
101 58
            $originalCallback($request, $response, $e);
102
103 58
            if (null !== $e) {
104 5
                throw $e;
105
            }
106 58
        }]);
107
108 58
        $this->addToQueue($request, $options);
109 58
        $this->flush();
110
111 53
        return $responseToReturn;
112
    }
113
114 64
    protected function configureOptions(OptionsResolver $resolver): void
115
    {
116 50
        parent::configureOptions($resolver);
117
118
        $resolver->setDefault('callback', function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) {
119 64
        });
120 50
        $resolver->setAllowedTypes('callback', 'callable');
121
122
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, $headers) {
0 ignored issues
show
Unused Code introduced by
The parameter $pushed is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, /** @scrutinizer ignore-unused */ $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $headers is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function ($parent, $pushed, /** @scrutinizer ignore-unused */ $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $parent is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

122
        $resolver->setDefault('push_function_callback', function (/** @scrutinizer ignore-unused */ $parent, $pushed, $headers) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
123
            return CURL_PUSH_OK;
124 50
        });
125 50
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);
126
127 50
        $resolver->setDefault('use_pushed_response', true);
128 50
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
129 50
    }
130
131
    public function count(): int
132
    {
133
        return \count($this->queue);
134
    }
135
136
    /**
137
     * Wait for all requests to finish.
138
     *
139
     * This is a blocking function call.
140
     *
141
     * This will not throw any exceptions. All exceptions are handled in the callback.
142
     */
143 70
    public function flush(): void
144
    {
145 70
        while (!empty($this->queue)) {
146 70
            $this->proceed();
147
        }
148 65
    }
149
150
    /**
151
     * See if any connection is ready to be processed.
152
     *
153
     * This is a non-blocking function call.
154
     *
155
     * @throws ClientException if we fail to initialized cUrl
156
     */
157 71
    public function proceed(): void
158
    {
159 71
        if (empty($this->queue)) {
160
            return;
161
        }
162
163 71
        if (!$this->curlm) {
164 71
            $this->initMultiCurlHandle();
165
        }
166
167 71
        $this->initQueue();
168 71
        $exception = null;
169
        do {
170
            // Start processing each handler in the stack
171 71
            $mrc = curl_multi_exec($this->curlm, $stillRunning);
172 71
        } while (CURLM_CALL_MULTI_PERFORM === $mrc);
173
174 71
        while ($info = curl_multi_info_read($this->curlm)) {
175
            // handle any completed requests
176 71
            if (CURLMSG_DONE !== $info['msg']) {
177
                continue;
178
            }
179
180 71
            $handled = false;
181
182
            /** @var RequestInterface $request */
183
            /** @var ParameterBag $options */
184
            /** @var ResponseBuilder $responseBuilder */
185 71
            foreach ($this->queue as $i => list($request, $options, $curl, $responseBuilder)) {
186
                // Try to find the correct handle from the queue.
187 71
                if ($curl !== $info['handle']) {
188 1
                    continue;
189
                }
190
191 71
                $handled = true;
192 71
                $response = null;
193
                try {
194 71
                    $this->parseError($request, $info['result'], $curl);
195 64
                    $response = $responseBuilder->getResponse();
196 64
                    if ($options->get('expose_curl_info', false)) {
197 64
                        $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl)));
198
                    }
199 7
                } catch (\Throwable $e) {
200 7
                    if (null === $exception) {
201 7
                        $exception = $e;
202
                    }
203
                }
204
205
                // remove from queue
206 71
                curl_multi_remove_handle($this->curlm, $curl);
207 71
                $this->releaseHandle($curl);
208 71
                unset($this->queue[$i]);
209
210
                // callback
211 71
                \call_user_func($options->get('callback'), $request, $response, $exception);
212 66
                $exception = null;
213
            }
214
215 66
            if (!$handled) {
216
                // It must be a pushed response.
217
                $this->handlePushedResponse($info['handle']);
218
            }
219
        }
220
221 70
        $this->cleanup();
222 70
    }
223
224
    private function addPushHandle($headers, $handle)
225
    {
226
        foreach ($headers as $header) {
227
            if (0 === strpos($header, ':path:')) {
228
                $path = substr($header, 6);
229
                $url = (string) curl_getinfo($handle)['url'];
230
                $url = str_replace((string) parse_url($url, PHP_URL_PATH), $path, $url);
231
                $this->pushResponseHandles[$url] = $handle;
232
                break;
233
            }
234
        }
235
    }
236
237
    private function handlePushedResponse($handle)
238
    {
239
        $found = false;
240
        foreach ($this->pushResponseHandles as $url => $h) {
241
            // Weak comparison
242
            if ($handle == $h) {
243
                $found = $url;
244
            }
245
        }
246
247
        if (!$found) {
248
            $found = curl_getinfo($handle)['url'];
249
        }
250
251
        $content = curl_multi_getcontent($handle);
252
        // Check if we got some headers, if not, we do not bother to store it.
253
        if (0 !== $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE)) {
254
            $this->pushedResponses[$found] = ['content' => $content, 'headerSize' => $headerSize];
255
            unset($this->pushResponseHandles[$found]);
256
        }
257
    }
258
259 71
    private function hasPushResponse($url)
260
    {
261 71
        return isset($this->pushedResponses[$url]);
262
    }
263
264
    private function getPushedResponse($url)
265
    {
266
        $response = $this->pushedResponses[$url];
267
        unset($this->pushedResponses[$url]);
268
269
        return $response;
270
    }
271
272 71
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
273
    {
274 71
        if (null !== $callback = $options->get('push_function_callback')) {
275 71
            $this->pushFunctions[] = $callback;
276
        }
277
278 71
        return $this->queue[] = [$request, $options];
279
    }
280
281
    /**
282
     * Create a multi curl handle and add some properties to it.
283
     */
284 71
    private function initMultiCurlHandle(): void
285
    {
286 71
        $this->curlm = curl_multi_init();
287 71
        if (false === $this->curlm) {
288
            throw new ClientException('Unable to create a new cURL multi handle');
289
        }
290
291 71
        if ($this->serverPushSupported) {
292 71
            $userCallbacks = $this->pushFunctions;
293
294 71
            curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
295
            // We need to use $this->pushCb[] because of a bug in PHP
296 71
            curl_multi_setopt(
297 71
                $this->curlm,
298 71
                CURLMOPT_PUSHFUNCTION,
299
                $this->pushCb[] = function ($parent, $pushed, $headers) use ($userCallbacks) {
300
                    // If any callback say no, then do not accept.
301
                    foreach ($userCallbacks as $callback) {
302
                        if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) {
303
                            return CURL_PUSH_DENY;
304
                        }
305
                    }
306
307
                    curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
308
                    curl_setopt($pushed, CURLOPT_HEADER, true);
309
                    $this->addPushHandle($headers, $pushed);
310
311
                    return CURL_PUSH_OK;
312 71
                }
313
            );
314
        }
315 71
    }
316
317
    /**
318
     * Loop over the queue and make sure every item (request) is initialized (ie, got a handle).
319
     */
320 71
    private function initQueue(): void
321
    {
322 71
        foreach ($this->queue as $i => $queueItem) {
323 71
            if (2 !== \count($queueItem)) {
324
                // We have already prepared this curl
325 69
                continue;
326
            }
327
            // prepare curl handle
328
            /** @var RequestInterface $request */
329
            /** @var ParameterBag $options */
330 71
            list($request, $options) = $queueItem;
331
332
            // Check if we have the response in cache already.
333 71
            if ($this->serverPushSupported
334 71
                && $options->get('use_pushed_response')
335 71
                && $this->hasPushResponse($request->getUri()->__toString())
336
            ) {
337
                $data = $this->getPushedResponse($request->getUri()->__toString());
338
                $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput(
339
                    $data['content'],
340
                    $data['headerSize']
341
                );
342
                \call_user_func($options->get('callback'), $request, $response, null);
343
                unset($this->queue[$i]);
344
345
                continue;
346
            }
347
348 71
            $curl = $this->createHandle();
349 71
            $responseBuilder = $this->prepare($curl, $request, $options);
350 71
            $this->queue[$i][] = $curl;
351 71
            $this->queue[$i][] = $responseBuilder;
352 71
            curl_multi_add_handle($this->curlm, $curl);
353
        }
354 71
    }
355
356
    /**
357
     * If we got no requests in the queue, do a clean up to save some memory.
358
     */
359 70
    private function cleanup(): void
360
    {
361 70
        if (empty($this->queue)) {
362 66
            curl_multi_close($this->curlm);
363 66
            $this->curlm = null;
364 66
            $this->pushFunctions = [];
365 66
            $this->pushCb = [];
366
        }
367 70
    }
368
}
369