Passed
Push — master ( 738663...b5cb7b )
by butschster
25:04 queued 15:39
created

QueueBootloader::boot()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 7
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 3
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Spiral\Queue\Bootloader;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\EventDispatcher\EventDispatcherInterface;
9
use Spiral\Boot\{AbstractKernel, EnvironmentInterface};
10
use Spiral\Boot\Bootloader\Bootloader;
11
use Spiral\Config\ConfiguratorInterface;
12
use Spiral\Config\Patch\Append;
13
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore};
14
use Spiral\Core\Container\Autowire;
15
use Spiral\Core\CoreInterceptorInterface;
16
use Spiral\Queue\{JobHandlerLocatorListener,
17
    QueueConnectionProviderInterface,
18
    QueueInterface,
19
    QueueManager,
20
    QueueRegistry,
21
    SerializerLocatorListener,
22
    SerializerRegistryInterface};
23
use Spiral\Queue\Config\QueueConfig;
24
use Spiral\Queue\ContainerRegistry;
25
use Spiral\Queue\Core\QueueInjector;
26
use Spiral\Queue\Driver\{NullDriver, SyncDriver};
27
use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler};
28
use Spiral\Queue\HandlerRegistryInterface;
29
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor};
30
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
31
use Spiral\Telemetry\TracerFactoryInterface;
32
use Spiral\Tokenizer\Bootloader\TokenizerListenerBootloader;
33
use Spiral\Tokenizer\TokenizerListenerRegistryInterface;
34
35
final class QueueBootloader extends Bootloader
36
{
37
    protected const DEPENDENCIES = [
38
        TokenizerListenerBootloader::class,
39
        TelemetryBootloader::class,
40
    ];
41
42
    protected const SINGLETONS = [
43
        HandlerRegistryInterface::class => QueueRegistry::class,
44
        SerializerRegistryInterface::class => QueueRegistry::class,
45
        FailedJobHandlerInterface::class => LogFailedJobHandler::class,
46
        QueueConnectionProviderInterface::class => QueueManager::class,
47
        QueueManager::class => [self::class, 'initQueueManager'],
48
        QueueRegistry::class => [self::class, 'initRegistry'],
49
        Handler::class => [self::class, 'initHandler'],
50
    ];
51
52 306
    public function __construct(
53
        private readonly ConfiguratorInterface $config,
54
    ) {
55 306
    }
56
57 306
    public function init(
58
        ContainerInterface $container,
59
        BinderInterface $binder,
60
        EnvironmentInterface $env,
61
        AbstractKernel $kernel,
62
    ): void {
63 306
        $this->initQueueConfig($env);
64
65 306
        $this->registerDriverAlias(SyncDriver::class, 'sync');
66 306
        $binder->bindInjector(QueueInterface::class, QueueInjector::class);
67
68 306
        $kernel->booted(static function () use ($container): void {
69 306
            $registry = $container->get(QueueRegistry::class);
70 306
            $config = $container->get(QueueConfig::class);
71
72 306
            foreach ($config->getRegistryHandlers() as $jobType => $handler) {
73
                $registry->setHandler($jobType, $handler);
74
            }
75
76 306
            foreach ($config->getRegistrySerializers() as $jobType => $serializer) {
77
                $registry->setSerializer($jobType, $serializer);
78
            }
79 306
        });
80
    }
81
82 306
    public function boot(
83
        TokenizerListenerRegistryInterface $listenerRegistry,
84
        JobHandlerLocatorListener $jobHandlerLocator,
85
        SerializerLocatorListener $serializerLocator
86
    ): void {
87 306
        $listenerRegistry->addListener($jobHandlerLocator);
88 306
        $listenerRegistry->addListener($serializerLocator);
89
    }
90
91
    /**
92
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
0 ignored issues
show
Documentation Bug introduced by
The doc comment class-string<CoreInterce...eptorInterface|Autowire at position 0 could not be parsed: Unknown type name 'class-string' at position 0 in class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire.
Loading history...
93
     */
94 1
    public function addConsumeInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
95
    {
96 1
        $this->config->modify(
97 1
            QueueConfig::CONFIG,
98 1
            new Append('interceptors.consume', null, $interceptor),
99 1
        );
100
    }
101
102
    /**
103
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
0 ignored issues
show
Documentation Bug introduced by
The doc comment class-string<CoreInterce...eptorInterface|Autowire at position 0 could not be parsed: Unknown type name 'class-string' at position 0 in class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire.
Loading history...
104
     */
105 1
    public function addPushInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
106
    {
107 1
        $this->config->modify(
108 1
            QueueConfig::CONFIG,
109 1
            new Append('interceptors.push', null, $interceptor),
110 1
        );
111
    }
112
113 306
    public function registerDriverAlias(string $driverClass, string $alias): void
114
    {
115 306
        $this->config->modify(
116 306
            QueueConfig::CONFIG,
117 306
            new Append('driverAliases', $alias, $driverClass),
118 306
        );
119
    }
120
121 3
    protected function initQueueManager(FactoryInterface $factory): QueueManager
122
    {
123 3
        return $factory->make(QueueManager::class);
124
    }
125
126 306
    protected function initRegistry(
127
        ContainerInterface $container,
128
        FactoryInterface $factory,
129
        ContainerRegistry $registry,
130
    ) {
131 306
        return new QueueRegistry($container, $factory, $registry);
132
    }
133
134 2
    protected function initHandler(
135
        ConsumeCore $core,
136
        QueueConfig $config,
137
        ContainerInterface $container,
138
        FactoryInterface $factory,
139
        TracerFactoryInterface $tracerFactory,
140
        ?EventDispatcherInterface $dispatcher = null,
141
    ): Handler {
142 2
        $core = new InterceptableCore($core, $dispatcher);
143
144 2
        foreach ($config->getConsumeInterceptors() as $interceptor) {
145 2
            if (\is_string($interceptor)) {
146 2
                $interceptor = $container->get($interceptor);
147
            } elseif ($interceptor instanceof Autowire) {
148
                $interceptor = $interceptor->resolve($factory);
149
            }
150
151 2
            \assert($interceptor instanceof CoreInterceptorInterface);
152 2
            $core->addInterceptor($interceptor);
153
        }
154
155 2
        return new Handler($core, $tracerFactory);
156
    }
157
158 306
    private function initQueueConfig(EnvironmentInterface $env): void
159
    {
160 306
        $this->config->setDefaults(
161 306
            QueueConfig::CONFIG,
162 306
            [
163 306
                'default' => $env->get('QUEUE_CONNECTION', 'sync'),
164 306
                'connections' => [
165 306
                    'sync' => [
166 306
                        'driver' => 'sync',
167 306
                    ],
168 306
                ],
169 306
                'registry' => [
170 306
                    'handlers' => [],
171 306
                    'serializers' => [],
172 306
                ],
173 306
                'driverAliases' => [
174 306
                    'sync' => SyncDriver::class,
175 306
                    'null' => NullDriver::class,
176 306
                ],
177 306
                'interceptors' => [
178 306
                    'consume' => [
179 306
                        ErrorHandlerInterceptor::class,
180 306
                        RetryPolicyInterceptor::class,
181 306
                    ],
182 306
                    'push' => [],
183 306
                ],
184 306
            ],
185 306
        );
186
    }
187
}
188