EventStreamReplayer::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 3
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Copyright 2014 SURFnet bv
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
0 ignored issues
show
Coding Style introduced by
Missing @link tag in file comment
Loading history...
18
19
namespace Surfnet\StepupMiddleware\MiddlewareBundle\Service;
20
21
use Broadway\Domain\DomainMessage;
22
use Doctrine\DBAL\Exception\InvalidArgumentException;
23
use Exception;
24
use Surfnet\StepupMiddleware\CommandHandlingBundle\EventHandling\BufferedEventBus;
25
use Surfnet\StepupMiddleware\MiddlewareBundle\EventSourcing\DBALEventHydrator;
26
use Symfony\Component\Console\Helper\ProgressBar;
27
use Symfony\Component\Console\Output\OutputInterface;
28
use Throwable;
29
30
class EventStreamReplayer
0 ignored issues
show
Coding Style introduced by
Missing doc comment for class EventStreamReplayer
Loading history...
31
{
32
    /**
33
     * @var string[]
34
     */
35
    private array $middlewareTables = [
36
        'allowed_second_factor',
37
        'audit_log',
38
        'configured_institution',
39
        'email_templates',
40
        'identity',
41
        'identity_self_asserted_token_options',
42
        'institution_authorization',
43
        'institution_configuration_options',
44
        'institution_listing',
45
//        'institution_with_ra_locations',
0 ignored issues
show
Coding Style introduced by
Line indented incorrectly; expected at least 4 spaces, found 0
Loading history...
46
        'ra_listing',
47
        'ra_location',
48
        'ra_second_factor',
49
        'recovery_token',
50
        'second_factor_revocation',
51
        'sraa',
52
        'unverified_second_factor',
53
        'verified_second_factor',
54
        'vetted_second_factor',
55
        'vetting_type_hint',
56
        'whitelist_entry',
57
    ];
58
59
    /**
60
     * @var string[]
61
     */
62
    private array $gatewayTables = [
63
        'institution_configuration',
64
        'saml_entity',
65
        'second_factor',
66
        'whitelist_entry',
67
    ];
68
69
    public function __construct(
70
        private readonly BufferedEventBus $eventBus,
71
        private readonly DBALEventHydrator $eventHydrator,
72
        private readonly DBALConnectionHelper $connectionHelper,
73
    ) {
74
        ProgressBar::setFormatDefinition(
75
            'event_replay',
76
            "<info> %message%</info>\n"
77
            . ' <comment>%current%/%max%</comment> [%bar%] <comment>%percent:3s%%</comment><info>%elapsed:6s%/'
78
            . "%estimated:-6s%</info>\n %memory:6s%",
79
        );
80
    }
81
82
    public function replayEvents(OutputInterface $output, int $increments): void
83
    {
84
        $preparationProgress = new ProgressBar($output, 3);
85
        $preparationProgress->setFormat('event_replay');
86
87
        $preparationProgress->setMessage('Starting Transaction');
88
        $this->connectionHelper->beginTransaction();
89
        $preparationProgress->clear();
90
        $preparationProgress->advance();
91
92
        try {
93
            $preparationProgress->setMessage('Removing data from Read Tables');
94
            $preparationProgress->clear();
95
            $this->wipeReadTables($output);
96
97
            $preparationProgress->setMessage('Done wiping');
98
            $preparationProgress->clear();
99
            $preparationProgress->advance();
100
101
            $preparationProgress->setMessage('Determining amount of events to replay...');
102
            $preparationProgress->clear();
103
            $totalEvents = $this->eventHydrator->getCount();
104
105
            $preparationProgress->advance();
106
107
            if ($totalEvents == 0) {
108
                // Spaces are needed to overwrite the previous message.
109
                $preparationProgress->setMessage('There are no events to replay. Done.     ');
110
                $preparationProgress->clear();
111
                $preparationProgress->finish();
112
                return;
113
            } else {
114
                $defaultMessage = sprintf(
115
                    'Found <comment>%s</comment> Events, replaying in increments of <comment>%d</comment>',
116
                    $totalEvents,
117
                    $increments,
118
                );
119
                $preparationProgress->setMessage($defaultMessage);
120
                $preparationProgress->clear();
121
                $preparationProgress->finish();
122
            }
123
124
            $replayProgress = new ProgressBar($output, $totalEvents);
125
            $replayProgress->setFormat('event_replay');
126
            $replayProgress->setMessage($defaultMessage);
127
128
            for ($count = 0; $count < $totalEvents; $count += $increments) {
129
                $eventStream = $this->eventHydrator->getFromTill($increments, $count);
130
131
                if ($output->getVerbosity() >= OutputInterface::VERBOSITY_DEBUG) {
132
                    $messages = [];
133
                    foreach ($eventStream->getIterator() as $event) {
134
                        /** @var DomainMessage $event */
0 ignored issues
show
Coding Style introduced by
The open comment tag must be the only content on the line
Loading history...
Coding Style introduced by
The close comment tag must be the only content on the line
Loading history...
135
                        $messages[] = sprintf(
136
                            ' > <info>Publishing Event <comment>%s</comment> "<comment>%s</comment>" for UUID <comment>"%s</comment>"</info>',
137
                            $event->getRecordedOn()->toString(),
138
                            $event->getType(),
139
                            $event->getId(),
140
                        );
141
                    }
142
143
                    $output->writeln($messages);
144
                }
145
146
                $this->eventBus->publish($eventStream);
147
                $this->eventBus->flush();
148
149
                unset($eventStream);
150
                $steps = (($count + $increments < $totalEvents) ? $increments : ($totalEvents - $count));
151
                $replayProgress->advance($steps);
152
            }
153
154
            $this->connectionHelper->commit();
155
            $replayProgress->finish();
156
157
            $output->writeln(['', '<info>Done</info>', '']);
158
        } catch (Throwable $e) {
159
            echo $e->getMessage()."\n";
160
161
            $this->connectionHelper->rollBack();
162
            if (isset($replayProgress)) {
163
                $replayProgress->setMessage(sprintf('<error>ERROR OCCURRED: "%s"</error>', $e->getMessage()));
164
                $replayProgress->finish();
165
            }
166
167
            throw $e;
168
        }
169
    }
170
171
    /**
0 ignored issues
show
Coding Style introduced by
Parameter $output should have a doc-comment as per coding-style.
Loading history...
172
     * @throws InvalidArgumentException|\Doctrine\DBAL\Exception
173
     */
0 ignored issues
show
Coding Style introduced by
Missing @return tag in function comment
Loading history...
174
    private function wipeReadTables(OutputInterface $output): void
0 ignored issues
show
Coding Style introduced by
Private method name "EventStreamReplayer::wipeReadTables" must be prefixed with an underscore
Loading history...
175
    {
176
        if ($output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) {
177
            $output->writeln('<info>Retrieving connections to wipe READ tables</info>');
178
        }
179
180
        $middlewareConnection = $this->connectionHelper->getConnection('middleware');
181
        $gatewayConnection = $this->connectionHelper->getConnection('gateway');
182
183
        $middlewareDatabaseName = $middlewareConnection->getDatabase();
184
        $gatewayDatabaseName = $gatewayConnection->getDatabase();
185
186
        foreach ($this->middlewareTables as $table) {
187
            $rows = $middlewareConnection->delete($table, [1 => 1]);
188
            if ($output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) {
189
                $output->writeln(
190
                    sprintf(
191
                        '<info>Deleted <comment>%d</comment> rows from table <comment>%s.%s</comment></info>',
192
                        $rows,
193
                        $middlewareDatabaseName,
194
                        $table,
195
                    ),
196
                );
197
            }
198
        }
199
200
        foreach ($this->gatewayTables as $table) {
201
            $rows = $gatewayConnection->delete($table, [1 => 1]);
202
            if ($output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) {
203
                $output->writeln(
204
                    sprintf(
205
                        '<info>Deleted <comment>%d</comment> rows from table <comment>%s.%s</comment></info>',
206
                        $rows,
207
                        $gatewayDatabaseName,
208
                        $table,
209
                    ),
210
                );
211
            }
212
        }
213
    }
214
}
215