Passed
Pull Request — master (#189)
by Sergei
02:47
created

Worker   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 112
Duplicated Lines 0 %

Test Coverage

Coverage 94.34%

Importance

Changes 0
Metric Value
wmc 20
eloc 52
dl 0
loc 112
ccs 50
cts 53
cp 0.9434
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 1
A getHandler() 0 7 2
A process() 0 26 4
C prepare() 0 48 13
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Psr\Container\ContainerExceptionInterface;
8
use Psr\Container\ContainerInterface;
9
use Psr\Container\NotFoundExceptionInterface;
10
use Psr\Log\LoggerInterface;
11
use ReflectionException;
12
use ReflectionMethod;
13
use RuntimeException;
14
use Throwable;
15
use Yiisoft\Injector\Injector;
16
use Yiisoft\Queue\Exception\JobFailureException;
17
use Yiisoft\Queue\Message\MessageInterface;
18
use Yiisoft\Queue\Middleware\ConsumeFinalHandler;
19
use Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler;
20
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
21
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
22
use Yiisoft\Queue\Middleware\MiddlewareDispatcher;
23
use Yiisoft\Queue\Middleware\Request;
24
use Yiisoft\Queue\QueueInterface;
25
use Yiisoft\Queue\Message\IdEnvelope;
26
27
final class Worker implements WorkerInterface
28
{
29
    private array $handlersCached = [];
30
31 25
    public function __construct(
32
        private array $handlers,
33
        private LoggerInterface $logger,
34
        private Injector $injector,
35
        private ContainerInterface $container,
36
        private MiddlewareDispatcher $consumeMiddlewareDispatcher,
37
        private FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
38
    ) {
39 25
    }
40
41
    /**
42
     * @throws Throwable
43
     */
44 16
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
45
    {
46 16
        $this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']);
47
48 16
        $name = $message->getHandlerName();
49 16
        $handler = $this->getHandler($name);
50 16
        if ($handler === null) {
51 3
            throw new RuntimeException("Queue handler with name $name doesn't exist");
52
        }
53
54 13
        $request = new Request($message, $queue->getAdapter());
55 13
        $closure = fn (MessageInterface $message): mixed => $this->injector->invoke($handler, [$message]);
56
        try {
57 13
            return $this->consumeMiddlewareDispatcher->dispatch($request, new ConsumeFinalHandler($closure))->getMessage();
58 1
        } catch (Throwable $exception) {
59 1
            $request = new FailureHandlingRequest($request->getMessage(), $exception, $queue);
60
61
            try {
62 1
                $result = $this->failureMiddlewareDispatcher->dispatch($request, new FailureFinalHandler());
63
                $this->logger->info($exception->getMessage());
64
65
                return $result->getMessage();
66 1
            } catch (Throwable $exception) {
67 1
                $exception = new JobFailureException($message, $exception);
68 1
                $this->logger->error($exception->getMessage());
69 1
                throw $exception;
70
            }
71
        }
72
    }
73
74 16
    private function getHandler(string $name): ?callable
75
    {
76 16
        if (!array_key_exists($name, $this->handlersCached)) {
77 16
            $this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null);
78
        }
79
80 16
        return $this->handlersCached[$name];
81
    }
82
83
    /**
84
     * Checks if the handler is a DI container alias
85
     *
86
     * @param array|callable|object|string|null $definition
87
     *
88
     * @throws ContainerExceptionInterface
89
     * @throws NotFoundExceptionInterface
90
     */
91 16
    private function prepare(callable|object|array|string|null $definition): callable|null
92
    {
93 16
        if (is_string($definition) && $this->container->has($definition)) {
0 ignored issues
show
introduced by
The condition is_string($definition) is always false.
Loading history...
94 1
            return $this->container->get($definition);
95
        }
96
97
        if (
98 15
            is_array($definition)
99 15
            && array_keys($definition) === [0, 1]
100 15
            && is_string($definition[0])
101 15
            && is_string($definition[1])
102
        ) {
103 6
            [$className, $methodName] = $definition;
104
105 6
            if (!class_exists($className) && $this->container->has($className)) {
106 1
                return [
107 1
                    $this->container->get($className),
108 1
                    $methodName,
109 1
                ];
110
            }
111
112 5
            if (!class_exists($className)) {
113 1
                $this->logger->error("$className doesn't exist.");
114
115 1
                return null;
116
            }
117
118
            try {
119 4
                $reflection = new ReflectionMethod($className, $methodName);
120 1
            } catch (ReflectionException $e) {
121 1
                $this->logger->error($e->getMessage());
122
123 1
                return null;
124
            }
125 3
            if ($reflection->isStatic()) {
126
                return [$className, $methodName];
127
            }
128 3
            if ($this->container->has($className)) {
129 2
                return [
130 2
                    $this->container->get($className),
131 2
                    $methodName,
132 2
                ];
133
            }
134
135 1
            return null;
136
        }
137
138 9
        return $definition;
139
    }
140
}
141