Passed
Pull Request — master (#190)
by Alexander
05:09 queued 02:32
created

Worker   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 57
Duplicated Lines 0 %

Test Coverage

Coverage 55.88%

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 57
ccs 19
cts 34
cp 0.5588
rs 10
c 0
b 0
f 0
wmc 8

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
B process() 0 43 7
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\Log\LoggerInterface;
9
use RuntimeException;
10
use Throwable;
11
use Yiisoft\Injector\Injector;
12
use Yiisoft\Queue\Exception\JobFailureException;
13
use Yiisoft\Queue\Message\HandlerEnvelope;
14
use Yiisoft\Queue\Message\MessageHandlerInterface;
15
use Yiisoft\Queue\Message\MessageInterface;
16
use Yiisoft\Queue\Middleware\ConsumeFinalHandler;
17
use Yiisoft\Queue\Middleware\FailureFinalHandler;
18
use Yiisoft\Queue\Middleware\MiddlewareDispatcher;
19
use Yiisoft\Queue\Middleware\Request;
20
use Yiisoft\Queue\QueueInterface;
21
use Yiisoft\Queue\Message\IdEnvelope;
22
23
/**
 * Queue worker that resolves the handler declared by a message and runs it.
 *
 * The handler class name is taken from the message when it is a {@see HandlerEnvelope}, the handler
 * instance is fetched from the container, and the call is dispatched through the consume middleware
 * dispatcher. When that dispatch throws, the same request is passed to the failure middleware dispatcher.
 */
final class Worker implements WorkerInterface
{
    public function __construct(
        private LoggerInterface $logger,
        private Injector $injector,
        private ContainerInterface $container,
        private MiddlewareDispatcher $consumeMiddlewareDispatcher,
        private MiddlewareDispatcher $failureMiddlewareDispatcher,
    ) {
    }

    /**
     * Processes a single message with the handler it declares.
     *
     * @param MessageInterface $message Message to process. It has to be a {@see HandlerEnvelope} whose handler
     * implements {@see MessageHandlerInterface}, otherwise a {@see RuntimeException} is thrown.
     * @param QueueInterface $queue Queue that is put into the middleware request together with the message.
     *
     * @throws RuntimeException When the handler is missing or does not implement {@see MessageHandlerInterface}.
     * @throws JobFailureException When the failure middleware dispatch throws as well.
     * @throws Throwable
     *
     * @return MessageInterface Message taken from the result of the consume dispatch, or from the result of the
     * failure dispatch when the consume dispatch threw.
     */
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
    {
        $this->logger->info(
            'Processing message #{message}.',
            ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'],
        );

        $handlerClass = $message instanceof HandlerEnvelope ? $message->getHandler() : null;

        // The explicit null check makes it visible (to readers and to static analysis) that only a non-null
        // handler name ever reaches the container. is_subclass_of() already returned false for null, so the
        // thrown exception and its message are the same as before.
        if ($handlerClass === null || !is_subclass_of($handlerClass, MessageHandlerInterface::class, true)) {
            throw $this->createInvalidHandlerException($handlerClass, $message);
        }

        $handler = $this->container->get($handlerClass);
        if ($handler === null) {
            throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $handlerClass));
        }

        // The class name passed the check above, but the container decides which instance it returns,
        // so the instance is verified too.
        if (!$handler instanceof MessageHandlerInterface) {
            throw $this->createInvalidHandlerException($handlerClass, $message);
        }

        $request = new Request($message, $queue);
        $closure = fn (MessageInterface $message): mixed => $this->injector->invoke([$handler, 'handle'], [$message]);

        try {
            $result = $this->consumeMiddlewareDispatcher->dispatch($request, new ConsumeFinalHandler($closure));

            return $result->getMessage();
        } catch (Throwable $exception) {
            try {
                $result = $this->failureMiddlewareDispatcher->dispatch($request, new FailureFinalHandler($exception));
                // Reached only when the failure dispatch itself did not throw.
                $this->logger->info($exception);

                return $result->getMessage();
            } catch (Throwable $exception) {
                // Here $exception is the one thrown by the failure dispatch, not necessarily the original one.
                $exception = new JobFailureException($message, $exception);
                $this->logger->error($exception);

                throw $exception;
            }
        }
    }

    /**
     * Builds the exception thrown when a message handler does not implement {@see MessageHandlerInterface}.
     *
     * @param string|null $handlerClass Handler name taken from the message, `null` when the message has none.
     * @param MessageInterface $message Message whose class name is included in the error text.
     */
    private function createInvalidHandlerException(?string $handlerClass, MessageInterface $message): RuntimeException
    {
        return new RuntimeException(sprintf(
            'Message handler "%s" for "%s" must implement "%s".',
            $handlerClass,
            $message::class,
            MessageHandlerInterface::class,
        ));
    }
}
84