Issues (8)

lib/Client/MultiCurl.php (3 issues)

Severity
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Buzz\Client;
6
7
use Buzz\Configuration\ParameterBag;
8
use Buzz\Exception\ClientException;
9
use Buzz\Message\ResponseBuilder;
10
use Psr\Http\Message\RequestInterface;
11
use Psr\Http\Message\ResponseInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
15
{
16
    /**
     * Requests waiting to be sent or completed.
     *
     * Every item starts out as [request, options]. Once the item has been
     * prepared, its curl handle and response builder are appended to it.
     *
     * @var array
     */
    private $queue = [];

    /**
     * The cURL multi handle. It is created when the queue is first processed
     * and reset to null once the queue is empty again.
     */
    private $curlm;

    /**
     * Raw responses that the server has pushed to us, indexed by URL.
     *
     * Every entry holds the raw "content" and its "headerSize".
     *
     * @var array
     */
    private $pushedResponses = [];

    /**
     * Curl handles with unprocessed pushed responses, indexed by URL.
     *
     * @var array
     */
    private $pushResponseHandles = [];

    /**
     * Callbacks that decide whether a pushed request should be accepted or not.
     *
     * @var array
     */
    private $pushFunctions = [];

    /**
     * Whether HTTP/2 server push may be used with this PHP/cURL combination.
     *
     * @var bool
     */
    private $serverPushSupported = true;

    /**
     * Keeps the push callbacks referenced, to work around bugs in PHP and GC.
     *
     * @var array
     */
    private $pushCb = [];
51
52
    /**
     * {@inheritdoc}
     *
     * Additionally decides whether HTTP/2 server push can be used in the
     * current runtime.
     */
    public function __construct($responseFactory, array $options = [])
    {
        parent::__construct($responseFactory, $options);

        // Don't use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675
        $phpIsAffected = \PHP_VERSION_ID < 70215
            || \in_array(\PHP_VERSION_ID, [70300, 70301], true)
            || \PHP_VERSION_ID >= 80000;

        // The cURL feature check is only evaluated when the PHP version itself is fine.
        if ($phpIsAffected || !(CURL_VERSION_HTTP2 & curl_version()['features'])) {
            $this->serverPushSupported = false;
        }
    }
70
71
    /**
     * Queues a request without sending it yet.
     *
     * This is a non-blocking function call. The request is only transferred
     * when the queue is processed by proceed() or flush().
     *
     * If a "callback" option is supplied, its value is called when the request
     * completes. The callback is the ONLY place where the response or the
     * exception of an asynchronous request can be observed.
     *
     * The callable should have the following signature:
     *
     *     $callback = function ($request, $response, $exception) {
     *         if (!$exception) {
     *             // success, $response is available
     *         } else {
     *             // failure, $response is null
     *         }
     *     };
     */
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
    {
        $this->addToQueue($request, $this->validateOptions($options));
    }
96
97
    /**
     * Sends a request and waits for its response.
     *
     * This is a blocking function call: the request is added to the queue and
     * the queue is flushed before this method returns. A "callback" option, if
     * supplied, is still invoked before the response is returned or the
     * exception is thrown.
     *
     * @throws ClientException if the request fails
     */
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
    {
        $options = $this->validateOptions($options);
        $originalCallback = $options->get('callback');
        $responseToReturn = null;

        // Wrap the configured callback so the response can be captured and a
        // failure can be turned into an exception for the synchronous caller.
        $capture = function (
            RequestInterface $request,
            ?ResponseInterface $response = null,
            ?ClientException $e = null
        ) use (&$responseToReturn, $originalCallback) {
            $responseToReturn = $response;
            $originalCallback($request, $response, $e);

            if (null !== $e) {
                throw $e;
            }
        };
        $options = $options->add(['callback' => $capture]);

        $this->addToQueue($request, $options);
        $this->flush();

        return $responseToReturn;
    }
119
120 64
    /**
     * Registers the options that are specific to this client.
     *
     * - "callback": callable invoked with the request, the response (or null)
     *   and the exception (or null) when a request completes.
     * - "push_function_callback": callable (or null) that returns CURL_PUSH_OK
     *   or CURL_PUSH_DENY for a pushed request.
     * - "use_pushed_response": whether an already pushed response may be used
     *   instead of sending the request.
     */
    protected function configureOptions(OptionsResolver $resolver): void
    {
        parent::configureOptions($resolver);

        // The default callback does nothing.
        $resolver->setDefault(
            'callback',
            static function (RequestInterface $request, ?ResponseInterface $response = null, ?ClientException $e = null) {
            }
        );
        $resolver->setAllowedTypes('callback', 'callable');

        // The default push callback accepts every push. Its parameters are
        // intentionally unused: they only document the arguments a push
        // callback is invoked with.
        $resolver->setDefault('push_function_callback', static function ($parent, $pushed, $headers) {
            return CURL_PUSH_OK;
        });
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);

        $resolver->setDefault('use_pushed_response', true);
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
    }
