Passed
Pull Request — master (#190)
by Dmitriy
13:22
created

Worker   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 40
Duplicated Lines 0 %

Test Coverage

Coverage 90.91%

Importance

Changes 0
Metric Value
eloc 20
c 0
b 0
f 0
dl 0
loc 40
ccs 20
cts 22
cp 0.9091
rs 10
wmc 5

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A process() 0 27 4
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Psr\EventDispatcher\EventDispatcherInterface;
8
use Psr\Log\LoggerInterface;
9
use Throwable;
10
use Yiisoft\Queue\Exception\JobFailureException;
11
use Yiisoft\Queue\Message\EnvelopeInterface;
12
use Yiisoft\Queue\Message\IdEnvelope;
13
use Yiisoft\Queue\Message\MessageInterface;
14
use Yiisoft\Queue\Middleware\ConsumeFinalHandler;
15
use Yiisoft\Queue\Middleware\FailureFinalHandler;
16
use Yiisoft\Queue\Middleware\MiddlewareDispatcher;
17
use Yiisoft\Queue\Middleware\Request;
18
use Yiisoft\Queue\QueueInterface;
19
20
final class Worker implements WorkerInterface
21
{
22 14
    public function __construct(
23
        private LoggerInterface $logger,
24
        private EventDispatcherInterface $eventDispatcher,
25
        private MiddlewareDispatcher $consumeMiddlewareDispatcher,
26
        private MiddlewareDispatcher $failureMiddlewareDispatcher,
27
    ) {
28 14
    }
29
30
    /**
31
     * @throws Throwable
32
     */
33 10
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
34
    {
35 10
        $this->logger->info(
36 10
            'Processing message #{message}.',
37 10
            ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']
38 10
        );
39
40 10
        $request = new Request($message, $queue);
41
42 10
        $closure = function (object $message): mixed {
43 10
            $message = $message instanceof EnvelopeInterface ? $message->getMessage() : $message;
44 10
            return $this->eventDispatcher->dispatch($message);
45 10
        };
46
47
        try {
48 10
            $result = $this->consumeMiddlewareDispatcher->dispatch($request, new ConsumeFinalHandler($closure));
49 9
            return $result->getMessage();
50 1
        } catch (Throwable $exception) {
51
            try {
52 1
                $result = $this->failureMiddlewareDispatcher->dispatch($request, new FailureFinalHandler($exception));
53
                $this->logger->info($exception);
54
55
                return $result->getMessage();
56 1
            } catch (Throwable $exception) {
57 1
                $exception = new JobFailureException($message, $exception);
58 1
                $this->logger->error($exception);
59 1
                throw $exception;
60
            }
61
        }
62
    }
63
}
64