Issues (10)

Integration/Laravel/TableSyncServiceProvider.php (1 issue)

Severity
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Umbrellio\TableSync\Integration\Laravel;
6
7
use Illuminate\Log\LogManager;
8
use Illuminate\Support\Facades\Config as ConfigRepository;
9
use Illuminate\Support\ServiceProvider;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use Psr\Log\LoggerInterface;
12
use Umbrellio\TableSync\Integration\Laravel\Console\PidManager;
13
use Umbrellio\TableSync\Integration\Laravel\Publishers\QueuePublisher;
14
use Umbrellio\TableSync\Integration\Laravel\Receive\MessageData\MessageDataRetriever;
15
use Umbrellio\TableSync\Integration\Laravel\Receive\Upserter\ByTargetKeysResolver;
16
use Umbrellio\TableSync\Integration\Laravel\Receive\Upserter\ConflictConditionResolverContract;
17
use Umbrellio\TableSync\Integration\Laravel\ReceivedMessageHandlers\QueueReceivedMessageHandler;
18
use Umbrellio\TableSync\Publisher;
19
use Umbrellio\TableSync\Rabbit\ChannelContainer;
20
use Umbrellio\TableSync\Rabbit\Config;
21
use Umbrellio\TableSync\Rabbit\ConnectionContainer;
22
use Umbrellio\TableSync\Rabbit\Consumer as RabbitConsumer;
23
use Umbrellio\TableSync\Rabbit\Publisher as RabbitPublisher;
24
25
class TableSyncServiceProvider extends ServiceProvider
26
{
27
    public $bindings = [
28
        ConflictConditionResolverContract::class => ByTargetKeysResolver::class,
29
    ];
30
    protected $defer = true;
31
32
    public function boot()
33
    {
34
        $config = __DIR__ . '/config/table_sync.php';
35
36
        $this->publishes([
37
            $config => base_path('config/table_sync.php'),
38
        ], 'config');
39
    }
40
41
    public function configureChannel()
42
    {
43
        $this->app->bind(ChannelContainer::class, function ($app) {
44
            $channel = new ChannelContainer($app->make(ConnectionContainer::class));
45
            $channel->setChannelOption(ConfigRepository::get('table_sync.channel'));
46
            return $channel;
47
        });
48
    }
49
50
    public function register()
51
    {
52
        $publishConfig = ConfigRepository::get('table_sync.publish');
53
54
        if ($publishConfig !== null) {
55
            $this->configurePublish($publishConfig);
56
        }
57
58
        $receiveConfig = ConfigRepository::get('table_sync.receive');
59
60
        if ($receiveConfig !== null) {
61
            $this->configureReceive($receiveConfig);
62
        }
63
64
        $logsConfig = ConfigRepository::get('table_sync.log');
65
66
        if ($logsConfig !== null) {
67
            $this->configureLogs($logsConfig);
68
        }
69
70
        $this->configureConnection();
71
        $this->configureChannel();
72
    }
73
74
    public function provides()
75
    {
76
        return [Publisher::class];
77
    }
78
79
    protected function registerCommands()
80
    {
81
        if ($this->app->runningInConsole()) {
82
            $this->commands([
83
                Console\Commands\WorkCommand::class,
84
                Console\Commands\TerminateCommand::class,
85
                Console\Commands\RestartCommand::class,
86
            ]);
87
        }
88
    }
89
90
    private function configurePublish(array $config): void
91
    {
92
        $this->app->singleton(Publisher::class, function ($app) use ($config) {
93
            if (isset($config['custom_publisher'])) {
94
                return $app->make($config['custom_publisher']);
95
            }
96
97
            return $app->make(QueuePublisher::class, [
98
                'publisher' => $app->make(RabbitPublisher::class),
99
            ]);
100
        });
101
102
        $this->app->bind(Config\PublishMessage::class, function () use ($config) {
103
            [
104
                'appId' => $appId,
105
                'headers' => $headers,
106
            ] = $config['message'];
107
108
            return new Config\PublishMessage($appId, new AMQPTable($headers));
109
        });
110
111
        $this->app->bind(Config\Publisher::class, function () use ($config) {
112
            [
113
                'exchangeName' => $exchange,
114
                'confirmSelect' => $confirm,
115
            ] = $config['publisher'];
116
117
            return new Config\Publisher($exchange, $confirm);
118
        });
119
    }
120
121
    private function configureReceive(array $config): void
122
    {
123
        $this->app->bind(MessageDataRetriever::class, function () use ($config) {
124
            $config = $config['message_configs'] ?? [];
125
            return new MessageDataRetriever($config);
126
        });
127
128
        $this->app->bind(Config\Consumer::class, function ($app) use ($config) {
129
            $queue = $config['queue'] ?? '';
130
            $handler = $config['custom_received_message_handler'] ?? QueueReceivedMessageHandler::class;
131
            $microsecondsToSleep = $config['microseconds_to_sleep'];
132
133
            return (new Config\Consumer($app->make($handler), $queue))->setMicrosecondsToSleep($microsecondsToSleep);
134
        });
135
136
        $pidPath = $config['pid_path'] ?? storage_path('table_sync.pid');
137
138
        $this->app->bind(PidManager::class, function () use ($pidPath) {
139
            return new PidManager($pidPath);
140
        });
141
142
        $this->registerCommands();
143
    }
144
145
    private function configureLogs(array $config): void
146
    {
147
        if (($logManager = $this->app->make('log')) instanceof LogManager) {
0 ignored issues
show
$logManager = $this->app->make('log') is always a sub-type of Illuminate\Log\LogManager.
Loading history...
148
            $this->app->singleton('table_sync.logger', function () use ($logManager, $config) {
149
                return $logManager->channel($config['channel']);
150
            });
151
        }
152
153
        $this->app
154
            ->when([RabbitPublisher::class, RabbitConsumer::class])
155
            ->needs(LoggerInterface::class)
156
            ->give('table_sync.logger')
157
        ;
158
    }
159
160
    private function configureConnection(): void
161
    {
162
        $this->app->singleton(ConnectionContainer::class, function () {
163
            [
164
                'host' => $host,
165
                'port' => $port,
166
                'user' => $user,
167
                'pass' => $pass,
168
                'vhost' => $vhost,
169
                'sslOptions' => $sslOptions,
170
                'options' => $options,
171
            ] = ConfigRepository::get('table_sync.connection');
172
173
            return new ConnectionContainer($host, $port, $user, $pass, $vhost, $sslOptions, $options);
174
        });
175
    }
176
}
177