etlt.Transformer.Transformer.pre_park_row()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 8
ccs 0
cts 1
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
import abc
2
import copy
3
import re
4
import time
5
import traceback
6
from pprint import pformat
7
from typing import Any, Dict, List, Optional, Tuple
8
9
from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
10
from etlt.reader.Reader import Reader
11
from etlt.writer.SqlLoaderWriter import SqlLoaderWriter
12
13
14
class Transformer(metaclass=abc.ABCMeta):
15
    """
16
    Abstract parent class for transforming source data in (partial) dimensional data.
17
    """
18
19
    # ------------------------------------------------------------------------------------------------------------------
20
    def __init__(self,
21
                 source_reader: Reader,
22
                 transformed_writer: SqlLoaderWriter,
23
                 parked_writer: SqlLoaderWriter,
24
                 ignored_writer: SqlLoaderWriter):
25
        """
26
        Object constructor.
27
28
        :param source_reader: Object for reading source rows.
29
        :param transformed_writer: Object for writing successfully transformed rows.
30
        :param parked_writer: Object for writing parked rows.
31
        :param ignored_writer: Object for writing ignored rows.
32
        """
33
34
        self._count_total: int = 0
35
        """
36
        The number of rows processed.
37
        """
38
39
        self._count_transform: int = 0
40
        """
41
        The number of rows successfully transformed.
42
        """
43
44
        self._count_park: int = 0
45
        """
46
        The number of rows parked.
47
        """
48
49
        self._count_error: int = 0
50
        """
51
        The number of rows processed with errors.
52
        """
53
54
        self._count_ignore: int = 0
55
        """
56
        The number of rows ignored.
57
        """
58
59
        self._time0: float = 0.0
60
        """
61
        Start time of the whole process and start of transforming rows.
62
        """
63
64
        self._time1: float = 0.0
65
        """
66
        End time of transforming rows and start time of the loading transformed rows.
67
        """
68
69
        self._time2: float = 0.0
70
        """
71
        End time of the loading transformed rows and start time of loading parked rows.
72
        """
73
74
        self._time3: float = 0.0
75
        """
76
        End time of the loading parked rows and end time of the whole process.
77
        """
78
79
        self._source_reader: Reader = source_reader
80
        """
81
        Object for reading source rows.
82
        """
83
84
        self._transformed_writer: SqlLoaderWriter = transformed_writer
85
        """
86
        Object for writing successfully transformed rows.r
87
        """
88
89
        self._parked_writer: SqlLoaderWriter = parked_writer
90
        """
91
        Object for writing parked rows.
92
        """
93
94
        self._ignored_writer: SqlLoaderWriter = ignored_writer
95
        """
96
        Object for writing ignored rows.
97
        """
98
99
        self.__mandatory_fields: List[str] = []
100
        """
101
        The mandatory fields (columns) in the output row.
102
        """
103
104
        self._steps: List[callable] = []
105
        """
106
        All _step<n> methods where n is an integer in this class sorted by n.
107
        """
108
109
        self.__init_fields()
110
111
    # ------------------------------------------------------------------------------------------------------------------
112
    @staticmethod
113
    def _log(message: str) -> None:
114
        """
115
        Logs a message.
116
117
        :param message: The log message.
118
        """
119
        #  @todo Replace with log package.
120
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) + ' ' + str(message), flush=True)
121
122
    # ------------------------------------------------------------------------------------------------------------------
123
    def _handle_exception(self, row: Dict[str, Any], exception: Exception) -> None:
124
        """
125
        Logs an exception occurred during transformation of a row.
126
127
        :param row: The source row.
128
        :param exception: The exception.
129
        """
130
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
131
        self._log(pformat(row))
132
        self._log(str(exception))
133
        self._log(traceback.format_exc())
134
135
    # ------------------------------------------------------------------------------------------------------------------
136
    def _find_all_step_methods(self) -> None:
137
        """
138
        Finds all _step<n> methods where n is an integer in this class.
139
        """
140
        steps = (
141
                [method for method in dir(self) if
142
                 callable(getattr(self, method)) and re.match(r'_step\d+\d+.*', method)])
143
        steps = sorted(steps)
144
        for step in steps:
145
            self._steps.append(getattr(self, step))
