etlt.Transformer   A
last analyzed

Complexity

Total Complexity 34

Size/Duplication

Total Lines 363
Duplicated Lines 96.42 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric  Value
eloc    133
dl      350
loc     363
ccs     0
cts     124
cp      0
rs      9.68
c       0
b       0
f       0
wmc     34

17 Methods

Rating  Name                                     Duplication   Size   Complexity
A       Transformer._step00()                    12            12     2
A       Transformer._load_ignored_rows()         8             8      1
A       Transformer.pre_ignore_row()             10            10     1
A       Transformer._transform_row_wrapper()     39            39     4
A       Transformer._find_all_step_methods()     9             9      2
A       Transformer._load_parked_rows()          8             8      1
A       Transformer._transform_rows()            8             8      2
A       Transformer.transform_source_rows()      34            34     5
A       Transformer.__init__()                   86            86     1
A       Transformer._handle_exception()          11            11     1
A       Transformer._log_statistics()            16            16     1
A       Transformer.pre_park_row()               10            10     1
A       Transformer._load_transformed_rows()     8             8      1
A       Transformer._log()                       11            11     1
A       Transformer._transform_row()             17            17     4
A       Transformer.pre_transform_source_rows()  7             7      1
A       Transformer._step99()                    16            16     5
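The table lists `_step00()` and `_step99()`, the two built-in steps that `_find_all_step_methods()` discovers by name. That method keeps every callable whose name matches `_step\d+\d+.*` and sorts the matching names as strings. A string sort places `_step100` before `_step99`, so a subclass step numbered above 99 would run before the mandatory-field check.

The sketch below is an illustrative alternative, not the etlt implementation: it orders steps by the integer value of n. `StepHost`, `lexicographic_names` and `find_step_methods` are hypothetical names. Unlike the original regex, the alternative also accepts single-digit n.

```python
import re
from typing import Callable, List


class StepHost:
    """Hypothetical stand-in for a Transformer subclass with four steps."""

    def _step00(self) -> str:
        return 'clean whitespace'

    def _step10(self) -> str:
        return 'map keys'

    def _step99(self) -> str:
        return 'validate'

    def _step100(self) -> str:
        return 'late step'


def lexicographic_names(obj: object) -> List[str]:
    """Ordering used by the listing: regex match on the name, then a plain string sort."""
    return sorted(name for name in dir(obj)
                  if callable(getattr(obj, name)) and re.match(r'_step\d+\d+.*', name))


def find_step_methods(obj: object) -> List[Callable]:
    """Alternative: collect bound _step<n> methods ordered by the integer n."""
    found = []
    for name in dir(obj):
        match = re.match(r'_step(\d+)', name)
        if match and callable(getattr(obj, name)):
            found.append((int(match.group(1)), getattr(obj, name)))
    # Sort on n only; the stable sort keeps name order for equal n (e.g. _step1 and _step01).
    found.sort(key=lambda pair: pair[0])
    return [method for _, method in found]


host = StepHost()
print(lexicographic_names(host))                     # ['_step00', '_step10', '_step100', '_step99']
print([step() for step in find_step_methods(host)])  # ['clean whitespace', 'map keys', 'validate', 'late step']
```

Keeping every step number at exactly two digits, as the built-in steps do, avoids the issue without changing the discovery code.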


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

import abc
import copy
import re
import time
import traceback
from typing import Any, Dict, List

from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
from etlt.reader.Reader import Reader
from etlt.writer.SqlLoaderWriter import SqlLoaderWriter


class Transformer(metaclass=abc.ABCMeta):
    """
    Abstract parent class for transforming source data into (partial) dimensional data.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self,
                 source_reader: Reader,
                 transformed_writer: SqlLoaderWriter,
                 parked_writer: SqlLoaderWriter,
                 ignored_writer: SqlLoaderWriter):
        """
        Object constructor.

        :param Reader source_reader: Object for reading source rows.
        :param SqlLoaderWriter transformed_writer: Object for writing successfully transformed rows.
        :param SqlLoaderWriter parked_writer: Object for writing parked rows.
        :param SqlLoaderWriter ignored_writer: Object for writing ignored rows.
        """

        self._count_total: int = 0
        """
        The number of rows processed.
        """

        self._count_transform: int = 0
        """
        The number of rows successfully transformed.
        """

        self._count_park: int = 0
        """
        The number of rows parked.
        """

        self._count_error: int = 0
        """
        The number of rows processed with errors.
        """

        self._count_ignore: int = 0
        """
        The number of rows ignored.
        """

        self._time0: float = 0.0
        """
        Start time of the whole process and start time of transforming rows.
        """

        self._time1: float = 0.0
        """
        End time of transforming rows and start time of loading the transformed rows.
        """

        self._time2: float = 0.0
        """
        End time of loading the transformed rows and start time of loading the ignored and parked rows.
        """

        self._time3: float = 0.0
        """
        End time of loading the ignored and parked rows and end time of the whole process.
        """

        self._source_reader: Reader = source_reader
        """
        Object for reading source rows.
        """

        self._transformed_writer: SqlLoaderWriter = transformed_writer
        """
        Object for writing successfully transformed rows.
        """

        self._parked_writer: SqlLoaderWriter = parked_writer
        """
        Object for writing parked rows.
        """

        self._ignored_writer: SqlLoaderWriter = ignored_writer
        """
        Object for writing ignored rows.
        """

        self._mandatory_fields: List[str] = []
        """
        The mandatory fields (or columns) in the output row.
        """

        self._steps: List[callable] = []
        """
        All _step<n> methods of this class, where n is an integer, sorted by method name.
        """

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _log(message):
        """
        Logs a message.

        :param str message: The log message.

        :rtype: None
        """
        #  @todo Replace with log package.
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message), flush=True)

    # ------------------------------------------------------------------------------------------------------------------
    def _handle_exception(self, row, exception) -> None:
        """
        Logs an exception that occurred during the transformation of a row.

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _find_all_step_methods(self) -> None:
        """
        Finds all _step<n> methods of this class, where n is an integer of at least two digits, and stores them
        sorted by method name.
        """
        steps = sorted(method for method in dir(self)
                       if callable(getattr(self, method)) and re.match(r'_step\d+\d+.*', method))
        # Rebuild the list instead of appending, so repeated calls do not register each step twice.
        self._steps = [getattr(self, step) for step in steps]

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self) -> None:
        """
        Transforms all source rows.
        """
        self._find_all_step_methods()

        for row in self._source_reader.next():
            self._transform_row_wrapper(row)

    # ------------------------------------------------------------------------------------------------------------------
    def pre_park_row(self, park_info, in_row) -> None:
        """
        This method will be called just before an input row is sent to the parked writer.

        :param str park_info: The park info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def pre_ignore_row(self, ignore_info, in_row) -> None:
        """
        This method will be called just before an input row is sent to the ignored writer.

        :param str ignore_info: The ignore info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row: Dict[str, Any]) -> None:
        """
        Transforms a single source row.

        :param dict[str,any] row: The source row.
        """
        self._count_total += 1

        try:
            # Transform the natural keys in the row to technical keys.
            in_row = copy.copy(row)
            out_row = {}
            park_info, ignore_info = self._transform_row(in_row, out_row)

        except Exception as e:
            # Log the exception.
            self._handle_exception(row, e)
            # Keep track of the number of errors.
            self._count_error += 1
            # This row must be parked.
            park_info = 'Exception'
            # Keep our IDE happy.
            ignore_info = None
            out_row = {}

        if park_info:
            # Park the row.
            self.pre_park_row(park_info, row)
            self._parked_writer.writerow(row)
            self._count_park += 1
        elif ignore_info:
            # Ignore the row.
            self.pre_ignore_row(ignore_info, row)
            self._ignored_writer.writerow(row)
            self._count_ignore += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.writerow(out_row)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row(self, in_row, out_row):
        """
        Transforms an input row to an output row (i.e. (partial) dimensional data).

        :param dict[str,str] in_row: The input row.
        :param dict[str,T] out_row: The output row.

        :rtype: (str,str)
        """
        tmp_row = {}

        for step in self._steps:
            park_info, ignore_info = step(in_row, tmp_row, out_row)
            if park_info or ignore_info:
                return park_info, ignore_info

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step00(self, in_row, tmp_row, out_row):
        """
        Prunes whitespace for all fields in the input row.

        :param dict in_row: The input row.
        :param dict tmp_row: Not used.
        :param dict out_row: Not used.
        """
        for key, value in in_row.items():
            in_row[key] = WhitespaceCleaner.clean(value)

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step99(self, in_row, tmp_row, out_row):
        """
        Validates that all mandatory fields are present in the output row and are filled (i.e. have a truthy value).

        :param dict in_row: Not used.
        :param dict tmp_row: Not used.
        :param dict out_row: The output row.

        :return: The space-separated names of the missing or unfilled mandatory fields as park info (an empty string
                 if there are none) and None as ignore info.
        :rtype: (str,None)
        """
        park_info = ''
        for field in self._mandatory_fields:
            if field not in out_row or not out_row[field]:
                if park_info:
                    park_info += ' '
                park_info += field

        return park_info, None

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self) -> None:
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self) -> None:
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self) -> None:
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self) -> None:
        """
        Logs statistics about the number of rows and the number of rows per second.
        """
        rows_per_second_trans = self._count_total / (self._time1 - self._time0)
        rows_per_second_load = self._count_transform / (self._time2 - self._time1)
        rows_per_second_overall = self._count_total / (self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(int(rows_per_second_trans)))
        self._log('Number of rows per second loaded    : {0:d}'.format(int(rows_per_second_load)))
        self._log('Number of rows per second overall   : {0:d}'.format(int(rows_per_second_overall)))

    # ------------------------------------------------------------------------------------------------------------------
    def pre_transform_source_rows(self) -> None:
        """
        This method will be called just before transforming the source rows.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self) -> None:
        """
        Transforms the rows from the source system into (partial) dimensional data.
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        self.pre_transform_source_rows()

        # Transform all source rows.
        with self._source_reader:
            with self._transformed_writer:
                with self._parked_writer:
                    with self._ignored_writer:
                        self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load ignored and parked rows into their respective tables.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()

# ----------------------------------------------------------------------------------------------------------------------
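The row flow in `_transform_row_wrapper()` and `_transform_row()` can be sketched without etlt as plain functions. Each step receives `(in_row, tmp_row, out_row)` and returns `(park_info, ignore_info)`. The first truthy value stops the pipeline, parking takes precedence over ignoring, and any exception parks the original row.

This is a simplified sketch under several assumptions:

* Lists stand in for the `SqlLoaderWriter` objects.
* `str.strip` stands in for `WhitespaceCleaner.clean`.
* The steps `strip_step`, `map_step` and `mandatory_step`, and the fields `name` and `qty`, are hypothetical.
* The counters and the `pre_park_row`/`pre_ignore_row` hooks are omitted.

```python
import copy
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]
StepResult = Tuple[Optional[str], Optional[str]]  # (park_info, ignore_info)
Step = Callable[[Row, Row, Row], StepResult]


def run_steps(steps: List[Step], in_row: Row, out_row: Row) -> StepResult:
    """Like _transform_row(): run the steps in order, stop at the first park or ignore."""
    tmp_row: Row = {}
    for step in steps:
        park_info, ignore_info = step(in_row, tmp_row, out_row)
        if park_info or ignore_info:
            return park_info, ignore_info
    return None, None


def route_row(steps: List[Step], row: Row,
              parked: List[Row], ignored: List[Row], transformed: List[Row]) -> str:
    """Like _transform_row_wrapper(): decide where a single source row goes."""
    try:
        in_row = copy.copy(row)  # steps modify the copy, never the source row
        out_row: Row = {}
        park_info, ignore_info = run_steps(steps, in_row, out_row)
    except Exception:
        # Any exception parks the original row.
        park_info, ignore_info, out_row = 'Exception', None, {}

    if park_info:  # parking takes precedence over ignoring
        parked.append(row)
        return 'parked'
    if ignore_info:
        ignored.append(row)
        return 'ignored'
    transformed.append(out_row)
    return 'transformed'


def strip_step(in_row: Row, tmp_row: Row, out_row: Row) -> StepResult:
    # Hypothetical stand-in for _step00 (str.strip instead of WhitespaceCleaner).
    for key, value in in_row.items():
        in_row[key] = value.strip() if isinstance(value, str) else value
    return None, None


def map_step(in_row: Row, tmp_row: Row, out_row: Row) -> StepResult:
    # Hypothetical business step: ignore test rows, convert 'qty' to int.
    if in_row.get('name') == 'test':
        return None, 'test row'
    out_row['name'] = in_row.get('name')
    out_row['qty'] = int(in_row['qty'])  # ValueError on bad input -> row is parked
    return None, None


def mandatory_step(in_row: Row, tmp_row: Row, out_row: Row) -> StepResult:
    # Mirrors _step99: space-separated names of missing or falsy mandatory fields.
    missing = [field for field in ('name', 'qty') if field not in out_row or not out_row[field]]
    return ' '.join(missing), None


rows = [{'name': ' apple ', 'qty': '3'}, {'name': 'test', 'qty': '1'},
        {'name': 'pear', 'qty': 'x'}, {'name': 'plum', 'qty': '0'}, {'name': '', 'qty': '2'}]
parked: List[Row] = []
ignored: List[Row] = []
transformed: List[Row] = []
results = [route_row([strip_step, map_step, mandatory_step], row, parked, ignored, transformed)
           for row in rows]
print(results)      # ['transformed', 'ignored', 'parked', 'parked', 'parked']
print(transformed)  # [{'name': 'apple', 'qty': 3}]
```

As in `_step99()`, a mandatory field holding a falsy value such as `0` counts as not filled, which is why the `plum` row is parked. A real subclass that must accept zero quantities would need its own check.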
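`_log_statistics()` derives three rates from the four timestamps:

* processed = total rows / (time1 - time0)
* loaded = transformed rows / (time2 - time1)
* overall = total rows / (time3 - time0)

It divides without a guard, so if an interval ever measured zero, the division would raise `ZeroDivisionError`. A defensive variant is sketched below; the function names and dictionary keys are hypothetical.

```python
from typing import Dict


def rows_per_second(count: int, start: float, end: float) -> int:
    """Rows per second as an int; 0 instead of ZeroDivisionError for an empty interval."""
    elapsed = end - start
    return int(count / elapsed) if elapsed > 0 else 0


def throughput(count_total: int, count_transform: int,
               time0: float, time1: float, time2: float, time3: float) -> Dict[str, int]:
    """The three rates reported by _log_statistics(), keyed by hypothetical labels."""
    return {
        'processed': rows_per_second(count_total, time0, time1),   # transform phase
        'loaded': rows_per_second(count_transform, time1, time2),  # loading transformed rows
        'overall': rows_per_second(count_total, time0, time3),     # whole run
    }


print(throughput(1000, 900, 0.0, 2.0, 5.0, 6.0))  # {'processed': 500, 'loaded': 300, 'overall': 166}
```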