kriswallsmith /
Buzz
| 1 | <?php |
||||||
| 2 | |||||||
| 3 | declare(strict_types=1); |
||||||
| 4 | |||||||
| 5 | namespace Buzz\Client; |
||||||
| 6 | |||||||
| 7 | use Buzz\Configuration\ParameterBag; |
||||||
| 8 | use Buzz\Exception\ClientException; |
||||||
| 9 | use Buzz\Message\ResponseBuilder; |
||||||
| 10 | use Psr\Http\Message\RequestInterface; |
||||||
| 11 | use Psr\Http\Message\ResponseInterface; |
||||||
| 12 | use Symfony\Component\OptionsResolver\OptionsResolver; |
||||||
| 13 | |||||||
/**
 * cURL multi-handle based client.
 *
 * Requests are put on an internal queue and driven forward by proceed() or
 * flush(). Results are delivered through the per-request "callback" option.
 * When the runtime allows it, HTTP/2 server push is enabled and pushed
 * responses are cached so that a later request for the same URL can be
 * answered without a network round trip.
 */
class MultiCurl extends AbstractCurl implements BatchClientInterface, BuzzClientInterface
{
    /**
     * Pending requests.
     *
     * Each item starts as [request, options] and is extended by initQueue()
     * to [request, options, curl handle, response builder].
     *
     * @var array
     */
    private $queue = [];

    /**
     * The cURL multi handle, or null when none is open.
     */
    private $curlm;

    /**
     * Raw responses that the server has pushed to us, keyed by URL.
     *
     * @var array
     */
    private $pushedResponses = [];

    /**
     * Curl handles with unprocessed pushed responses, keyed by URL.
     *
     * @var array
     */
    private $pushResponseHandles = [];

    /**
     * Callbacks that decide if a pushed request should be accepted or not.
     *
     * @var array
     */
    private $pushFunctions = [];

    /**
     * @var bool
     */
    private $serverPushSupported = true;

    /**
     * To work around bugs in PHP and GC.
     *
     * @var array
     */
    private $pushCb = [];

    /**
     * Forwards both arguments to the parent constructor and then decides
     * whether HTTP/2 server push may be used on this runtime.
     *
     * @param mixed $responseFactory passed to the parent unchanged
     * @param array $options         passed to the parent unchanged
     */
    public function __construct($responseFactory, array $options = [])
    {
        parent::__construct($responseFactory, $options);

        if (
            \PHP_VERSION_ID < 70215 ||
            \PHP_VERSION_ID === 70300 ||
            \PHP_VERSION_ID === 70301 ||
            \PHP_VERSION_ID >= 80000 ||
            !(CURL_VERSION_HTTP2 & curl_version()['features'])
        ) {
            // Dont use HTTP/2 push when it's unsupported or buggy, see https://bugs.php.net/76675
            $this->serverPushSupported = false;
        }
    }

    /**
     * Queue a request without waiting for it.
     *
     * This is a non-blocking function call. Nothing is transferred until
     * proceed() or flush() is called. The response or the failure is ONLY
     * visible in the "callback" option, which is invoked when the request
     * completes.
     *
     * The callable should have the following signature:
     *
     * $callback = function($request, $response, $exception) {
     *     if (!$exception) {
     *         // success, $response is set
     *     } else {
     *         // failure, $exception describes what went wrong
     *     }
     * };
     */
    public function sendAsyncRequest(RequestInterface $request, array $options = []): void
    {
        $options = $this->validateOptions($options);

        $this->addToQueue($request, $options);
    }

    /**
     * Send a request and wait for its response.
     *
     * This is a blocking function call. It flushes the whole queue, so any
     * requests queued earlier with sendAsyncRequest() are completed as well.
     *
     * @throws ClientException when the request fails; the user supplied
     *                         callback is invoked before it is thrown
     */
    public function sendRequest(RequestInterface $request, array $options = []): ResponseInterface
    {
        $options = $this->validateOptions($options);
        $originalCallback = $options->get('callback');
        $responseToReturn = null;

        // Wrap the user callback so the response can be captured and a
        // failure can be turned into an exception for the caller.
        $options = $options->add(['callback' => function (
            RequestInterface $request,
            ?ResponseInterface $response = null,
            ?ClientException $e = null
        ) use (&$responseToReturn, $originalCallback) {
            $responseToReturn = $response;
            $originalCallback($request, $response, $e);

            if (null !== $e) {
                throw $e;
            }
        }]);

        $this->addToQueue($request, $options);
        $this->flush();

        return $responseToReturn;
    }

    /**
     * Registers the options specific to this client on top of the parent's:
     * "callback", "push_function_callback" and "use_pushed_response".
     */
    protected function configureOptions(OptionsResolver $resolver): void
    {
        parent::configureOptions($resolver);

        // Default completion callback: do nothing.
        $resolver->setDefault('callback', static function (
            RequestInterface $request,
            ?ResponseInterface $response = null,
            ?ClientException $e = null
        ) {
        });
        $resolver->setAllowedTypes('callback', 'callable');

        // Default push decision: accept everything. The parameters are
        // unused but kept to show the expected signature.
        $resolver->setDefault('push_function_callback', static function ($parent, $pushed, $headers) {
            return CURL_PUSH_OK;
        });
        $resolver->setAllowedTypes('push_function_callback', ['callable', 'null']);

        $resolver->setDefault('use_pushed_response', true);
        $resolver->setAllowedTypes('use_pushed_response', 'boolean');
    }

