Passed
Pull Request — master (#182)
by Dmitriy
02:37
created

Worker::getHandler()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 7
ccs 4
cts 4
cp 1
crap 2
rs 10
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A Worker::createConsumeHandler() 0 3 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Closure;
8
use Psr\Container\ContainerInterface;
9
use Psr\Log\LoggerInterface;
10
use RuntimeException;
11
use Throwable;
12
use Yiisoft\Injector\Injector;
13
use Yiisoft\Queue\Exception\JobFailureException;
14
use Yiisoft\Queue\Message\MessageHandlerInterface;
15
use Yiisoft\Queue\Message\MessageInterface;
16
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
17
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
18
use Yiisoft\Queue\Middleware\Consume\ConsumeRequest;
19
use Yiisoft\Queue\Middleware\Consume\MessageHandlerConsumeInterface;
20
use Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler;
21
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
22
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
23
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
24
use Yiisoft\Queue\QueueInterface;
25
use Yiisoft\Queue\Message\IdEnvelope;
26
27
final class Worker implements WorkerInterface
28
{
29 21
    public function __construct(
30
        private LoggerInterface $logger,
31
        private Injector $injector,
32
        private ContainerInterface $container,
33
        private ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
34
        private FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
35
    ) {
36 21
    }
37
38
    /**
39
     * @throws Throwable
40
     */
41 12
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
42
    {
43 12
        $this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']);
44
45 12
        $handlerClass = $message->getHandler();
46
47 12
        if (!is_subclass_of($handlerClass, MessageHandlerInterface::class, true)) {
48
            throw new RuntimeException(sprintf(
49
                'Message handler "%s" for "%s" must implement "%s".',
50
                $handlerClass,
51
                $message::class,
52
                MessageHandlerInterface::class,
53
            ));
54
        }
55 12
        $handler = $this->container->get($handlerClass);
56 12
        if ($handler === null) {
57
            throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $handlerClass));
58
        }
59
60 12
        $request = new ConsumeRequest($message, $queue);
61 12
        $closure = fn (MessageInterface $message): mixed => $this->injector->invoke([$handler, 'handle'], [$message]);
62
        try {
63 12
            return $this->consumeMiddlewareDispatcher->dispatch($request, $this->createConsumeHandler($closure))->getMessage();
64 1
        } catch (Throwable $exception) {
65 1
            $request = new FailureHandlingRequest($request->getMessage(), $exception, $request->getQueue());
66
67
            try {
68 1
                $result = $this->failureMiddlewareDispatcher->dispatch($request, $this->createFailureHandler());
69
                $this->logger->info($exception->getMessage());
70
71
                return $result->getMessage();
72 1
            } catch (Throwable $exception) {
73 1
                $exception = new JobFailureException($message, $exception);
74 1
                $this->logger->error($exception->getMessage());
75 1
                throw $exception;
76
            }
77
        }
78
    }
79
80 12
    private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface
81
    {
82 12
        return new ConsumeFinalHandler($handler);
83
    }
84
85 1
    private function createFailureHandler(): MessageFailureHandlerInterface
86
    {
87 1
        return new FailureFinalHandler();
88
    }
89
}
90