Test Failed
Push — master ( c39bf4...65271b )
by butschster
16:04 queued 08:03
created

QueueBootloader::initQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Spiral\Queue\Bootloader;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\EventDispatcher\EventDispatcherInterface;
9
use Spiral\Boot\AbstractKernel;
10
use Spiral\Boot\Bootloader\Bootloader;
11
use Spiral\Boot\EnvironmentInterface;
12
use Spiral\Config\ConfiguratorInterface;
13
use Spiral\Config\Patch\Append;
14
use Spiral\Core\BinderInterface;
15
use Spiral\Core\Container\Autowire;
16
use Spiral\Core\CoreInterceptorInterface;
17
use Spiral\Core\FactoryInterface;
18
use Spiral\Core\InterceptableCore;
19
use Spiral\Queue\Config\QueueConfig;
20
use Spiral\Queue\ContainerRegistry;
21
use Spiral\Queue\Core\QueueInjector;
22
use Spiral\Queue\Driver\NullDriver;
23
use Spiral\Queue\Driver\SyncDriver;
24
use Spiral\Queue\Failed\FailedJobHandlerInterface;
25
use Spiral\Queue\Failed\LogFailedJobHandler;
26
use Spiral\Queue\HandlerRegistryInterface;
27
use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor;
28
use Spiral\Queue\Interceptor\Consume\Handler;
29
use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore;
30
use Spiral\Queue\QueueConnectionProviderInterface;
31
use Spiral\Queue\QueueInterface;
32
use Spiral\Queue\QueueManager;
33
use Spiral\Queue\QueueRegistry;
34
use Spiral\Queue\SerializerRegistryInterface;
35
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
36
use Spiral\Telemetry\TracerFactoryInterface;
37
38
final class QueueBootloader extends Bootloader
39
{
40
    protected const DEPENDENCIES = [
41
        TelemetryBootloader::class,
42
    ];
43
44
    protected const SINGLETONS = [
45
        HandlerRegistryInterface::class => QueueRegistry::class,
46
        SerializerRegistryInterface::class => QueueRegistry::class,
47
        FailedJobHandlerInterface::class => LogFailedJobHandler::class,
48
        QueueConnectionProviderInterface::class => QueueManager::class,
49
        QueueManager::class => [self::class, 'initQueueManager'],
50
        QueueRegistry::class => [self::class, 'initRegistry'],
51
        Handler::class => [self::class, 'initHandler'],
52
    ];
53
54
    public function __construct(
55 283
        private readonly ConfiguratorInterface $config
56
    ) {
57
    }
58 283
59
    public function init(
60 283
        ContainerInterface $container,
61
        BinderInterface $binder,
62
        EnvironmentInterface $env,
63
        AbstractKernel $kernel
64
    ): void {
65
        $this->initQueueConfig($env);
66 283
67
        $this->registerDriverAlias(SyncDriver::class, 'sync');
68 283
        $binder->bindInjector(QueueInterface::class, QueueInjector::class);
69 283
70
        $kernel->booted(static function () use ($container): void {
71 283
            $registry = $container->get(QueueRegistry::class);
72 283
            $config = $container->get(QueueConfig::class);
73 283
74
            foreach ($config->getRegistryHandlers() as $jobType => $handler) {
75 283
                $registry->setHandler($jobType, $handler);
76
            }
77
78
            foreach ($config->getRegistrySerializers() as $jobType => $serializer) {
79 283
                $registry->setSerializer($jobType, $serializer);
80
            }
81
        });
82 283
    }
83
84
    /**
85
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
0 ignored issues
show
Documentation Bug introduced by
The doc comment class-string<CoreInterce...eptorInterface|Autowire at position 0 could not be parsed: Unknown type name 'class-string' at position 0 in class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire.
Loading history...
86
     */
87
    public function addConsumeInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
88 1
    {
89
        $this->config->modify(
90 1
            QueueConfig::CONFIG,
91 1
            new Append('interceptors.consume', null, $interceptor)
92 1
        );
93 1
    }
94
95
    /**
96
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
0 ignored issues
show
Documentation Bug introduced by
The doc comment class-string<CoreInterce...eptorInterface|Autowire at position 0 could not be parsed: Unknown type name 'class-string' at position 0 in class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire.
Loading history...
97
     */
98
    public function addPushInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
99 1
    {
100
        $this->config->modify(
101 1
            QueueConfig::CONFIG,
102 1
            new Append('interceptors.push', null, $interceptor)
103 1
        );
104 1
    }
105
106
    public function registerDriverAlias(string $driverClass, string $alias): void
107 283
    {
108
        $this->config->modify(
109 283
            QueueConfig::CONFIG,
110 283
            new Append('driverAliases', $alias, $driverClass)
111 283
        );
112 283
    }
113
114
    protected function initQueueManager(FactoryInterface $factory): QueueManager
115 3
    {
116
        return $factory->make(QueueManager::class);
117 3
    }
118
119
    protected function initRegistry(
120 283
        ContainerInterface $container,
121
        FactoryInterface $factory,
122
        ContainerRegistry $registry
123
    ) {
124
        return new QueueRegistry($container, $factory, $registry);
125 283
    }
126
127
    protected function initHandler(
128 2
        ConsumeCore $core,
129
        QueueConfig $config,
130
        ContainerInterface $container,
131
        FactoryInterface $factory,
132
        TracerFactoryInterface $tracerFactory,
133
        ?EventDispatcherInterface $dispatcher = null
134
    ): Handler {
135
        $core = new InterceptableCore($core, $dispatcher);
136 2
137
        foreach ($config->getConsumeInterceptors() as $interceptor) {
138 2
            if (\is_string($interceptor)) {
139 2
                $interceptor = $container->get($interceptor);
140 2
            } elseif ($interceptor instanceof Autowire) {
141
                $interceptor = $interceptor->resolve($factory);
142
            }
143
144
            \assert($interceptor instanceof CoreInterceptorInterface);
145 2
            $core->addInterceptor($interceptor);
146 2
        }
147
148
        return new Handler($core, $tracerFactory);
149 2
    }
150
151
    private function initQueueConfig(EnvironmentInterface $env): void
152 1
    {
153
        $this->config->setDefaults(
154 1
            QueueConfig::CONFIG,
155
            [
156
                'default' => $env->get('QUEUE_CONNECTION', 'sync'),
157 283
                'connections' => [
158
                    'sync' => [
159 283
                        'driver' => 'sync',
160 283
                    ],
161 283
                ],
162 283
                'registry' => [
163 283
                    'handlers' => [],
164 283
                    'serializers' => [],
165 283
                ],
166 283
                'driverAliases' => [
167 283
                    'sync' => SyncDriver::class,
168 283
                    'null' => NullDriver::class,
169 283
                ],
170 283
                'interceptors' => [
171 283
                    'consume' => [
172 283
                        ErrorHandlerInterceptor::class,
173 283
                    ],
174 283
                    'push' => [],
175 283
                ],
176 283
            ]
177 283
        );
178 283
    }
179
}
180