    /**
     * Number of requests that are still on the queue.
     */
    public function count(): int
    {
        return \count($this->queue);
    }

    /**
     * Wait for all requests to finish.
     *
     * This is a blocking function call.
     *
     * Transfer errors are reported to the request callback and are not
     * thrown here. An exception thrown by a callback itself (as the wrapper
     * installed by sendRequest() does) is not caught and propagates.
     */
    public function flush(): void
    {
        while (!empty($this->queue)) {
            $this->proceed();
        }
    }

    /**
     * See if any connection is ready to be processed.
     *
     * This is a non-blocking function call.
     *
     * @throws ClientException if we fail to initialize cURL
     */
    public function proceed(): void
    {
        if (empty($this->queue)) {
            return;
        }

        if (!$this->curlm) {
            $this->initMultiCurlHandle();
        }

        try {
            $this->initQueue();

            do {
                // Start processing each handler in the stack
                $mrc = curl_multi_exec($this->curlm, $stillRunning);
            } while (CURLM_CALL_MULTI_PERFORM === $mrc);

            while ($info = curl_multi_info_read($this->curlm)) {
                // Only completed transfers are of interest.
                if (CURLMSG_DONE !== $info['msg']) {
                    continue;
                }

                if (!$this->completeQueuedTransfer($info)) {
                    // Not one of ours, it must be a pushed response.
                    $this->handlePushedResponse($info['handle']);
                }
            }
        } finally {
            // Run even when a callback throws, otherwise an idle multi
            // handle and its push closures would be kept around.
            $this->cleanup();
        }
    }

    /**
     * Finish the queued request that owns the completed handle: build the
     * response (or capture the failure), detach the handle, remove the
     * request from the queue and invoke its callback.
     *
     * @param array $info a message returned by curl_multi_info_read()
     *
     * @return bool true when the handle belonged to a queued request
     */
    private function completeQueuedTransfer(array $info): bool
    {
        $handled = false;

        /** @var RequestInterface $request */
        /** @var ParameterBag $options */
        /** @var ResponseBuilder $responseBuilder */
        foreach ($this->queue as $i => [$request, $options, $curl, $responseBuilder]) {
            // Try to find the correct handle from the queue.
            if ($curl !== $info['handle']) {
                continue;
            }

            $handled = true;
            $response = null;
            $exception = null;
            try {
                $this->parseError($request, $info['result'], $curl);
                $response = $responseBuilder->getResponse();
                if ($options->get('expose_curl_info', false)) {
                    $response = $response->withHeader('__curl_info', (string) json_encode(curl_getinfo($curl)));
                }
            } catch (\Throwable $e) {
                $exception = $e;
            }

            // Remove from the queue before the callback runs, so a throwing
            // callback cannot leave a finished request behind.
            curl_multi_remove_handle($this->curlm, $curl);
            $this->releaseHandle($curl);
            unset($this->queue[$i]);

            ($options->get('callback'))($request, $response, $exception);
        }

        return $handled;
    }

    /**
     * Remember the handle of an accepted push under the URL it will answer.
     *
     * The URL is the parent's scheme and authority followed by the ":path"
     * pseudo header of the pushed request.
     *
     * @param array $headers request headers of the pushed request
     * @param mixed $handle  curl handle of the pushed transfer
     */
    private function addPushHandle(array $headers, $handle): void
    {
        foreach ($headers as $header) {
            if (0 === strpos($header, ':path:')) {
                $path = substr($header, 6);
                $parentUrl = (string) curl_getinfo($handle)['url'];
                $this->pushResponseHandles[$this->resolvePushedUrl($parentUrl, $path)] = $handle;
                break;
            }
        }
    }

    /**
     * Build the absolute URL of a pushed resource.
     *
     * Replacing the parent's path textually is not reliable: a parent path
     * of "/" matches every slash in the URL, an empty path matches nothing,
     * and the parent's query string would be kept. The URL is therefore
     * rebuilt from the parent's scheme and authority.
     */
    private function resolvePushedUrl(string $parentUrl, string $path): string
    {
        $parts = parse_url($parentUrl);
        if (!\is_array($parts) || !isset($parts['host'])) {
            // No authority to build on, keep the previous substitution.
            return str_replace((string) parse_url($parentUrl, PHP_URL_PATH), $path, $parentUrl);
        }

        $origin = isset($parts['scheme']) ? $parts['scheme'].'://' : '';
        if (isset($parts['user'])) {
            $origin .= $parts['user'];
            if (isset($parts['pass'])) {
                $origin .= ':'.$parts['pass'];
            }
            $origin .= '@';
        }
        $origin .= $parts['host'];
        if (isset($parts['port'])) {
            $origin .= ':'.$parts['port'];
        }

        return $origin.$path;
    }

