Completed
Push — master ( 170de6...a76a3a )
by P.R.
01:55
created

Transformer._find_all_step_methods()   B

Complexity

Conditions 5

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 9
ccs 0
cts 5
cp 0
crap 30
rs 8.5454
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
import copy
10
import re
11
import time
12
import traceback
13
14
from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
15
16
17
class Transformer(metaclass=abc.ABCMeta):
    """
    Abstract parent class for transforming source data in (partial) dimensional data.

    Transformation steps are methods named _step<n> where n is an integer of at least two digits (optionally followed
    by a suffix). Each step is called as step(in_row, tmp_row, out_row) and must return a tuple
    (park_info, ignore_info). A truthy park_info or ignore_info stops the processing of the current row. Steps run in
    numeric order of n.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, source_reader, transformed_writer, parked_writer, ignored_writer):
        """
        Object constructor.

        :param etlt.reader.Reader.Reader source_reader: Object for reading source rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter transformed_writer: Object for writing successfully
                                                                               transformed rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter parked_writer: Object for writing parked rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter ignored_writer: Object for writing ignored rows.
        """

        self._count_total = 0
        """
        The number of rows processed.

        :type: int
        """

        self._count_transform = 0
        """
        The number of rows successfully transformed.

        :type: int
        """

        self._count_park = 0
        """
        The number of rows parked.

        :type: int
        """

        self._count_error = 0
        """
        The number of rows processed with errors.

        :type: int
        """

        self._count_ignore = 0
        """
        The number of rows ignored.

        :type: int
        """

        self._time0 = 0.0
        """
        Start time of the whole process and start of transforming rows.

        :type: float
        """

        self._time1 = 0.0
        """
        End time of transforming rows and start time of loading the transformed rows.

        :type: float
        """

        self._time2 = 0.0
        """
        End time of loading the transformed rows and start time of loading the ignored and parked rows.

        :type: float
        """

        self._time3 = 0.0
        """
        End time of loading the ignored and parked rows and end time of the whole process.

        :type: float
        """

        self._source_reader = source_reader
        """
        Object for reading source rows.

        :type: etlt.reader.Reader.Reader
        """

        self._transformed_writer = transformed_writer
        """
        Object for writing successfully transformed rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._parked_writer = parked_writer
        """
        Object for writing parked rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._ignored_writer = ignored_writer
        """
        Object for writing ignored rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._mandatory_fields = []
        """
        The mandatory fields (or columns) in the output row.

        :type: list[str]
        """

        self._steps = []
        """
        All _step<n> methods where n is an integer in this class sorted by n.

        :type: list[callable]
        """

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _log(message):
        """
        Logs a message prefixed with the current local date and time.

        :param str message: The log message.

        :rtype: None
        """
        #  @todo Replace with log package.
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message), flush=True)

    # ------------------------------------------------------------------------------------------------------------------
    def _handle_exception(self, row, exception):
        """
        Logs an exception occurred during transformation of a row.

        Must be called from within an except clause, because the logged traceback is that of the exception currently
        being handled.

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.

        :rtype: None
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _find_all_step_methods(self):
        """
        Finds all _step<n> methods where n is an integer (of at least two digits, optionally followed by a suffix) in
        this class and stores them in self._steps sorted numerically by n.

        Previously found steps are discarded first, so calling this method more than once does not register a step
        more than once.

        :rtype: None
        """
        candidates = []
        for name in dir(self):
            # Test the name before fetching the attribute: getattr may have side effects (e.g. properties).
            match = re.match(r'_step(\d{2,})', name)
            if match:
                method = getattr(self, name)
                if callable(method):
                    candidates.append((int(match.group(1)), name, method))

        # Sort by the numeric value of n (so _step100 runs after _step20), using the method name as tie breaker.
        candidates.sort(key=lambda candidate: (candidate[0], candidate[1]))

        self._steps = [method for _, _, method in candidates]

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self):
        """
        Transforms all source rows.

        :rtype: None
        """
        self._find_all_step_methods()

        for row in self._source_reader.next():
            self._transform_row_wrapper(row)

    # ------------------------------------------------------------------------------------------------------------------
    def pre_park_row(self, park_info, in_row):
        """
        This method will be called just before sending an input row to be parked to the parked writer.

        :param str park_info: The park info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def pre_ignore_row(self, ignore_info, in_row):
        """
        This method will be called just before sending an input row to be ignored to the ignore writer.

        :param str ignore_info: The ignore info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row):
        """
        Transforms a single source row and writes it to the transformed, parked, or ignored writer.

        A row that raises an exception during transformation is logged, counted as an error, and parked.

        :param dict[str,str] row: The source row.

        :rtype: None
        """
        self._count_total += 1

        try:
            # Transform the natural keys in the row to technical keys. The steps may modify in_row (e.g. _step00), so
            # work on a copy to keep the original row for the parked and ignored writers.
            in_row = copy.copy(row)
            out_row = {}
            park_info, ignore_info = self._transform_row(in_row, out_row)

        except Exception as e:
            # Log the exception.
            self._handle_exception(row, e)
            # Keep track of the number of errors.
            self._count_error += 1
            # This row must be parked.
            park_info = 'Exception'
            # Keep our IDE happy.
            ignore_info = None
            out_row = {}

        if park_info:
            # Park the row.
            self.pre_park_row(park_info, row)
            self._parked_writer.writerow(row)
            self._count_park += 1
        elif ignore_info:
            # Ignore the row.
            self.pre_ignore_row(ignore_info, row)
            self._ignored_writer.writerow(row)
            self._count_ignore += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.writerow(out_row)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row(self, in_row, out_row):
        """
        Transforms an input row to an output row (i.e. (partial) dimensional data) by running all steps in order.

        Stops at the first step that returns a truthy park info or ignore info.

        :param dict[str,str] in_row: The input row.
        :param dict[str,T] out_row: The output row.

        :rtype: (str|None,str|None)
        """
        tmp_row = {}

        for step in self._steps:
            park_info, ignore_info = step(in_row, tmp_row, out_row)
            if park_info or ignore_info:
                return park_info, ignore_info

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step00(self, in_row, tmp_row, out_row):
        """
        Prunes whitespace for all fields in the input row.

        :param dict in_row: The input row.
        :param dict tmp_row: Not used.
        :param dict out_row: Not used.

        :rtype: (None,None)
        """
        # Only values are replaced (no keys are added or removed), so modifying the dict while iterating is safe.
        for key, value in in_row.items():
            in_row[key] = WhitespaceCleaner.clean(value)

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step99(self, in_row, tmp_row, out_row):
        """
        Validates all mandatory fields are in the output row and are filled.

        :param dict in_row: Not used.
        :param dict tmp_row: Not used.
        :param dict out_row: The output row.

        :return: The space separated names of the missing or empty mandatory fields (the empty string when all are
                 filled) as park info, and None as ignore info.
        :rtype: (str,None)
        """
        park_info = ' '.join(field for field in self._mandatory_fields if not out_row.get(field))

        return park_info, None

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self):
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self):
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self):
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _rows_per_second(count, duration):
        """
        Returns the number of rows per second.

        :param int count: The number of rows.
        :param float duration: The duration in seconds.

        :rtype: float
        """
        # Guard against a zero (or negative) duration, e.g. when the timers were not set or the clock did not advance;
        # logging statistics must never crash the process after all work has been done.
        if duration <= 0.0:
            return 0.0

        return count / duration

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self):
        """
        Log statistics about the number of rows and number of rows per second.

        :rtype: None
        """
        rows_per_second_trans = self._rows_per_second(self._count_total, self._time1 - self._time0)
        rows_per_second_load = self._rows_per_second(self._count_transform, self._time2 - self._time1)
        rows_per_second_overall = self._rows_per_second(self._count_total, self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(int(rows_per_second_trans)))
        self._log('Number of rows per second loaded    : {0:d}'.format(int(rows_per_second_load)))
        self._log('Number of rows per second overall   : {0:d}'.format(int(rows_per_second_overall)))

    # ------------------------------------------------------------------------------------------------------------------
    def pre_transform_source_rows(self):
        """
        This method will be called just before transforming the source rows.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self):
        """
        Transforms the rows for the source system into (partial) dimensional data, loads the transformed, ignored,
        and parked rows, and logs statistics.

        :rtype: None
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        self.pre_transform_source_rows()

        # Transform all source rows. A multi-item with statement is equivalent to nested with statements: the context
        # managers are entered from left to right and exited in reverse order.
        with self._source_reader, self._transformed_writer, self._parked_writer, self._ignored_writer:
            self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load the ignored and parked rows into their respective tables.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()
395
# ----------------------------------------------------------------------------------------------------------------------
396