136
137
    /**
     * Returns the number of requests that are currently in the queue.
     */
    public function count(): int
    {
        $pending = $this->queue;

        return \count($pending);
    }
141
142
    /**
     * Wait for all requests to finish.
     *
     * This is a blocking function call: the queue is processed over and over
     * again until it is empty.
     *
     * Request failures are reported to the callbacks. Anything a callback
     * throws is not caught here and propagates to the caller.
     */
    public function flush(): void
    {
        while (\count($this->queue) > 0) {
            $this->proceed();
        }
    }
155
156
    /**
     * See if any connection is ready to be processed.
     *
     * This is a non-blocking function call. Requests that have completed are
     * removed from the queue and their callback is invoked with the request,
     * the response (or null) and the exception (or null).
     *
     * Anything a callback throws propagates to the caller. The clean up still
     * runs in that case, so the multi handle is not left open once the queue
     * is empty.
     *
     * @throws ClientException if we fail to initialize cURL
     */
    public function proceed(): void
    {
        if (empty($this->queue)) {
            return;
        }

        if (!$this->curlm) {
            $this->initMultiCurlHandle();
        }

        try {
            $this->initQueue();

            do {
                // Start processing each handler in the stack
                $mrc = curl_multi_exec($this->curlm, $stillRunning);
            } while (CURLM_CALL_MULTI_PERFORM === $mrc);

            while ($info = curl_multi_info_read($this->curlm)) {
                // handle any completed requests
                if (CURLMSG_DONE !== $info['msg']) {
                    continue;
                }

                $handled = false;

                foreach ($this->queue as $i => $queueItem) {
                    // Items queued by a callback have no handle yet, and other
                    // items belong to a different transfer.
                    if (!isset($queueItem[2]) || $queueItem[2] !== $info['handle']) {
                        continue;
                    }

                    /** @var RequestInterface $request */
                    /** @var ParameterBag $options */
                    /** @var ResponseBuilder $responseBuilder */
                    list($request, $options, $curl, $responseBuilder) = $queueItem;

                    $handled = true;
                    $response = null;
                    $exception = null;
                    try {
                        $this->parseError($request, $info['result'], $curl);
                        $response = $responseBuilder->getResponse();
                        if ($options->get('expose_curl_info', false)) {
                            $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl)));
                        }
                    } catch (\Throwable $e) {
                        $exception = $e;
                    }

                    // remove from queue
                    curl_multi_remove_handle($this->curlm, $curl);
                    $this->releaseHandle($curl);
                    unset($this->queue[$i]);

                    // callback
                    \call_user_func($options->get('callback'), $request, $response, $exception);

                    // A handle belongs to exactly one queue item.
                    break;
                }

                if (!$handled) {
                    // It must be a pushed response.
                    $this->handlePushedResponse($info['handle']);
                }
            }
        } finally {
            $this->cleanup();
        }
    }
229
230
    /**
     * Remembers the handle of a pushed response under the URL it was pushed for.
     *
     * The URL is built from the scheme and authority of the handle's URL and
     * the value of the pushed ":path" header. Replacing the path textually is
     * not safe: a path of "/" would match every slash in the URL, and a query
     * string of the original URL would be kept.
     *
     * @param array $headers the headers of the pushed request
     * @param mixed $handle  the curl handle of the pushed response
     */
    private function addPushHandle($headers, $handle)
    {
        foreach ($headers as $header) {
            if (0 !== strpos($header, ':path:')) {
                continue;
            }

            $path = substr($header, 6);
            $url = (string) curl_getinfo($handle)['url'];
            $parts = parse_url($url);

            if (\is_array($parts) && isset($parts['scheme'], $parts['host'])) {
                $authority = $parts['host'];
                if (isset($parts['user'])) {
                    $userInfo = $parts['user'].(isset($parts['pass']) ? ':'.$parts['pass'] : '');
                    $authority = $userInfo.'@'.$authority;
                }
                if (isset($parts['port'])) {
                    $authority .= ':'.$parts['port'];
                }

                $url = $parts['scheme'].'://'.$authority.$path;
            } else {
                // The URL could not be split up, fall back to a textual replacement.
                $url = str_replace((string) parse_url($url, PHP_URL_PATH), $path, $url);
            }

            $this->pushResponseHandles[$url] = $handle;

            // Only the first ":path" header is relevant.
            break;
        }
    }