    /**
     * Store the raw result of a finished pushed transfer so that a later
     * request for the same URL can reuse it.
     *
     * @param mixed $handle curl handle of the finished pushed transfer
     */
    private function handlePushedResponse($handle): void
    {
        $url = null;
        foreach ($this->pushResponseHandles as $candidateUrl => $candidate) {
            // Weak comparison
            // NOTE(review): kept loose as in the original; the reason is not visible here.
            if ($handle == $candidate) {
                $url = $candidateUrl;
            }
        }

        if (null === $url) {
            $url = (string) curl_getinfo($handle)['url'];
        } else {
            // The transfer is done, so stop tracking the handle whether or
            // not the response turns out to be usable.
            unset($this->pushResponseHandles[$url]);
        }

        $content = curl_multi_getcontent($handle);
        $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        // Check if we got some headers, if not, we do not bother to store it.
        if (0 !== $headerSize) {
            $this->pushedResponses[$url] = ['content' => $content, 'headerSize' => $headerSize];
        }
    }

    /**
     * Tell whether a pushed response is cached for the URL.
     */
    private function hasPushResponse(string $url): bool
    {
        return isset($this->pushedResponses[$url]);
    }

    /**
     * Take the cached pushed response for the URL out of the cache.
     *
     * @return array with the keys "content" and "headerSize"
     */
    private function getPushedResponse(string $url): array
    {
        $response = $this->pushedResponses[$url];
        unset($this->pushedResponses[$url]);

        return $response;
    }

    /**
     * Put a request on the queue and register its push decision callback,
     * if it has one.
     *
     * @return array the queued item, [request, options]
     */
    private function addToQueue(RequestInterface $request, ParameterBag $options): array
    {
        if (null !== $callback = $options->get('push_function_callback')) {
            $this->pushFunctions[] = $callback;
        }

        return $this->queue[] = [$request, $options];
    }

    /**
     * Create a multi curl handle and add some properties to it.
     *
     * @throws ClientException if the multi handle cannot be created
     */
    private function initMultiCurlHandle(): void
    {
        $this->curlm = curl_multi_init();
        if (false === $this->curlm) {
            throw new ClientException('Unable to create a new cURL multi handle');
        }

        if (!$this->serverPushSupported) {
            return;
        }

        curl_multi_setopt($this->curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
        // We need to use $this->pushCb[] because of a bug in PHP
        curl_multi_setopt(
            $this->curlm,
            CURLMOPT_PUSHFUNCTION,
            $this->pushCb[] = function ($parent, $pushed, $headers) {
                // Read the live list: requests queued after this handle was
                // created must have their callbacks consulted as well.
                // If any callback say no, then do not accept.
                foreach ($this->pushFunctions as $callback) {
                    if (CURL_PUSH_DENY === $callback($parent, $pushed, $headers)) {
                        return CURL_PUSH_DENY;
                    }
                }

                curl_setopt($pushed, CURLOPT_RETURNTRANSFER, true);
                curl_setopt($pushed, CURLOPT_HEADER, true);
                $this->addPushHandle($headers, $pushed);

                return CURL_PUSH_OK;
            }
        );
    }

    /**
     * Loop over the queue and make sure every item (request) is initialized (ie, got a handle).
     *
     * A request whose response was already pushed by the server is answered
     * from the cache right away and never gets a handle.
     */
    private function initQueue(): void
    {
        foreach ($this->queue as $i => $queueItem) {
            if (2 !== \count($queueItem)) {
                // We have already prepared this curl
                continue;
            }

            /** @var RequestInterface $request */
            /** @var ParameterBag $options */
            [$request, $options] = $queueItem;

            // Check if we have the response in cache already.
            if ($this->serverPushSupported
                && $options->get('use_pushed_response')
                && $this->hasPushResponse($request->getUri()->__toString())
            ) {
                $data = $this->getPushedResponse($request->getUri()->__toString());
                $response = (new ResponseBuilder($this->responseFactory))->getResponseFromRawInput(
                    $data['content'],
                    $data['headerSize']
                );

                // Dequeue first: the cached response is already consumed, so
                // a throwing callback must not leave the request queued.
                unset($this->queue[$i]);
                ($options->get('callback'))($request, $response, null);

                continue;
            }

            // prepare curl handle
            $curl = $this->createHandle();
            $responseBuilder = $this->prepare($curl, $request, $options);
            $this->queue[$i][] = $curl;
            $this->queue[$i][] = $responseBuilder;
            curl_multi_add_handle($this->curlm, $curl);
        }
    }

    /**
     * If we got no requests in the queue, do a clean up to save some memory.
     */
    private function cleanup(): void
    {
        if (!empty($this->queue)) {
            return;
        }

        if ($this->curlm) {
            curl_multi_close($this->curlm);
        }
        $this->curlm = null;
        $this->pushFunctions = [];
        $this->pushCb = [];
    }
}
||||||
| 375 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.