Passed
Pull Request — master (#864)
by Maxim
07:21 queued 28s
created

QueueBootloader::initQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Spiral\Queue\Bootloader;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\EventDispatcher\EventDispatcherInterface;
9
use Spiral\Boot\AbstractKernel;
10
use Spiral\Boot\Bootloader\Bootloader;
11
use Spiral\Boot\EnvironmentInterface;
12
use Spiral\Config\ConfiguratorInterface;
13
use Spiral\Config\Patch\Append;
14
use Spiral\Core\BinderInterface;
15
use Spiral\Core\Container\Autowire;
16
use Spiral\Core\CoreInterceptorInterface;
17
use Spiral\Core\FactoryInterface;
18
use Spiral\Core\InterceptableCore;
19
use Spiral\Queue\Config\QueueConfig;
20
use Spiral\Queue\ContainerRegistry;
21
use Spiral\Queue\Core\QueueInjector;
22
use Spiral\Queue\Driver\NullDriver;
23
use Spiral\Queue\Driver\SyncDriver;
24
use Spiral\Queue\Failed\FailedJobHandlerInterface;
25
use Spiral\Queue\Failed\LogFailedJobHandler;
26
use Spiral\Queue\HandlerRegistryInterface;
27
use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor;
28
use Spiral\Queue\Interceptor\Consume\Handler;
29
use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore;
30
use Spiral\Queue\QueueConnectionProviderInterface;
31
use Spiral\Queue\QueueInterface;
32
use Spiral\Queue\QueueManager;
33
use Spiral\Queue\QueueRegistry;
34
use Spiral\Queue\SerializerRegistryInterface;
35
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
36
use Spiral\Telemetry\TracerFactoryInterface;
37
38
final class QueueBootloader extends Bootloader
39
{
40
    protected const DEPENDENCIES = [
41
        TelemetryBootloader::class,
42
    ];
43
44
    protected const SINGLETONS = [
45
        HandlerRegistryInterface::class => QueueRegistry::class,
46
        SerializerRegistryInterface::class => QueueRegistry::class,
47
        FailedJobHandlerInterface::class => LogFailedJobHandler::class,
48
        QueueConnectionProviderInterface::class => QueueManager::class,
49
        QueueManager::class => [self::class, 'initQueueManager'],
50
        QueueRegistry::class => [self::class, 'initRegistry'],
51
        Handler::class => [self::class, 'initHandler'],
52
    ];
53
54 270
    public function __construct(
55
        private readonly ConfiguratorInterface $config
56
    ) {
57 270
    }
58
59 270
    public function init(
60
        ContainerInterface $container,
61
        BinderInterface $binder,
62
        EnvironmentInterface $env,
63
        AbstractKernel $kernel
64
    ): void {
65 270
        $this->initQueueConfig($env);
66
67 270
        $this->registerDriverAlias(SyncDriver::class, 'sync');
68 270
        $binder->bindInjector(QueueInterface::class, QueueInjector::class);
69
70 270
        $kernel->booted(static function () use ($container): void {
71 270
            $registry = $container->get(QueueRegistry::class);
72 270
            $config = $container->get(QueueConfig::class);
73
74 270
            foreach ($config->getRegistryHandlers() as $jobType => $handler) {
75
                $registry->setHandler($jobType, $handler);
76
            }
77
78 270
            foreach ($config->getRegistrySerializers() as $jobType => $serializer) {
79
                $registry->setSerializer($jobType, $serializer);
80
            }
81 270
        });
82
    }
83
84
    /**
85
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
0 ignored issues
show
Documentation Bug introduced by
The doc comment class-string<CoreInterce...eptorInterface|Autowire at position 0 could not be parsed: Unknown type name 'class-string' at position 0 in class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire.
Loading history...
86
     */
87 1
    public function addConsumeInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
88
    {
89 1
        $this->config->modify(
90 1
            QueueConfig::CONFIG,
91 1
            new Append('interceptors.consume', null, $interceptor)
92 1
        );
93
    }
94
95
    /**
96
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
0 ignored issues
show
Documentation Bug introduced by
The doc comment class-string<CoreInterce...eptorInterface|Autowire at position 0 could not be parsed: Unknown type name 'class-string' at position 0 in class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire.
Loading history...
97
     */
98 1
    public function addPushInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
99
    {
100 1
        $this->config->modify(
101 1
            QueueConfig::CONFIG,
102 1
            new Append('interceptors.push', null, $interceptor)
103 1
        );
104
    }
105
106 270
    public function registerDriverAlias(string $driverClass, string $alias): void
107
    {
108 270
        $this->config->modify(
109 270
            QueueConfig::CONFIG,
110 270
            new Append('driverAliases', $alias, $driverClass)
111 270
        );
112
    }
113
114 3
    protected function initQueueManager(FactoryInterface $factory): QueueManager
115
    {
116 3
        return $factory->make(QueueManager::class);
117
    }
118
119 270
    protected function initRegistry(
120
        ContainerInterface $container,
121
        FactoryInterface $factory,
122
        ContainerRegistry $registry
123
    ) {
124 270
        return new QueueRegistry($container, $factory, $registry);
125
    }
126
127 2
    protected function initHandler(
128
        ConsumeCore $core,
129
        QueueConfig $config,
130
        ContainerInterface $container,
131
        FactoryInterface $factory,
132
        TracerFactoryInterface $tracerFactory,
133
        ?EventDispatcherInterface $dispatcher = null
134
    ): Handler {
135 2
        $core = new InterceptableCore($core, $dispatcher);
136
137 2
        foreach ($config->getConsumeInterceptors() as $interceptor) {
138 2
            if (\is_string($interceptor)) {
139 2
                $interceptor = $container->get($interceptor);
140
            } elseif ($interceptor instanceof Autowire) {
141
                $interceptor = $interceptor->resolve($factory);
142
            }
143
144 2
            \assert($interceptor instanceof CoreInterceptorInterface);
145 2
            $core->addInterceptor($interceptor);
146
        }
147
148 2
        return new Handler($core, $tracerFactory);
149
    }
150
151 270
    private function initQueueConfig(EnvironmentInterface $env): void
152
    {
153 270
        $this->config->setDefaults(
154 270
            QueueConfig::CONFIG,
155 270
            [
156 270
                'default' => $env->get('QUEUE_CONNECTION', 'sync'),
157 270
                'connections' => [
158 270
                    'sync' => [
159 270
                        'driver' => 'sync',
160 270
                    ],
161 270
                ],
162 270
                'registry' => [
163 270
                    'handlers' => [],
164 270
                    'serializers' => [],
165 270
                ],
166 270
                'driverAliases' => [
167 270
                    'sync' => SyncDriver::class,
168 270
                    'null' => NullDriver::class,
169 270
                ],
170 270
                'interceptors' => [
171 270
                    'consume' => [
172 270
                        ErrorHandlerInterceptor::class,
173 270
                    ],
174 270
                    'push' => [],
175 270
                ],
176 270
            ]
177 270
        );
178
    }
179
}
180