1 | <?php |
||||||
2 | |||||||
3 | declare(strict_types=1); |
||||||
4 | |||||||
5 | namespace Buzz\Client; |
||||||
6 | |||||||
7 | use Buzz\Configuration\ParameterBag; |
||||||
8 | use Buzz\Exception\ClientException; |
||||||
9 | use Buzz\Message\ResponseBuilder; |
||||||
10 | use Psr\Http\Message\RequestInterface; |
||||||
11 | use Psr\Http\Message\ResponseInterface; |
||||||
12 | use Symfony\Component\OptionsResolver\OptionsResolver; |
||||||
13 | |||||||
14 | class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface |
||||||
15 | { |
||||||
16 | private $queue = []; |
||||||
17 | private $curlm; |
||||||
18 | |||||||
19 | /** |
||||||
20 | * Raw responses that the server has pushed to us. |
||||||
21 | * |
||||||
22 | * @var array |
||||||
23 | */ |
||||||
24 | private $pushedResponses = []; |
||||||
25 | |||||||
26 | /** |
||||||
27 | * Curl handlers with unprocessed pushed responses. |
||||||
28 | * |
||||||
29 | * @var array |
||||||
30 | */ |
||||||
31 | private $pushResponseHandles = []; |
||||||
32 | |||||||
33 | /** |
||||||
34 | * Callbacks that decides if a pushed request should be accepted or not. |
||||||
35 | * |
||||||
36 | * @var array |
||||||
37 | */ |
||||||
38 | private $pushFunctions = []; |
||||||
39 | |||||||
40 | /** |
||||||
41 | * @var bool |
||||||
42 | */ |
||||||
43 | private $serverPushSupported = true; |
||||||
44 | |||||||
45 | /** |
||||||
46 | * To work around bugs in PHP and GC. |
||||||
47 | * |
||||||
48 | * @var array |
||||||
49 | */ |
||||||
50 | private $pushCb = []; |
||||||
51 | |||||||
52 | /** |
||||||
53 | * {@inheritdoc} |
||||||
54 | */ |
||||||
55 | 50 | public function __construct($responseFactory, array $options = []) |
|||||
56 | { |
||||||
57 | 50 | parent::__construct($responseFactory, $options); |
|||||
58 | |||||||
59 | if ( |
||||||
60 | \PHP_VERSION_ID < 70215 || |
||||||
61 | \PHP_VERSION_ID === 70300 || |
||||||
62 | \PHP_VERSION_ID === 70301 || |
||||||
63 | \PHP_VERSION_ID >= 80000 || |
||||||
64 | 49 | !(CURL_VERSION_HTTP2 & curl_version()['features']) |
|||||
65 | ) { |
||||||
66 | // Dont use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675 |
||||||
67 | $this->serverPushSupported = false; |
||||||
68 | } |
||||||
69 | 49 | } |
|||||
70 | |||||||
71 | /** |
||||||
72 | * Populates the supplied response with the response for the supplied request. |
||||||
73 | * |
||||||
74 | * If a "callback" option is supplied, its value will be called when the |
||||||
75 | * request completes. It is ONLY in the callback you will see the response |
||||||
76 | * or an exception. |
||||||
77 | * |
||||||
78 | * This is a non-blocking function call. |
||||||
79 | * |
||||||
80 | * The callable should have the following signature: |
||||||
81 | * |
||||||
82 | * $callback = function($request, $response, $exception) { |
||||||
83 | * if (!$exception) { |
||||||
84 | * // success |
||||||
85 | * } else { |
||||||
86 | * // error ($error is one of the CURLE_* constants) |
||||||
87 | * } |
||||||
88 | * }; |
||||||
89 | */ |
||||||
90 | 13 | public function sendAsyncRequest(RequestInterface $request, array $options = []): void |
|||||
91 | { |
||||||
92 | 13 | $options = $this->validateOptions($options); |
|||||
93 | |||||||
94 | 13 | $this->addToQueue($request, $options); |
|||||
95 | 13 | } |
|||||
96 | |||||||
97 | /** |
||||||
98 | * This is a blocking function call. |
||||||
99 | */ |
||||||
100 | 59 | public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface |
|||||
101 | { |
||||||
102 | 59 | $options = $this->validateOptions($options); |
|||||
103 | 58 | $originalCallback = $options->get('callback'); |
|||||
104 | 58 | $responseToReturn = null; |
|||||
105 | 58 | $options = $options->add(['callback' => function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) use (&$responseToReturn, $originalCallback) { |
|||||
106 | 58 | $responseToReturn = $response; |
|||||
107 | 58 | $originalCallback($request, $response, $e); |
|||||
108 | |||||||
109 | 58 | if (null !== $e) { |
|||||
110 | 5 | throw $e; |
|||||
111 | } |
||||||
112 | 58 | }]); |
|||||
113 | |||||||
114 | 58 | $this->addToQueue($request, $options); |
|||||
115 | 58 | $this->flush(); |
|||||
116 | |||||||
117 | 53 | return $responseToReturn; |
|||||
118 | } |
||||||
119 | |||||||
120 | 64 | protected function configureOptions(OptionsResolver $resolver): void |
|||||
121 | { |
||||||
122 | 50 | parent::configureOptions($resolver); |
|||||
123 | |||||||
124 | 50 | $resolver->setDefault('callback', function (RequestInterface $request, ResponseInterface $response = null, ClientException $e = null) { |
|||||
125 | 64 | }); |
|||||
126 | 50 | $resolver->setAllowedTypes('callback', 'callable'); |
|||||
127 | |||||||
128 | 50 | $resolver->setDefault('push_function_callback', function ($parent, $pushed, $headers) { |
|||||
0 ignored issues
–
show
The parameter
$pushed is not used and could be removed.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for parameters that have been defined for a function or method, but which are not used in the method body. ![]() The parameter
$parent is not used and could be removed.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for parameters that have been defined for a function or method, but which are not used in the method body. ![]() |
|||||||
129 | return CURL_PUSH_OK; |
||||||
130 | 50 | }); |
|||||
131 | 50 | $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']); |
|||||
132 | |||||||
133 | 50 | $resolver->setDefault('use_pushed_response', true); |
|||||
134 | 50 | $resolver->setAllowedTypes('use_pushed_response', 'boolean'); |
|||||
135 | 50 | } |
|||||
136 | |||||||
137 | public function count(): int |
||||||
138 | { |
||||||
139 | return \count($this->queue); |
||||||
140 | } |
||||||
141 | |||||||
142 | /** |
||||||
143 | * Wait for all requests to finish. |
||||||
144 | * |
||||||
145 | * This is a blocking function call. |
||||||
146 | * |
||||||
147 | * This will not throw any exceptions. All exceptions are handled in the callback. |
||||||
148 | */ |
||||||
149 | 70 | public function flush(): void |
|||||
150 | { |
||||||
151 | 70 | while (!empty($this->queue)) { |
|||||
152 | 70 | $this->proceed(); |
|||||
153 | } |
||||||
154 | 65 | } |
|||||
155 | |||||||
156 | /** |
||||||
157 | * See if any connection is ready to be processed. |
||||||
158 | * |
||||||
159 | * This is a non-blocking function call. |
||||||
160 | * |
||||||
161 | * @throws ClientException if we fail to initialized cUrl |
||||||
162 | */ |
||||||
163 | 71 | public function proceed(): void |
|||||
164 | { |
||||||
165 | 71 | if (empty($this->queue)) { |
|||||
166 | return; |
||||||
167 | } |
||||||
168 | |||||||
169 | 71 | if (!$this->curlm) { |
|||||
170 | 71 | $this->initMultiCurlHandle(); |
|||||
171 | } |
||||||
172 | |||||||
173 | 71 | $this->initQueue(); |
|||||
174 | 71 | $exception = null; |
|||||
175 | do { |
||||||
176 | // Start processing each handler in the stack |
||||||
177 | 71 | $mrc = curl_multi_exec($this->curlm, $stillRunning); |
|||||
178 | 71 | } while (CURLM_CALL_MULTI_PERFORM === $mrc); |
|||||
179 | |||||||
180 | 71 | while ($info = curl_multi_info_read($this->curlm)) { |
|||||
181 | // handle any completed requests |
||||||
182 | 71 | if (CURLMSG_DONE !== $info['msg']) { |
|||||
183 | continue; |
||||||
184 | } |
||||||
185 | |||||||
186 | 71 | $handled = false; |
|||||
187 | |||||||
188 | /** @var RequestInterface $request */ |
||||||
189 | /** @var ParameterBag $options */ |
||||||
190 | /** @var ResponseBuilder $responseBuilder */ |
||||||
191 | 71 | foreach ($this->queue as $i => list($request, $options, $curl, $responseBuilder)) { |
|||||
192 | // Try to find the correct handle from the queue. |
||||||
193 | 71 | if ($curl !== $info['handle']) { |
|||||
194 | 1 | continue; |
|||||
195 | } |
||||||
196 | |||||||
197 | 71 | $handled = true; |
|||||
198 | 71 | $response = null; |
|||||
199 | try { |
||||||
200 | 71 | $this->parseError($request, $info['result'], $curl); |
|||||
201 | 64 | $response = $responseBuilder->getResponse(); |
|||||
202 | 64 | if ($options->get('expose_curl_info', false)) { |
|||||
203 | 64 | $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl))); |
|||||
204 | } |
||||||
205 | 7 | } catch (\Throwable $e) { |
|||||
206 | 7 | if (null === $exception) { |
|||||
207 | 7 | $exception = $e; |
|||||
208 | } |
||||||
209 | } |
||||||
210 | |||||||
211 | // remove from queue |
||||||
212 | 71 | curl_multi_remove_handle($this->curlm, $curl); |
|||||
213 | 71 | $this->releaseHandle($curl); |
|||||
214 | 71 | unset($this->queue[$i]); |
|||||
215 | |||||||
216 | // callback |
||||||
217 | 71 | \call_user_func($options->get('callback'), $request, $response, $exception); |
|||||
218 | 66 | $exception = null; |
|||||
219 | } |
||||||
220 | |||||||
221 | 66 | if (!$handled) { |
|||||
222 | // It must be a pushed response. |
||||||
223 | $this->handlePushedResponse($info['handle']); |
||||||
224 | } |
||||||
225 | } |
||||||
226 | |||||||
227 | 70 | $this->cleanup(); |
|||||
228 | 70 | } |
|||||
229 | |||||||
230 | private function addPushHandle($headers, $handle) |
||||||
231 | { |
||||||
232 | foreach ($headers as $header) { |
||||||
233 | if (0 === strpos($header, ':path:')) { |
||||||
234 | $path = substr($header, 6); |
||||||
235 | $url = (string) curl_getinfo($handle)['url']; |
||||||
236 | $url = str_replace((string) parse_url($url, PHP_URL_PATH), $path, $url); |
||||||
237 | $this->pushResponseHandles[$url] = $handle; |
||||||
238 | break; |
||||||
239 | } |
||||||
240 | } |
||||||
241 | } |
||||||
242 | |||||||
243 | private function handlePushedResponse($handle) |
||||||
244 | { |
||||||
245 | $found = false; |
||||||
246 | foreach ($this->pushResponseHandles as $url => $h) { |
||||||
247 | // Weak comparison |
||||||
248 | if ($handle == $h) { |
||||||
249 | $found = $url; |
||||||
250 | } |
||||||
251 | } |
||||||
252 | |||||||
253 | if (!$found) { |
||||||
254 | $found = curl_getinfo($handle)['url']; |
||||||
255 | } |
||||||
256 | |||||||
257 | $content = curl_multi_getcontent($handle); |
||||||
258 | // Check if we got some headers, if not, we do not bother to store it. |
||||||
259 | if (0 !== $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE)) { |
||||||
260 | $this->pushedResponses[$found] = ['content' => $content, 'headerSize' => $headerSize]; |
||||||
261 | unset($this->pushResponseHandles[$found]); |
||||||
262 | } |
||||||
263 | } |
||||||
264 | |||||||
265 | 71 | private function hasPushResponse($url) |
|||||
266 | { |
||||||
267 | 71 | return isset($this->pushedResponses[$url]); |
|||||
268 | } |
||||||
269 | |||||||
270 | private function getPushedResponse($url) |
||||||
271 | { |
||||||
272 | $response = $this->pushedResponses[$url]; |
||||||
273 | unset($this->pushedResponses[$url]); |
||||||
274 | |||||||
275 | return $response; |
||||||
276 | } |
||||||
277 | |||||||
278 | 71 | private function addToQueue(RequestInterface $request, ParameterBag $options): array |
|||||
279 | { |
||||||
280 | 71 | if (null !== $callback = $options->get('push_function_callback')) { |
|||||
281 | 71 | $this->pushFunctions[] = $callback; |
|||||
282 | } |
||||||
283 | |||||||
284 | 71 | return $this->queue[] = [$request, $options]; |
|||||
285 | } |
||||||
286 | |||||||
287 | /** |
||||||
288 | * Create a multi curl handle and add some properties to it. |
||||||
289 | */ |
||||||
290 | 71 | private function initMultiCurlHandle(): void |
|||||
291 | { |
||||||
292 | 71 | $this->curlm = curl_multi_init(); |
|||||
293 | 71 | if (false === $this->curlm) { |
|||||
294 | throw new ClientException('Unable to create a new cURL multi handle'); |
||||||
295 | } |
||||||
296 | |||||||
297 | 71 | if ($this->serverPushSupported) { |
|||||
298 | 71 | $userCallbacks = $this->pushFunctions; |
|||||
299 | |||||||
300 | 71 | curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); |
|||||
301 | // We need to use $this->pushCb[] because of a bug in PHP |
||||||
302 | 71 | curl_multi_setopt( |
|||||
303 | 71 | $this->curlm, |
|||||
304 | 71 | CURLMOPT_PUSHFUNCTION, |
|||||
305 | 71 | $this->pushCb[] = function ($parent, $pushed, $headers) use ($userCallbacks) { |
|||||
306 | // If any callback say no, then do not accept. |
||||||
307 | foreach ($userCallbacks as $callback) { |
||||||
308 | if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) { |
||||||
309 | return CURL_PUSH_DENY; |
||||||
310 | } |
||||||
311 | } |
||||||
312 | |||||||
313 | curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true); |
||||||
314 | curl_setopt($pushed, CURLOPT_HEADER, true); |
||||||
315 | $this->addPushHandle($headers, $pushed); |
||||||
316 | |||||||
317 | return CURL_PUSH_OK; |
||||||
318 | 71 | } |
|||||
319 | ); |
||||||
320 | } |
||||||
321 | 71 | } |
|||||
322 | |||||||
323 | /** |
||||||
324 | * Loop over the queue and make sure every item (request) is initialized (ie, got a handle). |
||||||
325 | */ |
||||||
326 | 71 | private function initQueue(): void |
|||||
327 | { |
||||||
328 | 71 | foreach ($this->queue as $i => $queueItem) { |
|||||
329 | 71 | if (2 !== \count($queueItem)) { |
|||||
330 | // We have already prepared this curl |
||||||
331 | 69 | continue; |
|||||
332 | } |
||||||
333 | // prepare curl handle |
||||||
334 | /** @var RequestInterface $request */ |
||||||
335 | /** @var ParameterBag $options */ |
||||||
336 | 71 | list($request, $options) = $queueItem; |
|||||
337 | |||||||
338 | // Check if we have the response in cache already. |
||||||
339 | 71 | if ($this->serverPushSupported |
|||||
340 | 71 | && $options->get('use_pushed_response') |
|||||
341 | 71 | && $this->hasPushResponse($request->getUri()->__toString()) |
|||||
342 | ) { |
||||||
343 | $data = $this->getPushedResponse($request->getUri()->__toString()); |
||||||
344 | $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput( |
||||||
345 | $data['content'], |
||||||
346 | $data['headerSize'] |
||||||
347 | ); |
||||||
348 | \call_user_func($options->get('callback'), $request, $response, null); |
||||||
349 | unset($this->queue[$i]); |
||||||
350 | |||||||
351 | continue; |
||||||
352 | } |
||||||
353 | |||||||
354 | 71 | $curl = $this->createHandle(); |
|||||
355 | 71 | $responseBuilder = $this->prepare($curl, $request, $options); |
|||||
356 | 71 | $this->queue[$i][] = $curl; |
|||||
357 | 71 | $this->queue[$i][] = $responseBuilder; |
|||||
358 | 71 | curl_multi_add_handle($this->curlm, $curl); |
|||||
359 | } |
||||||
360 | 71 | } |
|||||
361 | |||||||
362 | /** |
||||||
363 | * If we got no requests in the queue, do a clean up to save some memory. |
||||||
364 | */ |
||||||
365 | 70 | private function cleanup(): void |
|||||
366 | { |
||||||
367 | 70 | if (empty($this->queue)) { |
|||||
368 | 66 | curl_multi_close($this->curlm); |
|||||
369 | 66 | $this->curlm = null; |
|||||
370 | 66 | $this->pushFunctions = []; |
|||||
371 | 66 | $this->pushCb = []; |
|||||
372 | } |
||||||
373 | 70 | } |
|||||
374 | } |
||||||
375 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.