EventStreamReplayer::replayEvents()   B
last analyzed

Complexity

Conditions 8
Paths 86

Size

Total Lines 86
Code Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 8
eloc 59
nc 86
nop 2
dl 0
loc 86
rs 7.6501
c 2
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Copyright 2014 SURFnet bv
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18
19
namespace Surfnet\StepupMiddleware\MiddlewareBundle\Service;
20
21
use Broadway\Domain\DomainMessage;
22
use Doctrine\DBAL\Exception\InvalidArgumentException;
23
use Surfnet\StepupMiddleware\CommandHandlingBundle\EventHandling\BufferedEventBus;
24
use Surfnet\StepupMiddleware\MiddlewareBundle\EventSourcing\DBALEventHydrator;
25
use Symfony\Component\Console\Helper\ProgressBar;
26
use Symfony\Component\Console\Output\OutputInterface;
27
use Throwable;
28
29
/**
 * Wipes the middleware and gateway read tables and republishes every stored
 * event through the buffered event bus, between beginTransaction() and
 * commit() on the connection helper.
 */
class EventStreamReplayer
{
    /**
     * Tables emptied through the "middleware" connection before replaying.
     *
     * @var string[]
     */
    private array $middlewareTables = [
        'allowed_second_factor',
        'audit_log',
        'configured_institution',
        'email_templates',
        'identity',
        'identity_self_asserted_token_options',
        'institution_authorization',
        'institution_configuration_options',
        'institution_listing',
        // NOTE(review): this table is excluded from the wipe; confirm that is still intended.
//        'institution_with_ra_locations',
        'ra_listing',
        'ra_location',
        'ra_second_factor',
        'recovery_token',
        'second_factor_revocation',
        'sraa',
        'unverified_second_factor',
        'verified_second_factor',
        'vetted_second_factor',
        'vetting_type_hint',
        'whitelist_entry',
    ];

    /**
     * Tables emptied through the "gateway" connection before replaying.
     *
     * @var string[]
     */
    private array $gatewayTables = [
        'institution_configuration',
        'saml_entity',
        'second_factor',
        'whitelist_entry',
    ];

    /**
     * Stores the collaborators and registers the "event_replay" progress bar
     * format used by both progress bars in replayEvents().
     */
    public function __construct(
        private readonly BufferedEventBus $eventBus,
        private readonly DBALEventHydrator $eventHydrator,
        private readonly DBALConnectionHelper $connectionHelper,
    ) {
        ProgressBar::setFormatDefinition(
            'event_replay',
            "<info> %message%</info>\n"
            . ' <comment>%current%/%max%</comment> [%bar%] <comment>%percent:3s%%</comment><info>%elapsed:6s%/'
            . "%estimated:-6s%</info>\n %memory:6s%",
        );
    }

