Issues (46)

src/Concerns/SendsAsyncRequests.php (2 issues)

<?php

declare(strict_types=1);

namespace Cerbero\LazyJsonPages\Concerns;

use Generator;
use GuzzleHttp\Pool;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Throwable;

/**
 * The trait to send asynchronous HTTP requests.
 *
 * @property-read \Cerbero\LazyJsonPages\Sources\AnySource $source
 * @property-read \Cerbero\LazyJsonPages\Services\Book $book
 */
trait SendsAsyncRequests
{
    use RespectsRateLimits;
    use RetriesHttpRequests;

    /**
     * Fetch pages by sending asynchronous HTTP requests.
     *
     * @return Generator<int, ResponseInterface>
     */
    protected function fetchPagesAsynchronously(int $totalPages): Generator
    {
        $request = clone $this->source->request();
        $fromPage = $this->config->firstPage + 1;
        $toPage = $this->config->firstPage == 0 ? $totalPages - 1 : $totalPages;

        yield from $this->retry(function () use ($request, &$fromPage, $toPage) {
            foreach ($this->chunkRequestsBetweenPages($request, $fromPage, $toPage) as $requests) {
                yield from $this->pool($requests);
                // Issue: $requests of type Psr\Http\Message\RequestInterface is incompatible with the type
                // Generator expected by parameter $requests of Cerbero\LazyJsonPages\Co...dsAsyncRequests::pool().
                // If this is a false positive, it can be ignored in code via the ignore-type annotation:
                //     yield from $this->pool(/** @scrutinizer ignore-type */ $requests);
            }
        });
    }

    /**
     * Retrieve requests for the given pages in chunks.
     *
     * @return Generator<int, Generator<int, RequestInterface>>
     */
    protected function chunkRequestsBetweenPages(RequestInterface $request, int &$fromPage, int $toPage): Generator
    {
        while ($fromPage <= $toPage) {
            yield $this->yieldRequestsBetweenPages($request, $fromPage, $toPage);

            $this->respectRateLimits();
        }
    }

    /**
     * Yield the requests between the given pages.
     *
     * @return Generator<int, RequestInterface>
     */
    protected function yieldRequestsBetweenPages(RequestInterface $request, int &$fromPage, int $toPage): Generator
    {
        $chunkSize = min($this->config->async, $this->config->rateLimits?->threshold() ?? INF);

        for ($i = 0; $i < $chunkSize && $fromPage <= $toPage; $i++) {
            $page = $this->book->pullFailedPage() ?? $fromPage++;

            yield $page => $request->withUri($this->uriForPage($request->getUri(), (string) $page));
            // Issue: it seems like uriForPage() must be provided by classes using this trait.
            // How about adding it as an abstract method to this trait?
            // If this is a false positive, it can be ignored in code via the ignore-call annotation:
            //     yield $page => $request->withUri($this->/** @scrutinizer ignore-call */ uriForPage($request->getUri(), (string) $page));
        }
    }

    /**
     * Send a pool of asynchronous requests.
     *
     * @param Generator<int, RequestInterface> $requests
     * @return Generator<int, ResponseInterface>
     * @throws Throwable
     */
    protected function pool(Generator $requests): Generator
    {
        $exception = null;

        $config = [
            'concurrency' => $this->config->async,
            'fulfilled' => fn(ResponseInterface $response, int $page) => $this->book->addPage($page, $response),
            'rejected' => function (Throwable $e, int $page) use (&$exception) {
                $this->book->addFailedPage($page);
                $exception = $e;
            },
        ];

        (new Pool($this->client, $requests, $config))->promise()->wait();

        if (isset($exception)) {
            throw $exception;
        }

        yield from $this->book->pullPages();
    }
}
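
The uriForPage() issue reported for this file suggests declaring the method as abstract on the trait. The following is a minimal sketch of that pattern, not the library's actual code: the names BuildsPageUris and QueryPaginator are hypothetical, and URIs are simplified to plain strings instead of PSR-7 URI objects.

```php
<?php

declare(strict_types=1);

// Hypothetical trait: it calls uriForPage() but does not implement it.
trait BuildsPageUris
{
    // Declaring the dependency as abstract makes PHP require an implementation
    // in every class that uses the trait, so static analyzers can resolve the call.
    abstract protected function uriForPage(string $uri, string $page): string;

    /**
     * Build the URIs for an inclusive range of pages, keyed by page number.
     *
     * @return array<int, string>
     */
    public function uris(string $base, int $from, int $to): array
    {
        $uris = [];

        for ($page = $from; $page <= $to; $page++) {
            $uris[$page] = $this->uriForPage($base, (string) $page);
        }

        return $uris;
    }
}

// Hypothetical class using the trait; the query-string format is an assumption.
final class QueryPaginator
{
    use BuildsPageUris;

    protected function uriForPage(string $uri, string $page): string
    {
        return $uri . '?page=' . $page;
    }
}
```

With this shape, a class that uses the trait without implementing uriForPage() fails at class declaration time rather than at the first call.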