cerbero90 /
lazy-json-pages
| 1 | <?php |
||||
| 2 | |||||
| 3 | declare(strict_types=1); |
||||
| 4 | |||||
| 5 | namespace Cerbero\LazyJsonPages\Concerns; |
||||
| 6 | |||||
| 7 | use Generator; |
||||
| 8 | use GuzzleHttp\Pool; |
||||
| 9 | use Psr\Http\Message\RequestInterface; |
||||
| 10 | use Psr\Http\Message\ResponseInterface; |
||||
| 11 | use Throwable; |
||||
| 12 | |||||
/**
 * The trait to send asynchronous HTTP requests.
 *
 * Besides the properties documented below, the methods of this trait read
 * `$this->config` and `$this->client` and call `$this->uriForPage()`. None of
 * them is declared here, so they must be available in the class using the trait.
 *
 * @property-read \Cerbero\LazyJsonPages\Sources\AnySource $source
 * @property-read \Cerbero\LazyJsonPages\Services\Book $book
 */
trait SendsAsyncRequests
{
    use RespectsRateLimits;
    use RetriesHttpRequests;

    /**
     * Fetch pages by sending asynchronous HTTP requests.
     *
     * Requests start from the page following the configured first page
     * (presumably because the first page was already fetched by the caller - to be confirmed).
     *
     * @return Generator<int, ResponseInterface>
     */
    protected function fetchPagesAsynchronously(int $totalPages): Generator
    {
        $request = clone $this->source->request();
        $fromPage = $this->config->firstPage + 1;
        $toPage = $this->config->firstPage == 0 ? $totalPages - 1 : $totalPages;

        // The page cursor is captured by reference: it is advanced while requests are generated,
        // so invoking the callback again continues from the pages not requested yet.
        yield from $this->retry(function () use ($request, &$fromPage, $toPage) {
            foreach ($this->chunkRequestsBetweenPages($request, $fromPage, $toPage) as $requests) {
                yield from $this->pool($requests);
            }
        });
    }

    /**
     * Retrieve requests for the given pages in chunks.
     *
     * Chunks keep being produced while there are fresh pages left or while a
     * chunk can still generate a request for a previously failed page.
     *
     * @param int $fromPage the next fresh page to request, advanced by reference
     * @return Generator<int, Generator<int, RequestInterface>>
     */
    protected function chunkRequestsBetweenPages(RequestInterface $request, int &$fromPage, int $toPage): Generator
    {
        while (true) {
            $requests = $this->yieldRequestsBetweenPages($request, $fromPage, $toPage);

            // Once every fresh page has been dispatched, only failed pages may be left.
            // Probing the generator runs it up to its first request: when there is none,
            // no work is left. A generator paused at its first yield can still be
            // rewound, hence it can be safely iterated by the pool afterwards.
            if ($fromPage > $toPage && !$requests->valid()) {
                return;
            }

            yield $requests;

            $this->respectRateLimits();
        }
    }

    /**
     * Yield the requests between the given pages.
     *
     * Each request is keyed by its page. Pages that previously failed are
     * requested before any fresh page, even when no fresh page is left.
     *
     * @param int $fromPage the next fresh page to request, advanced by reference
     * @return Generator<int, RequestInterface>
     */
    protected function yieldRequestsBetweenPages(RequestInterface $request, int &$fromPage, int $toPage): Generator
    {
        // A chunk never exceeds the configured concurrency nor the rate limits threshold, if any.
        $chunkSize = min($this->config->async, $this->config->rateLimits?->threshold() ?? INF);

        for ($i = 0; $i < $chunkSize; $i++) {
            $page = $this->book->pullFailedPage();

            if ($page === null) {
                // No failed page to send again: stop when fresh pages are over too.
                if ($fromPage > $toPage) {
                    return;
                }

                $page = $fromPage++;
            }

            yield $page => $request->withUri($this->uriForPage($request->getUri(), (string) $page));
        }
    }

    /**
     * Send a pool of asynchronous requests.
     *
     * Fulfilled responses are stored in the book by page and yielded only if
     * the whole pool succeeded. Rejected pages are recorded as failed and the
     * last rejection reason is thrown once the pool has settled.
     *
     * @param Generator<int, RequestInterface> $requests
     * @return Generator<int, ResponseInterface>
     * @throws Throwable
     */
    protected function pool(Generator $requests): Generator
    {
        $exception = null;

        $config = [
            'concurrency' => $this->config->async,
            'fulfilled' => fn(ResponseInterface $response, int $page) => $this->book->addPage($page, $response),
            'rejected' => function (Throwable $e, int $page) use (&$exception) {
                $this->book->addFailedPage($page);
                $exception = $e;
            },
        ];

        (new Pool($this->client, $requests, $config))->promise()->wait();

        if ($exception !== null) {
            throw $exception;
        }

        yield from $this->book->pullPages();
    }
}
||||
| 101 |