146
147
    # ------------------------------------------------------------------------------------------------------------------
148
    def _transform_rows(self) -> None:
149
        """
150
        Transforms all source rows.
151
        """
152
        self._find_all_step_methods()
153
154
        for row in self._source_reader.next():
155
            self._transform_row_wrapper(row)
156
157
    # ------------------------------------------------------------------------------------------------------------------
158
    def pre_park_row(self, park_info: str, in_row: Dict[str, Any]) -> None:
159
        """
160
        This method will be called just be for sending an input row to be parked to the parked writer.
161
162
        :param park_info: The park info.
163
        :param in_row: The original input row.
164
        """
165
        pass
166
167
    # ------------------------------------------------------------------------------------------------------------------
168
    def pre_ignore_row(self, ignore_info: str, in_row: Dict[str, Any]) -> None:
169
        """
170
        This method will be called just be for sending an input row to be ignored to the ignore writer.
171
172
        :param ignore_info: The ignore info.
173
        :param in_row: The original input row.
174
        """
175
        pass
176
177
    # ------------------------------------------------------------------------------------------------------------------
178
    def _transform_row_wrapper(self, row: Dict[str, Any]) -> None:
179
        """
180
        Transforms a single source row.
181
182
        :param row: The source row.
183
        """
184
        self._count_total += 1
185
186
        try:
187
            # Transform the naturals keys in line to technical keys.
188
            in_row = copy.copy(row)
189
            out_row = {}
190
            park_info, ignore_info = self._transform_row(in_row, out_row)
191
192
        except Exception as e:
193
            # Log the exception.
194
            self._handle_exception(row, e)
195
            self._count_error += 1
196
            park_info = 'Exception'
197
            # Keep our IDE happy.
198
            ignore_info = None
199
            out_row = {}
200
201
        if park_info:
202
            # Park the row.
203
            self.pre_park_row(park_info, row)
204
            self._parked_writer.writerow(row)
205
            self._count_park += 1
206
        elif ignore_info:
207
            # Ignore the row.
208
            self.pre_ignore_row(ignore_info, row)
209
            self._ignored_writer.writerow(row)
210
            self._count_ignore += 1
211
        else:
212
            # Write the technical keys and measures to the output file.
213
            self._transformed_writer.writerow(out_row)
214
            self._count_transform += 1
215
216
    # ------------------------------------------------------------------------------------------------------------------
