Completed
Push — master ( 5467e4...183ea4 )
by Boy
05:06 queued 01:02
created

EventStreamReplayer::replayEvents()   C

Complexity

Conditions 8
Paths 74

Size

Total Lines 79
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 79
rs 6.0572
c 1
b 0
f 0
cc 8
eloc 51
nc 74
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

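Commonly applied refactorings include Extract Method and, when the method juggles many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.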

1
<?php
2
3
/**
4
 * Copyright 2014 SURFnet bv
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18
19
namespace Surfnet\StepupMiddleware\MiddlewareBundle\Service;
20
21
use Broadway\Domain\DomainEventStream;
22
use Broadway\Domain\DomainMessage;
23
use Exception;
24
use Surfnet\StepupMiddleware\CommandHandlingBundle\EventHandling\BufferedEventBus;
25
use Surfnet\StepupMiddleware\MiddlewareBundle\EventSourcing\DBALEventHydrator;
26
use Symfony\Component\Console\Helper\ProgressBar;
27
use Symfony\Component\Console\Output\OutputInterface;
28
29
/**
 * Rebuilds the read tables from the stored events.
 *
 * Within a single transaction (managed through the connection helper) the read tables listed in
 * this class are emptied, after which all stored events are fetched in batches and published on
 * the event bus again. Progress is reported on the given console output.
 */
class EventStreamReplayer
{
    /**
     * Bus on which the replayed events are published and flushed, batch by batch.
     *
     * @var BufferedEventBus
     */
    private $eventBus;

    /**
     * Source of the stored events: supplies the total event count and the batches to replay.
     *
     * @var DBALEventHydrator
     */
    private $eventHydrator;

    /**
     * Gives access to the "middleware" and "gateway" connections and controls the transaction.
     *
     * @var DBALConnectionHelper
     */
    private $connectionHelper;

    /**
     * Tables that are emptied through the "middleware" connection before replaying.
     *
     * @var array
     */
    private $middlewareTables = [
        'unverified_second_factor',
        'verified_second_factor',
        'vetted_second_factor',
        'ra_second_factor',
        'identity',
        'sraa',
        'audit_log',
        'ra_listing',
        'ra_candidate',
        'second_factor_revocation',
        'whitelist_entry',
    ];

    /**
     * Tables that are emptied through the "gateway" connection before replaying.
     *
     * @var array
     */
    private $gatewayTables = [
        'second_factor',
        'saml_entity',
        'whitelist_entry',
    ];

    /**
     * Stores the collaborators and registers the "event_replay" progress bar format.
     *
     * Note that registering the format is a static call on ProgressBar, so it is shared by all
     * progress bars created afterwards, not just the ones created by this instance.
     *
     * @param BufferedEventBus     $eventBus
     * @param DBALEventHydrator    $eventHydrator
     * @param DBALConnectionHelper $connectionHelper
     */
    public function __construct(
        BufferedEventBus $eventBus,
        DBALEventHydrator $eventHydrator,
        DBALConnectionHelper $connectionHelper
    ) {
        $this->eventBus         = $eventBus;
        $this->eventHydrator    = $eventHydrator;
        $this->connectionHelper = $connectionHelper;

        ProgressBar::setFormatDefinition(
            'event_replay',
            "<info> %message%</info>\n"
            . ' <comment>%current%/%max%</comment> [%bar%] <comment>%percent:3s%%</comment><info>%elapsed:6s%/'
            . "%estimated:-6s%</info>\n %memory:6s%"
        );
    }

    /**
     * Wipes the read tables and replays all stored events in batches, inside one transaction.
     *
     * The transaction is committed when all events have been published (or when there turn out to
     * be no events at all) and rolled back when anything fails along the way.
     *
     * @param OutputInterface $output     console output used for progress bars and debug messages
     * @param int             $increments number of events to fetch and publish per batch; must be 1 or more
     * @return void
     *
     * @throws \InvalidArgumentException when $increments is not a positive integer
     * @throws Exception                 any exception raised while wiping or replaying, rethrown after rollback
     */
    public function replayEvents(OutputInterface $output, $increments)
    {
        // A batch size below 1 would never advance the replay loop, leaving it spinning inside an
        // open transaction. Reject it before anything is touched.
        $batchSize = (int) $increments;
        if ($batchSize < 1) {
            throw new \InvalidArgumentException(sprintf(
                'The number of events to replay per increment must be a positive integer, "%s" given',
                is_scalar($increments) ? $increments : gettype($increments)
            ));
        }

        $preparationProgress = new ProgressBar($output, 3);
        $preparationProgress->setFormat('event_replay');

        $preparationProgress->setMessage('Starting Transaction');
        $this->connectionHelper->beginTransaction();
        $preparationProgress->advance();

        try {
            $preparationProgress->setMessage('Removing data from Read Tables');
            $this->wipeReadTables($output);
            $preparationProgress->advance();

            $preparationProgress->setMessage('Determining amount of events to replay...');
            $totalEvents = (int) $this->eventHydrator->getCount();
            $preparationProgress->advance();

            if ($totalEvents === 0) {
                // Nothing to publish: the replay is trivially complete, so close the transaction
                // here instead of returning with it still open.
                $this->connectionHelper->commit();

                // Spaces are needed to overwrite the previous message.
                $preparationProgress->setMessage('There are no events to replay. Done.     ');
                $preparationProgress->finish();

                return;
            }

            $defaultMessage = sprintf(
                'Found <comment>%s</comment> Events, replaying in increments of <comment>%d</comment>',
                $totalEvents,
                $batchSize
            );
            $preparationProgress->setMessage($defaultMessage);
            $preparationProgress->finish();

            $replayProgress = new ProgressBar($output, $totalEvents);
            $replayProgress->setFormat('event_replay');
            $replayProgress->setMessage($defaultMessage);

            for ($count = 0; $count < $totalEvents; $count += $batchSize) {
                // NOTE(review): arguments are presumably (limit, offset) - confirm against DBALEventHydrator.
                /** @var DomainEventStream $eventStream */
                $eventStream = $this->eventHydrator->getFromTill($batchSize, $count);

                if ($output->getVerbosity() >= OutputInterface::VERBOSITY_DEBUG) {
                    $this->announceEvents($output, $eventStream);
                }

                $this->eventBus->publish($eventStream);
                $this->eventBus->flush();

                // Release the batch before the next one is fetched.
                unset($eventStream);

                // The last batch may be smaller than the batch size.
                $replayProgress->advance(min($batchSize, $totalEvents - $count));
            }

            $this->connectionHelper->commit();
            $replayProgress->finish();

            $output->writeln(['', '<info>Done</info>', '']);
        } catch (Exception $e) {
            $this->connectionHelper->rollBack();

            // The replay progress bar only exists when the failure happened after the preparation phase.
            if (isset($replayProgress)) {
                $replayProgress->setMessage(sprintf('<error>ERROR OCCURRED: "%s"</error>', $e->getMessage()));
                $replayProgress->finish();
            }

            throw $e;
        }
    }

    /**
     * Writes one line per event in the stream, stating the type and id of the event that is
     * about to be published.
     *
     * @param OutputInterface   $output
     * @param DomainEventStream $eventStream
     * @return void
     */
    private function announceEvents(OutputInterface $output, $eventStream)
    {
        $messages = [];
        foreach ($eventStream->getIterator() as $event) {
            /** @var DomainMessage $event */
            $messages[] = sprintf(
                ' > <info>Publishing Event "<comment>%s</comment>" for UUID <comment>"%s</comment>"</info>',
                $event->getType(),
                $event->getId()
            );
        }

        $output->writeln($messages);
    }

    /**
     * Empties all configured read tables on the "middleware" and the "gateway" connection.
     *
     * @param OutputInterface $output receives a line per wiped table when running at debug verbosity
     * @return void
     */
    private function wipeReadTables(OutputInterface $output)
    {
        if ($output->getVerbosity() >= OutputInterface::VERBOSITY_DEBUG) {
            $output->writeln('<info>Retrieving connections to wipe READ tables</info>');
        }

        $middlewareConnection = $this->connectionHelper->getConnection('middleware');
        $gatewayConnection    = $this->connectionHelper->getConnection('gateway');

        $this->wipeTables($middlewareConnection, $this->middlewareTables, $output);
        $this->wipeTables($gatewayConnection, $this->gatewayTables, $output);
    }

    /**
     * Deletes the rows of each given table through the given connection and, at debug verbosity,
     * reports the number of deleted rows per table.
     *
     * @param object          $connection connection as returned by DBALConnectionHelper::getConnection()
     * @param array           $tables     names of the tables to empty
     * @param OutputInterface $output
     * @return void
     */
    private function wipeTables($connection, array $tables, OutputInterface $output)
    {
        $databaseName = $connection->getDatabase();
        $isDebug      = $output->getVerbosity() >= OutputInterface::VERBOSITY_DEBUG;

        foreach ($tables as $table) {
            // NOTE(review): [1 => 1] presumably renders as an always-true criterion so that every
            // row is removed - confirm against the DBAL version in use.
            $rows = $connection->delete($table, [1 => 1]);

            if ($isDebug) {
                $output->writeln(sprintf(
                    '<info>Deleted <comment>%d</comment> rows from table <comment>%s.%s</comment></info>',
                    $rows,
                    $databaseName,
                    $table
                ));
            }
        }
    }
}
205