Worker   A
last analyzed

Complexity

Total Complexity 22

Size/Duplication

Total Lines 122
Duplicated Lines 0 %

Test Coverage

Coverage 94.74%

Importance

Changes 0
Metric Value
eloc 54
dl 0
loc 122
ccs 54
cts 57
cp 0.9474
rs 10
c 0
b 0
f 0
wmc 22

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 1
A getHandler() 0 7 2
A createFailureHandler() 0 3 1
A process() 0 26 4
A createConsumeHandler() 0 3 1
C prepare() 0 48 13
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Closure;
8
use Psr\Container\ContainerExceptionInterface;
9
use Psr\Container\ContainerInterface;
10
use Psr\Container\NotFoundExceptionInterface;
11
use Psr\Log\LoggerInterface;
12
use ReflectionException;
13
use ReflectionMethod;
14
use RuntimeException;
15
use Throwable;
16
use Yiisoft\Injector\Injector;
17
use Yiisoft\Queue\Exception\JobFailureException;
18
use Yiisoft\Queue\Message\MessageInterface;
19
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
20
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
21
use Yiisoft\Queue\Middleware\Consume\ConsumeRequest;
22
use Yiisoft\Queue\Middleware\Consume\MessageHandlerConsumeInterface;
23
use Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler;
24
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
25
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
26
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
27
use Yiisoft\Queue\QueueInterface;
28
use Yiisoft\Queue\Message\IdEnvelope;
29
30
/**
 * Queue worker that resolves the handler registered for a message and runs it through the
 * consume middleware pipeline, switching to the failure-handling pipeline when processing throws.
 */
final class Worker implements WorkerInterface
{
    /**
     * Handlers already resolved by {@see prepare()}, indexed by handler name.
     * A `null` value is cached as well, so an unresolvable definition is not prepared twice.
     *
     * @var array<string, callable|null>
     */
    private array $handlersCached = [];

    /**
     * @param array $handlers Handler definitions indexed by handler name.
     * @param LoggerInterface $logger Receives processing and resolution diagnostics.
     * @param Injector $injector Invokes the resolved handler with the message as an argument.
     * @param ContainerInterface $container Used to resolve string and `[class, method]` definitions.
     * @param ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher Pipeline used for normal processing.
     * @param FailureMiddlewareDispatcher $failureMiddlewareDispatcher Pipeline used when processing throws.
     */
    public function __construct(
        private array $handlers,
        private LoggerInterface $logger,
        private Injector $injector,
        private ContainerInterface $container,
        private ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
        private FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
    ) {
    }