217
    def _transform_row(self, in_row: Dict[str, Any], out_row: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
218
        """
219
        Transforms an input row to an output row (i.e. (partial) dimensional data).
220
221
        :param in_row: The input row.
222
        :param out_row: The output row.
223
        """
224
        tmp_row = {}
225
226
        for step in self._steps:
227
            park_info, ignore_info = step(in_row, tmp_row, out_row)
228
            if park_info or ignore_info:
229
                return park_info, ignore_info
230
231
        return None, None
232
233
    # ------------------------------------------------------------------------------------------------------------------
234
    def _step00(self, in_row: Dict[str, Any], tmp_row: Dict[str, Any], out_row: Dict[str, Any]) -> Tuple[
        Optional[str], Optional[str]]:
        """
        Replaces the value of every field in the input row, in place, with the result of WhitespaceCleaner.clean.

        :param in_row: The input row.
        :param tmp_row: Not used.
        :param out_row: Not used.

        :return: Always (None, None): this step never parks or ignores a row.
        """
        # Only existing keys are reassigned, so the size of the dict does not change while iterating over it.
        for name in in_row:
            in_row[name] = WhitespaceCleaner.clean(in_row[name])

        return None, None
247
248
    # ------------------------------------------------------------------------------------------------------------------
249
    def _step99(self, in_row: Dict[str, Any], tmp_row: Dict[str, Any], out_row: Dict[str, Any]) -> Tuple[
250
        Optional[str], Optional[str]]:
251
        """
252
        Validates all mandatory fields are in the output row and are filled.
253
254
        :param in_row: The input row.
255
        :param tmp_row: Not used.
256
        :param out_row: The output row.
257
        """
258
        park_info = ''
259
        for field in self.__mandatory_fields:
260
            if field not in out_row or not out_row[field]:
261
                if park_info:
262
                    park_info += ' '
263
                park_info += field
264
265
        return park_info, None
266
267
    # ------------------------------------------------------------------------------------------------------------------
268
    @abc.abstractmethod
269
    def _load_ignored_rows(self) -> None:
270
        """
271
        Loads the ignored rows into the database.
272
        """
273
        raise NotImplementedError()
274
275
    # ------------------------------------------------------------------------------------------------------------------
276
    @abc.abstractmethod
277
    def _load_parked_rows(self) -> None:
278
        """
279
        Loads the parked rows into the database.
280
        """
281
        raise NotImplementedError()
282
283
    # ------------------------------------------------------------------------------------------------------------------
284
    @abc.abstractmethod
285
    def _load_transformed_rows(self) -> None:
286
        """
287
        Loads the successfully transformed rows into the database.
288
        """
289
        raise NotImplementedError()
290
291
    # ------------------------------------------------------------------------------------------------------------------
292
    def _log_statistics(self) -> None:
293
        """
294
        Log statistics about the number of rows and number of rows per second.
295
        """
296
        rows_per_second_trans = self._count_total / (self._time1 - self._time0)
297
        rows_per_second_load = self._count_transform / (self._time2 - self._time1)
298
        rows_per_second_overall = self._count_total / (self._time3 - self._time0)
299
300
        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
301
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
302
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
303
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
304
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
305
        self._log('Number of rows per second processed : {0:d}'.format(int(rows_per_second_trans)))
306
        self._log('Number of rows per second loaded    : {0:d}'.format(int(rows_per_second_load)))
307
        self._log('Number of rows per second overall   : {0:d}'.format(int(rows_per_second_overall)))
308
309
    # ------------------------------------------------------------------------------------------------------------------
310
    def pre_transform_source_rows(self) -> None:
311
        """
312
        This method will be called just before transforming the source rows.
313
        """
314
        pass
315
316
    # ------------------------------------------------------------------------------------------------------------------
317
    def transform_source_rows(self) -> None:
318
        """
319
        Transforms the rows for the source system into (partial) dimensional data.
320
        """
321
        # Start timer for overall progress.
322
        self._time0 = time.perf_counter()
323
324
        self.pre_transform_source_rows()
325
326
        # Transform all source rows.
327
        with self._source_reader:
328
            with self._transformed_writer:
329
                with self._parked_writer:
330
                    with self._ignored_writer:
331
                        self._transform_rows()
332
333
        # Time end of transformation.
334
        self._time1 = time.perf_counter()
335
336
        # Load transformed rows into the fact table.
337
        self._load_transformed_rows()
338
339
        # Time end of loading transformed rows.
340
        self._time2 = time.perf_counter()
341
342
        # Load parked and ignored rows into the parked and ignored rows.
343
        self._load_ignored_rows()
344
        self._load_parked_rows()
345
346
        # Time end of loading parked and ignored rows.
347
        self._time3 = time.perf_counter()
348
349
        # Show statistics about number of rows and performance.
350
        self._log_statistics()
351
352
    # ------------------------------------------------------------------------------------------------------------------
353
    @abc.abstractmethod
354
    def _get_input_fields(self) -> List[str]:
355
        """
356
        Returns the expected names of the columns in the input.
357
        """
358
        raise NotImplementedError()
359
360
    # ------------------------------------------------------------------------------------------------------------------
361
    @abc.abstractmethod
362
    def _get_mandatory_fields(self) -> List[str]:
363
        """
364
        Returns the mandatory fields (columns) in the output row.
365
        """
366
        raise NotImplementedError()
367
368
    # ------------------------------------------------------------------------------------------------------------------
369
    @abc.abstractmethod
370
    def _get_output_fields(self) -> List[str]:
371
        """
372
        Return the fields (columns) that must be written to the output.
373
        """
374
        raise NotImplementedError()
375
376
    # ------------------------------------------------------------------------------------------------------------------
377
    def __init_fields(self) -> None:
378
        """
379
        Initializes the fields of source, output, and mandatory.
380
        """
381
        self._source_reader.fields = self._get_input_fields()
382
        self.__mandatory_fields = self._get_mandatory_fields()
383
        self._transformed_writer.fields = self._get_output_fields()
384
385
    # ----------------------------------------------------------------------------------------------------------------------
386