Passed
Push — master ( 67a020...fe0e28 )
by butschster
04:26 queued 17s
created

QueueBootloader::initHandler()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 22
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4.128

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 9
c 1
b 0
f 0
dl 0
loc 22
ccs 8
cts 10
cp 0.8
rs 9.9666
cc 4
nc 4
nop 6
crap 4.128
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Spiral\Queue\Bootloader;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\EventDispatcher\EventDispatcherInterface;
9
use Spiral\Boot\{AbstractKernel, EnvironmentInterface};
10
use Spiral\Boot\Bootloader\Bootloader;
11
use Spiral\Config\ConfiguratorInterface;
12
use Spiral\Config\Patch\Append;
13
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore};
14
use Spiral\Core\Container\Autowire;
15
use Spiral\Core\CoreInterceptorInterface;
16
use Spiral\Queue\{QueueConnectionProviderInterface,
17
    QueueInterface,
18
    QueueManager,
19
    QueueRegistry,
20
    SerializerRegistryInterface
21
};
22
use Spiral\Queue\Config\QueueConfig;
23
use Spiral\Queue\ContainerRegistry;
24
use Spiral\Queue\Core\QueueInjector;
25
use Spiral\Queue\Driver\{NullDriver, SyncDriver};
26
use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler};
27
use Spiral\Queue\HandlerRegistryInterface;
28
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler};
29
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
30
use Spiral\Telemetry\TracerFactoryInterface;
31
32
final class QueueBootloader extends Bootloader
33
{
34
    /**
     * Bootloaders this bootloader depends on.
     */
    protected const DEPENDENCIES = [
        TelemetryBootloader::class,
    ];

    /**
     * Singleton bindings declared by this bootloader.
     *
     * Plain class-string values alias an interface to a concrete class; the
     * `[self::class, '<method>']` values point at the factory methods defined
     * further down in this class.
     */
    protected const SINGLETONS = [
        HandlerRegistryInterface::class => QueueRegistry::class,
        SerializerRegistryInterface::class => QueueRegistry::class,
        FailedJobHandlerInterface::class => LogFailedJobHandler::class,
        QueueConnectionProviderInterface::class => QueueManager::class,
        QueueManager::class => [self::class, 'initQueueManager'],
        QueueRegistry::class => [self::class, 'initRegistry'],
        Handler::class => [self::class, 'initHandler'],
    ];
47
48 294
    /**
     * @param ConfiguratorInterface $config Configurator used by this bootloader to set
     *                                      defaults for, and apply patches to, the queue config.
     */
    public function __construct(
        private readonly ConfiguratorInterface $config,
    ) {}
52
53 293
    /**
     * Sets the queue config defaults, registers the `sync` driver alias, binds the
     * injector for QueueInterface and registers a `booted` callback on the kernel
     * that copies configured handlers and serializers into the queue registry.
     */
    public function init(
        ContainerInterface $container,
        BinderInterface $binder,
        EnvironmentInterface $env,
        AbstractKernel $kernel,
    ): void {
        $this->initQueueConfig($env);
        $this->registerDriverAlias(SyncDriver::class, 'sync');

        $binder->bindInjector(QueueInterface::class, QueueInjector::class);

        // Registry and config are only pulled from the container inside the callback,
        // not while this bootloader is being initialised.
        $kernel->booted(static function () use ($container): void {
            $queueRegistry = $container->get(QueueRegistry::class);
            $queueConfig = $container->get(QueueConfig::class);

            foreach ($queueConfig->getRegistryHandlers() as $type => $jobHandler) {
                $queueRegistry->setHandler($type, $jobHandler);
            }

            foreach ($queueConfig->getRegistrySerializers() as $type => $jobSerializer) {
                $queueRegistry->setSerializer($type, $jobSerializer);
            }
        });
    }
77
78
    /**
     * Appends an interceptor to the `interceptors.consume` list of the queue config.
     *
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
     */
    public function addConsumeInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
    {
        // A null key appends the value without assigning an explicit key.
        $patch = new Append('interceptors.consume', null, $interceptor);

        $this->config->modify(QueueConfig::CONFIG, $patch);
    }
88
89
    /**
     * Appends an interceptor to the `interceptors.push` list of the queue config.
     *
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
     */
    public function addPushInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
    {
        // A null key appends the value without assigning an explicit key.
        $patch = new Append('interceptors.push', null, $interceptor);

        $this->config->modify(QueueConfig::CONFIG, $patch);
    }
99
100 293
    /**
     * Registers `$alias` as a short name for `$driverClass` under the
     * `driverAliases` section of the queue config.
     *
     * @param string $driverClass Driver class name the alias points to.
     * @param string $alias       Key stored in `driverAliases`.
     */
    public function registerDriverAlias(string $driverClass, string $alias): void
    {
        $patch = new Append('driverAliases', $alias, $driverClass);

        $this->config->modify(QueueConfig::CONFIG, $patch);
    }
107
108 3
    /**
     * Factory for the QueueManager singleton; the instance is built by the
     * given factory.
     */
    protected function initQueueManager(FactoryInterface $factory): QueueManager
    {
        $manager = $factory->make(QueueManager::class);

        return $manager;
    }
112
113 293
    /**
     * Factory for the QueueRegistry singleton.
     *
     * @param ContainerInterface $container Passed through to the registry constructor.
     * @param FactoryInterface   $factory   Passed through to the registry constructor.
     * @param ContainerRegistry  $registry  Passed through to the registry constructor.
     *
     * @return QueueRegistry Newly constructed registry.
     */
    protected function initRegistry(
        ContainerInterface $container,
        FactoryInterface $factory,
        ContainerRegistry $registry,
    ): QueueRegistry {
        return new QueueRegistry($container, $factory, $registry);
    }
120
121 2
    /**
     * Factory for the consume Handler singleton.
     *
     * Wraps the consume core in an InterceptableCore, attaches every interceptor
     * returned by the config's consume-interceptor list, and builds the Handler
     * around the result.
     *
     * @param ConsumeCore                   $core          Core to be wrapped.
     * @param QueueConfig                   $config        Source of the consume interceptors.
     * @param ContainerInterface            $container     Resolves interceptors given as strings.
     * @param FactoryInterface              $factory       Resolves interceptors given as Autowire.
     * @param TracerFactoryInterface        $tracerFactory Passed to the Handler.
     * @param EventDispatcherInterface|null $dispatcher    Optional dispatcher passed to InterceptableCore.
     */
    protected function initHandler(
        ConsumeCore $core,
        QueueConfig $config,
        ContainerInterface $container,
        FactoryInterface $factory,
        TracerFactoryInterface $tracerFactory,
        ?EventDispatcherInterface $dispatcher = null,
    ): Handler {
        $pipeline = new InterceptableCore($core, $dispatcher);

        foreach ($config->getConsumeInterceptors() as $definition) {
            // Strings go through the container, Autowire definitions through the
            // factory; anything else is used as given.
            $resolved = match (true) {
                \is_string($definition) => $container->get($definition),
                $definition instanceof Autowire => $definition->resolve($factory),
                default => $definition,
            };

            \assert($resolved instanceof CoreInterceptorInterface);
            $pipeline->addInterceptor($resolved);
        }

        return new Handler($pipeline, $tracerFactory);
    }
144
145 293
    /**
     * Sets the default values of the queue config section.
     *
     * The default connection name is read from the `QUEUE_CONNECTION` environment
     * value and falls back to `sync`.
     */
    private function initQueueConfig(EnvironmentInterface $env): void
    {
        $defaults = [
            'default' => $env->get('QUEUE_CONNECTION', 'sync'),
            'connections' => [
                'sync' => [
                    'driver' => 'sync',
                ],
            ],
            'registry' => [
                'handlers' => [],
                'serializers' => [],
            ],
            'driverAliases' => [
                'sync' => SyncDriver::class,
                'null' => NullDriver::class,
            ],
            'interceptors' => [
                'consume' => [
                    ErrorHandlerInterceptor::class,
                ],
                'push' => [],
            ],
        ];

        $this->config->setDefaults(QueueConfig::CONFIG, $defaults);
    }
173
}
174