1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace Spiral\Queue\Interceptor\Consume; |
6
|
|
|
|
7
|
|
|
use Spiral\Attributes\ReaderInterface; |
8
|
|
|
use Spiral\Core\CoreInterceptorInterface; |
9
|
|
|
use Spiral\Core\CoreInterface; |
10
|
|
|
use Spiral\Queue\Attribute\RetryPolicy as Attribute; |
11
|
|
|
use Spiral\Queue\Exception\JobException; |
12
|
|
|
use Spiral\Queue\Exception\RetryableExceptionInterface; |
13
|
|
|
use Spiral\Queue\Exception\RetryException; |
14
|
|
|
use Spiral\Queue\Options; |
15
|
|
|
use Spiral\Queue\RetryPolicy; |
16
|
|
|
|
17
|
|
|
final class RetryPolicyInterceptor implements CoreInterceptorInterface |
18
|
|
|
{ |
19
|
9 |
|
public function __construct( |
20
|
|
|
private readonly ReaderInterface $reader |
21
|
|
|
) { |
22
|
9 |
|
} |
23
|
|
|
|
24
|
7 |
|
public function process(string $controller, string $action, array $parameters, CoreInterface $core): mixed |
25
|
|
|
{ |
26
|
|
|
try { |
27
|
7 |
|
return $core->callAction($controller, $action, $parameters); |
28
|
6 |
|
} catch (\Throwable $e) { |
29
|
6 |
|
$attribute = $this->reader->firstClassMetadata(new \ReflectionClass($controller), Attribute::class); |
30
|
6 |
|
if ($attribute === null) { |
31
|
1 |
|
throw $e; |
32
|
|
|
} |
33
|
|
|
|
34
|
5 |
|
$policy = $this->getRetryPolicy($e, $attribute); |
35
|
|
|
|
36
|
5 |
|
$headers = $parameters['headers'] ?? []; |
37
|
5 |
|
$attempts = (int)($headers['attempts'][0] ?? 0); |
38
|
|
|
|
39
|
5 |
|
if ($policy->isRetryable($e, $attempts) === false) { |
40
|
1 |
|
throw $e; |
41
|
|
|
} |
42
|
|
|
|
43
|
4 |
|
throw new RetryException( |
44
|
4 |
|
reason: $e->getMessage(), |
45
|
4 |
|
options: (new Options()) |
46
|
4 |
|
->withDelay($policy->getDelay($attempts)) |
47
|
4 |
|
->withHeader('attempts', (string)($attempts + 1)) |
48
|
4 |
|
); |
49
|
|
|
} |
50
|
|
|
} |
51
|
|
|
|
52
|
5 |
|
private function getRetryPolicy(\Throwable $exception, Attribute $attribute): RetryPolicy |
53
|
|
|
{ |
54
|
5 |
|
if ($exception instanceof JobException && $exception->getPrevious() !== null) { |
55
|
1 |
|
$exception = $exception->getPrevious(); |
56
|
|
|
} |
57
|
|
|
|
58
|
5 |
|
$policy = $exception instanceof RetryableExceptionInterface ? $exception->getRetryPolicy() : null; |
59
|
|
|
|
60
|
5 |
|
return $policy ?? $attribute->getRetryPolicy(); |
61
|
|
|
} |
62
|
|
|
} |
63
|
|
|
|