Test Failed
Pull Request — master (#187)
by Dmitriy
02:44
created

Worker::getHandler()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 7
ccs 4
cts 4
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Closure;
8
use Psr\Container\ContainerInterface;
9
use Psr\Log\LoggerInterface;
10
use RuntimeException;
11
use Throwable;
12
use Yiisoft\Injector\Injector;
13
use Yiisoft\Queue\Exception\JobFailureException;
14
use Yiisoft\Queue\Message\MessageInterface;
15
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
16
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
17
use Yiisoft\Queue\Middleware\Consume\ConsumeRequest;
18
use Yiisoft\Queue\Middleware\Consume\MessageHandlerConsumeInterface;
19
use Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler;
20
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
21
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
22
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
23
use Yiisoft\Queue\QueueInterface;
24
use Yiisoft\Queue\Message\IdEnvelope;
25
26
/**
 * Queue worker: resolves the handler named by a message from the container and runs it
 * through the consume middleware dispatcher. If that throws, the message is passed to the
 * failure-handling middleware dispatcher instead.
 */
final class Worker implements WorkerInterface
{
    public function __construct(
        private LoggerInterface $logger,
        private Injector $injector,
        private ContainerInterface $container,
        private ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
        private FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
    ) {
    }

    /**
     * Processes a single message and returns the message produced by the middleware pipeline.
     *
     * @param MessageInterface $message Message to process; its handler name is used as the container ID.
     * @param QueueInterface $queue Queue passed along to the consume and failure requests.
     *
     * @throws RuntimeException When the container has no entry for the message's handler name.
     * @throws JobFailureException When the failure-handling pipeline itself throws.
     * @throws Throwable
     *
     * @return MessageInterface Message taken from the consume result, or from the failure-handling
     * result when consuming threw and the failure pipeline completed.
     */
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
    {
        $this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']);

        $name = $message->getHandlerName();

        // PSR-11 `get()` reports a missing entry by throwing, not by returning null, so ask
        // `has()` first. Otherwise the descriptive RuntimeException below can never be reached
        // for an unregistered handler.
        $handler = $this->container->has($name) ? $this->container->get($name) : null;
        if ($handler === null) {
            throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist.', $name));
        }

        $request = new ConsumeRequest($message, $queue);
        $closure = fn (MessageInterface $message): mixed => $this->injector->invoke($handler, [$message]);

        try {
            return $this->consumeMiddlewareDispatcher->dispatch($request, $this->createConsumeHandler($closure))->getMessage();
        } catch (Throwable $exception) {
            $failureRequest = new FailureHandlingRequest($request->getMessage(), $exception, $request->getQueue());

            try {
                $result = $this->failureMiddlewareDispatcher->dispatch($failureRequest, $this->createFailureHandler());
                // The failure pipeline completed without throwing: log the original error at info level.
                $this->logger->info($exception->getMessage());

                return $result->getMessage();
            } catch (Throwable $failure) {
                // Anything thrown inside the inner try is wrapped together with the incoming message.
                $jobFailure = new JobFailureException($message, $failure);
                $this->logger->error($jobFailure->getMessage());

                throw $jobFailure;
            }
        }
    }

    /**
     * Wraps the handler closure into the final handler of the consume middleware chain.
     */
    private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface
    {
        return new ConsumeFinalHandler($handler);
    }

    /**
     * Creates the final handler of the failure-handling middleware chain.
     */
    private function createFailureHandler(): MessageFailureHandlerInterface
    {
        return new FailureFinalHandler();
    }
}
80