Passed
Pull Request — master (#218)
by Viktor
03:01
created

Worker::createFailureHandler()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Worker;
6
7
use Closure;
8
use Psr\Container\ContainerExceptionInterface;
9
use Psr\Container\ContainerInterface;
10
use Psr\Container\NotFoundExceptionInterface;
11
use Psr\Log\LoggerInterface;
12
use ReflectionException;
13
use ReflectionMethod;
14
use RuntimeException;
15
use Throwable;
16
use Yiisoft\Injector\Injector;
17
use Yiisoft\Queue\Exception\JobFailureException;
18
use Yiisoft\Queue\Message\MessageHandlerInterface;
19
use Yiisoft\Queue\Message\MessageInterface;
20
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
21
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
22
use Yiisoft\Queue\Middleware\Consume\ConsumeRequest;
23
use Yiisoft\Queue\Middleware\Consume\MessageHandlerConsumeInterface;
24
use Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler;
25
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
26
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
27
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
28
use Yiisoft\Queue\QueueInterface;
29
use Yiisoft\Queue\Message\IdEnvelope;
30
31
final class Worker implements WorkerInterface
32
{
33
    private array $handlersCached = [];
34
35 26
    public function __construct(
36
        private array $handlers,
37
        private LoggerInterface $logger,
38
        private Injector $injector,
39
        private ContainerInterface $container,
40
        private ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher,
41
        private FailureMiddlewareDispatcher $failureMiddlewareDispatcher,
42
    ) {
43 26
    }
44
45
    /**
46
     * @throws Throwable
47
     */
48 17
    public function process(MessageInterface $message, QueueInterface $queue): MessageInterface
49
    {
50 17
        $this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']);
51
52 17
        $name = $message->getHandlerName();
53 17
        $handler = $this->getHandler($name);
54 17
        if ($handler === null) {
55 3
            throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name));
56
        }
57
58 14
        $request = new ConsumeRequest($message, $queue);
59 14
        $closure = fn (MessageInterface $message): mixed => $this->injector->invoke($handler, [$message]);
60
        try {
61 14
            return $this->consumeMiddlewareDispatcher->dispatch($request, $this->createConsumeHandler($closure))->getMessage();
62 1
        } catch (Throwable $exception) {
63 1
            $request = new FailureHandlingRequest($request->getMessage(), $exception, $request->getQueue());
64
65
            try {
66 1
                $result = $this->failureMiddlewareDispatcher->dispatch($request, $this->createFailureHandler());
67
                $this->logger->info($exception->getMessage());
68
69
                return $result->getMessage();
70 1
            } catch (Throwable $exception) {
71 1
                $exception = new JobFailureException($message, $exception);
72 1
                $this->logger->error($exception->getMessage());
73 1
                throw $exception;
74
            }
75
        }
76
    }
77
78 17
    private function getHandler(string $name): ?callable
79
    {
80 17
        if (!array_key_exists($name, $this->handlersCached)) {
81 17
            $definition = $this->handlers[$name] ?? null;
82 17
            if ($definition === null && $this->container->has($name)) {
83 1
                $handler = $this->container->get($name);
84 1
                if ($handler instanceof MessageHandlerInterface) {
85 1
                    $this->handlersCached[$name] = [$handler, 'handle'];
86
87 1
                    return $this->handlersCached[$name];
88
                }
89
90
                return null;
91
            }
92
93 16
            $this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null);
94
        }
95
96 17
        return $this->handlersCached[$name];
97
    }
98
99
    /**
100
     * Checks if the handler is a DI container alias
101
     *
102
     * @param array|callable|object|string|null $definition
103
     *
104
     * @throws ContainerExceptionInterface
105
     * @throws NotFoundExceptionInterface
106
     */
107 16
    private function prepare(callable|object|array|string|null $definition): callable|null
108
    {
109 16
        if (is_string($definition) && $this->container->has($definition)) {
0 ignored issues
show
introduced by
The condition is_string($definition) is always false.
Loading history...
110 1
            return $this->container->get($definition);
111
        }
112
113
        if (
114 15
            is_array($definition)
115 15
            && array_keys($definition) === [0, 1]
116 15
            && is_string($definition[0])
117 15
            && is_string($definition[1])
118
        ) {
119 6
            [$className, $methodName] = $definition;
120
121 6
            if (!class_exists($className) && $this->container->has($className)) {
122 1
                return [
123 1
                    $this->container->get($className),
124 1
                    $methodName,
125 1
                ];
126
            }
127
128 5
            if (!class_exists($className)) {
129 1
                $this->logger->error("$className doesn't exist.");
130
131 1
                return null;
132
            }
133
134
            try {
135 4
                $reflection = new ReflectionMethod($className, $methodName);
136 1
            } catch (ReflectionException $e) {
137 1
                $this->logger->error($e->getMessage());
138
139 1
                return null;
140
            }
141 3
            if ($reflection->isStatic()) {
142
                return [$className, $methodName];
143
            }
144 3
            if ($this->container->has($className)) {
145 2
                return [
146 2
                    $this->container->get($className),
147 2
                    $methodName,
148 2
                ];
149
            }
150
151 1
            return null;
152
        }
153
154 9
        return $definition;
155
    }
156
157 14
    private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface
158
    {
159 14
        return new ConsumeFinalHandler($handler);
160
    }
161
162 1
    private function createFailureHandler(): MessageFailureHandlerInterface
163
    {
164 1
        return new FailureFinalHandler();
165
    }
166
}
167