242
243
    /**
     * Stores the raw response of a completed pushed transfer.
     *
     * The response is stored under the URL the handle was registered with, or
     * under the URL reported by the handle when it was never registered.
     * Responses without any headers are not stored.
     *
     * @param mixed $handle the curl handle of the pushed response
     */
    private function handlePushedResponse($handle)
    {
        $url = false;
        foreach ($this->pushResponseHandles as $knownUrl => $knownHandle) {
            // Deliberately a weak comparison. No early exit: the last match wins.
            if ($knownHandle == $handle) {
                $url = $knownUrl;
            }
        }

        if (!$url) {
            $url = curl_getinfo($handle)['url'];
        }

        $content = curl_multi_getcontent($handle);
        $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);

        // Check if we got some headers, if not, we do not bother to store it.
        if (0 === $headerSize) {
            return;
        }

        $this->pushedResponses[$url] = ['content' => $content, 'headerSize' => $headerSize];
        unset($this->pushResponseHandles[$url]);
    }
264
265 71
    /**
     * Tells whether a pushed response is stored for the given URL.
     */
    private function hasPushResponse($url)
    {
        $isStored = isset($this->pushedResponses[$url]);

        return $isStored;
    }
269
270
    /**
     * Returns the pushed response stored for the given URL and forgets it,
     * so every pushed response is handed out only once.
     */
    private function getPushedResponse($url)
    {
        $stored = $this->pushedResponses[$url];

        // Drop the entry now that it has been read.
        unset($this->pushedResponses[$url]);

        return $stored;
    }
277
278 71
    /**
     * Appends a request and its options to the queue.
     *
     * A "push_function_callback" option that is not null is remembered as well,
     * so it can be consulted when the server pushes a request.
     *
     * @return array the queue item that was added: [request, options]
     */
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
    {
        $pushFunction = $options->get('push_function_callback');
        if (null !== $pushFunction) {
            $this->pushFunctions[] = $pushFunction;
        }

        $item = [$request, $options];
        $this->queue[] = $item;

        return $item;
    }
286
287
    /**
     * Create a multi curl handle and add some properties to it.
     *
     * When server push is supported, multiplexing is enabled and a push
     * function is installed that asks every registered user callback whether
     * the pushed request should be accepted.
     *
     * @throws ClientException if the multi handle cannot be created
     */
    private function initMultiCurlHandle(): void
    {
        $this->curlm = curl_multi_init();
        if (false === $this->curlm) {
            throw new ClientException('Unable to create a new cURL multi handle');
        }

        if (!$this->serverPushSupported) {
            return;
        }

        // Only the callbacks known at this point are consulted by the push function.
        $userCallbacks = $this->pushFunctions;

        curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);

        $pushFunction = function ($parent, $pushed, $headers) use ($userCallbacks) {
            // If any callback say no, then do not accept.
            foreach ($userCallbacks as $userCallback) {
                $verdict = $userCallback($parent, $pushed, $headers);
                if (CURL_PUSH_DENY === $verdict) {
                    return CURL_PUSH_DENY;
                }
            }

            curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($pushed, CURLOPT_HEADER, true);
            $this->addPushHandle($headers, $pushed);

            return CURL_PUSH_OK;
        };

        // We need to keep the closure in $this->pushCb[] because of a bug in PHP
        $this->pushCb[] = $pushFunction;
        curl_multi_setopt($this->curlm, CURLMOPT_PUSHFUNCTION, $pushFunction);
    }
322
323
    /**
     * Loop over the queue and make sure every item (request) is initialized (ie, got a handle).
     *
     * A request whose response has already been pushed by the server is not
     * sent at all: its callback is invoked with the pushed response and the
     * request is removed from the queue.
     */
    private function initQueue(): void
    {
        foreach ($this->queue as $index => $item) {
            // Items with more than the request and the options are prepared already.
            if (2 !== \count($item)) {
                continue;
            }

            /** @var RequestInterface $request */
            /** @var ParameterBag $options */
            list($request, $options) = $item;

            // Check if we have the response in cache already.
            if ($this->serverPushSupported && $options->get('use_pushed_response')) {
                $uri = $request->getUri()->__toString();

                if ($this->hasPushResponse($uri)) {
                    $data = $this->getPushedResponse($uri);
                    $builder = new ResponseBuilder($this->responseFactory);
                    $response = $builder->getResponseFromRawInput($data['content'], $data['headerSize']);

                    \call_user_func($options->get('callback'), $request, $response, null);
                    unset($this->queue[$index]);

                    continue;
                }
            }

            // prepare curl handle
            $curl = $this->createHandle();
            $responseBuilder = $this->prepare($curl, $request, $options);
            $this->queue[$index][] = $curl;
            $this->queue[$index][] = $responseBuilder;
            curl_multi_add_handle($this->curlm, $curl);
        }
    }
361
362
    /**
     * If we got no requests in the queue, do a clean up to save some memory.
     *
     * The multi handle is only closed when there is one, so calling this after
     * the handle has already been released is harmless.
     */
    private function cleanup(): void
    {
        if (!empty($this->queue)) {
            return;
        }

        if ($this->curlm) {
            curl_multi_close($this->curlm);
        }

        $this->curlm = null;
        $this->pushFunctions = [];
        $this->pushCb = [];
    }
374
}
375