    /**
     * Processes a single message with the handler registered under the message's handler name.
     *
     * When the consume pipeline throws, the message is passed to the failure-handling pipeline.
     * If that pipeline succeeds, the original exception message is logged at info level and the
     * message returned by the failure pipeline is the result. If it throws as well, the error is
     * wrapped into a {@see JobFailureException}, logged at error level and rethrown.
     *
     * @param MessageInterface $message Message to process.
     * @param QueueInterface $queue Queue passed along to the consume and failure requests.
     *
     * @throws RuntimeException When no handler can be resolved for the message's handler name.
     * @throws Throwable When failure handling itself fails (thrown as {@see JobFailureException}).
     *
     * @return MessageInterface The message returned by whichever pipeline completed.
     */
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
    {
        $this->logger->info(
            'Processing message #{message}.',
            ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'],
        );

        $name = $message->getHandlerName();
        $handler = $this->getHandler($name);
        if ($handler === null) {
            throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name));
        }

        $request = new ConsumeRequest($message, $queue);
        $closure = fn (MessageInterface $message): mixed => $this->injector->invoke($handler, [$message]);

        try {
            return $this->consumeMiddlewareDispatcher
                ->dispatch($request, $this->createConsumeHandler($closure))
                ->getMessage();
        } catch (Throwable $exception) {
            $failureRequest = new FailureHandlingRequest($request->getMessage(), $exception, $request->getQueue());

            try {
                $result = $this->failureMiddlewareDispatcher->dispatch($failureRequest, $this->createFailureHandler());
                // Failure handling succeeded, so the original error is only worth an info record.
                $this->logger->info($exception->getMessage());

                return $result->getMessage();
            } catch (Throwable $failure) {
                $jobFailure = new JobFailureException($message, $failure);
                $this->logger->error($jobFailure->getMessage());

                throw $jobFailure;
            }
        }
    }

    /**
     * Returns the callable registered under the given name, resolving and caching it on first use.
     *
     * @param string $name Handler name.
     *
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     *
     * @return callable|null The resolved handler, or `null` when there is no usable definition.
     */
    private function getHandler(string $name): ?callable
    {
        // array_key_exists() rather than isset(): a cached null must count as "already resolved".
        if (!array_key_exists($name, $this->handlersCached)) {
            $this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null);
        }

        return $this->handlersCached[$name];
    }

    /**
     * Turns a handler definition into a callable.
     *
     * Resolution rules, applied in order:
     *  - `null` yields `null`;
     *  - a string the container has is replaced by the container entry;
     *  - a `[string, string]` pair whose first item is not an existing class but is known to the
     *    container becomes `[container entry, method]`;
     *  - a `[class, method]` pair naming a static method is used as is;
     *  - a `[class, method]` pair naming a non-static method becomes `[container entry, method]`
     *    when the container has the class;
     *  - anything else is used as is.
     *
     * Every candidate is verified to be callable. A definition that cannot be resolved to a
     * callable is reported through the logger and yields `null` instead of raising a `TypeError`
     * from the return type check.
     *
     * @param array|callable|object|string|null $definition Handler definition to resolve.
     *
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     *
     * @return callable|null The resolved handler, or `null` when the definition is unusable.
     */
    private function prepare(callable|object|array|string|null $definition): callable|null
    {
        if ($definition === null) {
            return null;
        }

        if (is_string($definition) && $this->container->has($definition)) {
            return $this->callableOrNull($this->container->get($definition));
        }

        $isClassMethodPair = is_array($definition)
            && array_keys($definition) === [0, 1]
            && is_string($definition[0])
            && is_string($definition[1]);

        if (!$isClassMethodPair) {
            return $this->callableOrNull($definition);
        }

        [$className, $methodName] = $definition;

        if (!class_exists($className)) {
            // Not a class name: it may still be a container alias for the handler object.
            if ($this->container->has($className)) {
                return $this->callableOrNull([$this->container->get($className), $methodName]);
            }

            $this->logger->error("$className doesn't exist.");

            return null;
        }

        try {
            $reflection = new ReflectionMethod($className, $methodName);
        } catch (ReflectionException $e) {
            $this->logger->error($e->getMessage());

            return null;
        }

        if ($reflection->isStatic()) {
            return $this->callableOrNull([$className, $methodName]);
        }

        if ($this->container->has($className)) {
            return $this->callableOrNull([$this->container->get($className), $methodName]);
        }

        $this->logger->error(
            sprintf(
                'Method "%s::%s()" is not static and "%s" is not available in the container.',
                $className,
                $methodName,
                $className,
            ),
        );

        return null;
    }

    /**
     * Returns the candidate when it is callable from this class; otherwise logs an error and
     * returns `null`.
     *
     * @param mixed $candidate Value about to be used as a message handler.
     */
    private function callableOrNull(mixed $candidate): ?callable
    {
        if (is_callable($candidate)) {
            return $candidate;
        }

        $this->logger->error(
            sprintf('Queue handler resolved to a non-callable value of type "%s".', get_debug_type($candidate)),
        );

        return null;
    }

    /**
     * Wraps the handler closure into the final handler of the consume middleware pipeline.
     */
    private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface
    {
        return new ConsumeFinalHandler($handler);
    }

    /**
     * Creates the final handler of the failure-handling middleware pipeline.
     */
    private function createFailureHandler(): MessageFailureHandlerInterface
    {
        return new FailureFinalHandler();
    }
}
154