Passed
Pull Request — master (#190)
by Dmitriy
04:22 queued 01:36
created

Worker::process()   A

Complexity

Conditions 4
Paths 9

Size

Total Lines 30
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4.0138

Importance

Changes 0
Metric Value
cc 4
eloc 20
c 0
b 0
f 0
nc 9
nop 2
dl 0
loc 30
ccs 19
cts 21
cp 0.9048
crap 4.0138
rs 9.6
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\EventDispatcher\EventDispatcherInterface;
9
use Psr\Log\LoggerInterface;
10
use Throwable;
11
use Yiisoft\Queue\Exception\JobFailureException;
12
use Yiisoft\Queue\Message\EnvelopeInterface;
13
use Yiisoft\Queue\Message\IdEnvelope;
14
use Yiisoft\Queue\Message\MessageInterface;
15
use Yiisoft\Queue\Middleware\ConsumeFinalHandler;
16
use Yiisoft\Queue\Middleware\FailureFinalHandler;
17
use Yiisoft\Queue\Middleware\MiddlewareDispatcher;
18
use Yiisoft\Queue\Middleware\Request;
19
use Yiisoft\Queue\QueueInterface;
20
21
/**
 * Queue worker that pushes a message through the consume middleware pipeline and,
 * when that pipeline throws, through the failure middleware pipeline.
 */
final class Worker implements WorkerInterface
{
    /**
     * All dependencies are stored as promoted private properties.
     *
     * NOTE(review): `$container` is not referenced by any method visible in this class.
     */
    public function __construct(
        private LoggerInterface $logger,
        private EventDispatcherInterface $eventDispatcher,
        private ContainerInterface $container,
        private MiddlewareDispatcher $consumeMiddlewareDispatcher,
        private MiddlewareDispatcher $failureMiddlewareDispatcher,
    ) {
    }

    /**
     * Processes a single message taken from the given queue.
     *
     * The message is wrapped into a {@see Request} and dispatched through the consume
     * middleware dispatcher. The final consume handler receives a callback that strips
     * an {@see EnvelopeInterface} wrapper (one level) and hands the resulting object to
     * the event dispatcher.
     *
     * If anything in that pipeline throws, the same request is dispatched through the
     * failure middleware dispatcher together with the caught error. When the failure
     * pipeline returns normally, the original error is logged at "info" level and the
     * message of the failure result is returned.
     *
     * @param MessageInterface $message Message to process.
     * @param QueueInterface $queue Queue passed along with the message in the request.
     *
     * @throws Throwable A {@see JobFailureException} wrapping whatever the failure
     * pipeline threw; it is logged at "error" level before being rethrown.
     *
     * @return MessageInterface The message carried by the resulting request.
     */
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
    {
        // Fall back to the literal string 'null' when the metadata carries no message ID.
        $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
        $this->logger->info('Processing message #{message}.', ['message' => $messageId]);

        $request = new Request($message, $queue);

        // Callback for the final consume handler: unwrap an envelope if present, then dispatch.
        $dispatchPayload = fn (object $message): mixed => $this->eventDispatcher->dispatch(
            $message instanceof EnvelopeInterface ? $message->getMessage() : $message
        );

        try {
            return $this->consumeMiddlewareDispatcher
                ->dispatch($request, new ConsumeFinalHandler($dispatchPayload))
                ->getMessage();
        } catch (Throwable $consumeError) {
            try {
                $failureResult = $this->failureMiddlewareDispatcher->dispatch(
                    $request,
                    new FailureFinalHandler($consumeError),
                );
                // Reached only when the failure pipeline itself did not throw.
                $this->logger->info($consumeError);

                return $failureResult->getMessage();
            } catch (Throwable $handlingError) {
                // Wrap whatever escaped the failure pipeline, keeping the original message for context.
                $jobFailure = new JobFailureException($message, $handlingError);
                $this->logger->error($jobFailure);

                throw $jobFailure;
            }
        }
    }
}
69