    /**
     * Wipes the read tables and replays all events in batches of $increments.
     *
     * The transaction is committed when everything succeeds (including the
     * case where there are no events at all) and rolled back when anything
     * throws; the original throwable is rethrown after the rollback.
     *
     * @param OutputInterface $output     console output for progress and debug lines
     * @param int             $increments number of events fetched and published per batch; must be >= 1
     *
     * @throws \InvalidArgumentException when $increments is smaller than 1
     * @throws Throwable                 whatever the wipe or replay threw, after rolling back
     */
    public function replayEvents(OutputInterface $output, int $increments): void
    {
        // A step of zero (or less) would make the batch loop below run forever,
        // so reject it before any transaction is opened.
        if ($increments < 1) {
            throw new \InvalidArgumentException(
                sprintf('The replay increment must be a positive integer, got "%d"', $increments),
            );
        }

        $preparationProgress = new ProgressBar($output, 3);
        $preparationProgress->setFormat('event_replay');

        $preparationProgress->setMessage('Starting Transaction');
        $this->connectionHelper->beginTransaction();
        $preparationProgress->clear();
        $preparationProgress->advance();

        // Stays null until the replay phase starts; the catch block uses this
        // to decide whether there is a replay bar to report the error on.
        $replayProgress = null;

        try {
            $this->announce($preparationProgress, 'Removing data from Read Tables');
            $this->wipeReadTables($output);

            $this->announce($preparationProgress, 'Done wiping');
            $preparationProgress->advance();

            $this->announce($preparationProgress, 'Determining amount of events to replay...');
            // Cast so the strict comparison and arithmetic below hold even if
            // the count comes back as a numeric string.
            $totalEvents = (int) $this->eventHydrator->getCount();
            $preparationProgress->advance();

            if ($totalEvents === 0) {
                // Close the transaction explicitly instead of leaving it open:
                // with no events to replay, the wiped tables are the final state.
                $this->connectionHelper->commit();

                // Spaces are needed to overwrite the previous message.
                $this->announce($preparationProgress, 'There are no events to replay. Done.     ');
                $preparationProgress->finish();

                return;
            }

            $defaultMessage = sprintf(
                'Found <comment>%s</comment> Events, replaying in increments of <comment>%d</comment>',
                $totalEvents,
                $increments,
            );
            $this->announce($preparationProgress, $defaultMessage);
            $preparationProgress->finish();

            $replayProgress = new ProgressBar($output, $totalEvents);
            $replayProgress->setFormat('event_replay');
            $replayProgress->setMessage($defaultMessage);

            for ($count = 0; $count < $totalEvents; $count += $increments) {
                $eventStream = $this->eventHydrator->getFromTill($increments, $count);

                if ($output->isDebug()) {
                    $output->writeln($this->describeEvents($eventStream->getIterator()));
                }

                $this->eventBus->publish($eventStream);
                $this->eventBus->flush();

                // Release the batch before fetching the next one.
                unset($eventStream);

                // The last batch may be smaller than $increments.
                $replayProgress->advance(min($increments, $totalEvents - $count));
            }

            $this->connectionHelper->commit();
            $replayProgress->finish();

            $output->writeln(['', '<info>Done</info>', '']);
        } catch (Throwable $e) {
            // Written raw so that characters in the exception message are not
            // interpreted as console formatting tags.
            $output->writeln($e->getMessage(), OutputInterface::OUTPUT_RAW);

            $this->connectionHelper->rollBack();
            if ($replayProgress !== null) {
                $replayProgress->setMessage(sprintf('<error>ERROR OCCURRED: "%s"</error>', $e->getMessage()));
                $replayProgress->finish();
            }

            throw $e;
        }
    }

    /**
     * Sets a new message on the progress bar and clears its current rendering.
     */
    private function announce(ProgressBar $progress, string $message): void
    {
        $progress->setMessage($message);
        $progress->clear();
    }

    /**
     * Builds one debug line per event, showing its recorded-on time, type and id.
     *
     * @param iterable<DomainMessage> $events
     *
     * @return string[]
     */
    private function describeEvents(iterable $events): array
    {
        $lines = [];
        foreach ($events as $event) {
            /** @var DomainMessage $event */
            $lines[] = sprintf(
                ' > <info>Publishing Event <comment>%s</comment> "<comment>%s</comment>" for UUID <comment>"%s</comment>"</info>',
                $event->getRecordedOn()->toString(),
                $event->getType(),
                $event->getId(),
            );
        }

        return $lines;
    }

    /**
     * Deletes every row from the configured middleware and gateway tables,
     * reporting the number of deleted rows per table at debug verbosity.
     *
     * @throws InvalidArgumentException|\Doctrine\DBAL\Exception
     */
    private function wipeReadTables(OutputInterface $output): void
    {
        $isDebug = $output->isDebug();

        if ($isDebug) {
            $output->writeln('<info>Retrieving connections to wipe READ tables</info>');
        }

        $tablesByConnection = [
            'middleware' => $this->middlewareTables,
            'gateway' => $this->gatewayTables,
        ];

        // Resolve every connection and database name before deleting anything,
        // so an unavailable connection fails before the first table is touched.
        $targets = [];
        foreach ($tablesByConnection as $connectionName => $tables) {
            $connection = $this->connectionHelper->getConnection($connectionName);
            $targets[] = [$connection, $connection->getDatabase(), $tables];
        }

        foreach ($targets as [$connection, $databaseName, $tables]) {
            foreach ($tables as $table) {
                // The always-true criterion [1 => 1] makes delete() remove every row.
                $rows = $connection->delete($table, [1 => 1]);

                if ($isDebug) {
                    $output->writeln(
                        sprintf(
                            '<info>Deleted <comment>%d</comment> rows from table <comment>%s.%s</comment></info>',
                            $rows,
                            $databaseName,
                            $table,
                        ),
                    );
                }
            }
        }
    }
}
214