Passed
Pull Request — master (#823)
by butschster
06:29
created

QueueBootloader   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 138
Duplicated Lines 0 %

Test Coverage

Coverage 90%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 15
eloc 57
dl 0
loc 138
ccs 36
cts 40
cp 0.9
rs 10
c 1
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
A addPushInterceptor() 0 5 1
A initQueueConfig() 0 24 1
A registerDriverAlias() 0 5 1
A initHandler() 0 21 4
A addConsumeInterceptor() 0 5 1
A initRegistry() 0 6 1
A initQueueManager() 0 3 1
A initQueue() 0 3 1
A init() 0 21 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Spiral\Queue\Bootloader;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\EventDispatcher\EventDispatcherInterface;
9
use Spiral\Boot\AbstractKernel;
10
use Spiral\Boot\Bootloader\Bootloader;
11
use Spiral\Boot\EnvironmentInterface;
12
use Spiral\Config\ConfiguratorInterface;
13
use Spiral\Config\Patch\Append;
14
use Spiral\Core\BinderInterface;
15
use Spiral\Core\Container\Autowire;
16
use Spiral\Core\CoreInterceptorInterface;
17
use Spiral\Core\FactoryInterface;
18
use Spiral\Core\InterceptableCore;
19
use Spiral\Queue\Config\QueueConfig;
20
use Spiral\Queue\ContainerRegistry;
21
use Spiral\Queue\Core\QueueInjector;
22
use Spiral\Queue\Driver\NullDriver;
23
use Spiral\Queue\Driver\SyncDriver;
24
use Spiral\Queue\Failed\FailedJobHandlerInterface;
25
use Spiral\Queue\Failed\LogFailedJobHandler;
26
use Spiral\Queue\HandlerRegistryInterface;
27
use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor;
28
use Spiral\Queue\Interceptor\Consume\Handler;
29
use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore;
30
use Spiral\Queue\QueueConnectionProviderInterface;
31
use Spiral\Queue\QueueInterface;
32
use Spiral\Queue\QueueManager;
33
use Spiral\Queue\QueueRegistry;
34
use Spiral\Queue\SerializerRegistryInterface;
35
36
final class QueueBootloader extends Bootloader
37
{
38
    /**
     * Bindings declared by this bootloader.
     *
     * Each key is the abstraction being bound. A value is either a class name
     * or a [class, method] pair pointing at one of the init* factory methods
     * defined in this bootloader.
     *
     * NOTE(review): how the parent Bootloader consumes this constant is not
     * visible here; presumably every entry is registered as a container
     * singleton — confirm against Spiral\Boot\Bootloader\Bootloader.
     */
    protected const SINGLETONS = [
        // Interfaces mapped to a class name.
        HandlerRegistryInterface::class => QueueRegistry::class,
        SerializerRegistryInterface::class => QueueRegistry::class,
        FailedJobHandlerInterface::class => LogFailedJobHandler::class,
        QueueConnectionProviderInterface::class => QueueManager::class,

        // Services built by factory methods of this bootloader.
        QueueManager::class => [self::class, 'initQueueManager'],
        QueueRegistry::class => [self::class, 'initRegistry'],
        Handler::class => [self::class, 'initHandler'],
        QueueInterface::class => [self::class, 'initQueue'],
    ];
48
49 250
    /**
     * @param ConfiguratorInterface $config Configurator through which this bootloader
     *        sets the defaults of, and applies patches to, the queue config section.
     */
    public function __construct(
        private readonly ConfiguratorInterface $config,
    ) {
    }
53
54 250
    /**
     * Prepares the queue component.
     *
     * Sets the queue config defaults, registers the "sync" driver alias, binds
     * QueueInjector as the injector for QueueInterface and hands the kernel a
     * callback that copies the configured handlers and serializers into the
     * QueueRegistry.
     *
     * NOTE(review): the callback is passed to AbstractKernel::booted();
     * presumably it runs once the kernel has finished booting — confirm.
     */
    public function init(
        ContainerInterface $container,
        BinderInterface $binder,
        EnvironmentInterface $env,
        AbstractKernel $kernel,
    ): void {
        $this->initQueueConfig($env);
        $this->registerDriverAlias(SyncDriver::class, 'sync');

        $binder->bindInjector(QueueInterface::class, QueueInjector::class);

        $populateRegistry = static function () use ($container): void {
            $queueRegistry = $container->get(QueueRegistry::class);
            $queueConfig = $container->get(QueueConfig::class);

            // Job type => handler pairs taken from the config.
            foreach ($queueConfig->getRegistryHandlers() as $type => $jobHandler) {
                $queueRegistry->setHandler($type, $jobHandler);
            }

            // Job type => serializer pairs taken from the config.
            foreach ($queueConfig->getRegistrySerializers() as $type => $jobSerializer) {
                $queueRegistry->setSerializer($type, $jobSerializer);
            }
        };

        $kernel->booted($populateRegistry);
    }
78
79
    /**
     * Appends an interceptor to the "interceptors.consume" list of the queue config.
     *
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
     */
    public function addConsumeInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
    {
        // A null key is passed to Append together with the interceptor value.
        $patch = new Append('interceptors.consume', null, $interceptor);

        $this->config->modify(QueueConfig::CONFIG, $patch);
    }
89
90
    /**
     * Appends an interceptor to the "interceptors.push" list of the queue config.
     *
     * @param class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire $interceptor
     */
    public function addPushInterceptor(string|CoreInterceptorInterface|Autowire $interceptor): void
    {
        // A null key is passed to Append together with the interceptor value.
        $patch = new Append('interceptors.push', null, $interceptor);

        $this->config->modify(QueueConfig::CONFIG, $patch);
    }
100
101 250
    /**
     * Records a driver alias under the "driverAliases" section of the queue config.
     *
     * @param string $driverClass Driver class name stored as the value.
     * @param string $alias Alias stored as the key.
     */
    public function registerDriverAlias(string $driverClass, string $alias): void
    {
        $patch = new Append('driverAliases', $alias, $driverClass);

        $this->config->modify(QueueConfig::CONFIG, $patch);
    }
108
109 3
    /**
     * Factory referenced from SINGLETONS for QueueManager: asks the given
     * factory to make the manager instance.
     */
    protected function initQueueManager(FactoryInterface $factory): QueueManager
    {
        $manager = $factory->make(QueueManager::class);

        return $manager;
    }
113
114 250
    /**
     * Factory referenced from SINGLETONS for QueueRegistry.
     *
     * @param ContainerInterface $container Passed to the QueueRegistry constructor.
     * @param FactoryInterface $factory Passed to the QueueRegistry constructor.
     * @param ContainerRegistry $registry Passed to the QueueRegistry constructor.
     *
     * @return QueueRegistry A new registry built from the three collaborators.
     */
    protected function initRegistry(
        ContainerInterface $container,
        FactoryInterface $factory,
        ContainerRegistry $registry,
    ): QueueRegistry {
        return new QueueRegistry($container, $factory, $registry);
    }
121
122 2
    /**
     * Factory referenced from SINGLETONS for the consume Handler.
     *
     * Wraps the consume core in an InterceptableCore, adds every consume
     * interceptor listed in the queue config and returns a Handler built on
     * the wrapped core.
     *
     * Interceptor definitions are resolved as follows:
     *  - string: fetched from the container;
     *  - Autowire: resolved through the factory;
     *  - anything else: used as is.
     *
     * @param ?EventDispatcherInterface $dispatcher Optional dispatcher handed to InterceptableCore.
     */
    protected function initHandler(
        ConsumeCore $core,
        QueueConfig $config,
        ContainerInterface $container,
        FactoryInterface $factory,
        ?EventDispatcherInterface $dispatcher = null,
    ): Handler {
        $pipeline = new InterceptableCore($core, $dispatcher);

        foreach ($config->getConsumeInterceptors() as $definition) {
            $resolved = match (true) {
                \is_string($definition) => $container->get($definition),
                $definition instanceof Autowire => $definition->resolve($factory),
                default => $definition,
            };

            // Invariant check only; it has no effect when assertions are disabled.
            \assert($resolved instanceof CoreInterceptorInterface);
            $pipeline->addInterceptor($resolved);
        }

        return new Handler($pipeline);
    }
144
145 1
    /**
     * Factory referenced from SINGLETONS for QueueInterface: returns the
     * connection obtained from the provider without naming one.
     *
     * NOTE(review): presumably this yields the default connection — confirm
     * against QueueConnectionProviderInterface::getConnection().
     */
    protected function initQueue(QueueConnectionProviderInterface $manager): QueueInterface
    {
        $connection = $manager->getConnection();

        return $connection;
    }
149
150 250
    /**
     * Sets the default values of the queue config section.
     *
     * The defaults contain: the default connection name read from the
     * QUEUE_CONNECTION environment value (with 'sync' passed as the fallback),
     * a single "sync" connection, empty handler and serializer registries,
     * the "sync" and "null" driver aliases, and ErrorHandlerInterceptor as
     * the only consume interceptor.
     */
    private function initQueueConfig(EnvironmentInterface $env): void
    {
        $defaults = [
            'default' => $env->get('QUEUE_CONNECTION', 'sync'),
            'connections' => [
                'sync' => ['driver' => 'sync'],
            ],
            'registry' => [
                'handlers' => [],
                'serializers' => [],
            ],
            'driverAliases' => [
                'sync' => SyncDriver::class,
                'null' => NullDriver::class,
            ],
            'interceptors' => [
                'consume' => [ErrorHandlerInterceptor::class],
                'push' => [],
            ],
        ];

        $this->config->setDefaults(QueueConfig::CONFIG, $defaults);
    }
178
}
179