Completed
Push — master ( 8b1dc7...524000 )
by P.R.
01:44
created

Transformer._handle_exception()   A

Complexity

Conditions 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 11
ccs 0
cts 5
cp 0
rs 9.4285
cc 1
crap 2
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
import copy
10
import re
11
import time
12
import traceback
13
14
from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
15
16
17
class Transformer:
    """
    Abstract parent class for transforming source data in (partial) dimensional data.

    Each source row is passed through all ``_step<nn>`` methods of the (child) class in order of their names. A step
    returns a tuple ``(park_info, ignore_info)``; the first step returning a non-empty ``park_info`` or
    ``ignore_info`` ends the processing of the row, and the row is parked or ignored respectively. Otherwise the
    output row is written to the transformed writer.

    Note: this class does not use ``abc.ABCMeta`` as metaclass, hence the ``abc.abstractmethod`` decorators below are
    not enforced when instantiating a child class. The ``_load_*`` methods raise ``NotImplementedError`` when they
    are called without being overridden.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, source_reader, transformed_writer, parked_writer, ignored_writer):
        """
        Object constructor.

        :param etlt.reader.Reader.Reader source_reader: Object for reading source rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter transformed_writer: Object for writing successfully
                                                                               transformed rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter parked_writer: Object for writing parked rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter ignored_writer: Object for writing ignored rows.
        """

        self._count_total = 0
        """
        The number of rows processed.

        :type: int
        """

        self._count_transform = 0
        """
        The number of rows successfully transformed.

        :type: int
        """

        self._count_park = 0
        """
        The number of rows parked.

        :type: int
        """

        self._count_error = 0
        """
        The number of rows processed with errors.

        :type: int
        """

        self._count_ignore = 0
        """
        The number of rows ignored.

        :type: int
        """

        self._time0 = 0.0
        """
        Start time of the whole process and start of transforming rows.

        :type: float
        """

        self._time1 = 0.0
        """
        End time of transforming rows and start time of the loading transformed rows.

        :type: float
        """

        self._time2 = 0.0
        """
        End time of the loading transformed rows and start time of loading ignored and parked rows.

        :type: float
        """

        self._time3 = 0.0
        """
        End time of the loading ignored and parked rows and end time of the whole process.

        :type: float
        """

        self._source_reader = source_reader
        """
        Object for reading source rows.

        :type: etlt.reader.Reader.Reader
        """

        self._transformed_writer = transformed_writer
        """
        Object for writing successfully transformed rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._parked_writer = parked_writer
        """
        Object for writing parked rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._ignored_writer = ignored_writer
        """
        Object for writing ignored rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._mandatory_fields = []
        """
        The mandatory fields (or columns) in the output row.

        :type: list[str]
        """

        self._steps = []
        """
        The names of all _step<nn> methods of this class, sorted by name.

        :type: list[str]
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _log(self, message):
        """
        Logs a message, prefixed with the current local date and time, to standard output.

        :param str message: The log message. Any other object is converted with str().

        :rtype: None
        """
        #  @todo Replace with log package.
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message), flush=True)

    # ------------------------------------------------------------------------------------------------------------------
    def _handle_exception(self, row, exception):
        """
        Logs an exception occurred during transformation of a row.

        This method relies on traceback.format_exc() and hence must be called while the exception is being handled.

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.

        :rtype: None
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _find_all_step_methods(self):
        """
        Finds all _step<nn> methods in this class and stores their names, sorted as strings, in self._steps.

        Only callables whose name starts with ``_step`` directly followed by at least two digits are recognised
        (e.g. ``_step00`` and ``_step99``).

        :rtype: None
        """
        # Test the name before fetching the attribute: this prevents that unrelated attributes (e.g. properties of a
        # child class) are evaluated while searching for step methods.
        steps = [name for name in dir(self)
                 if re.match(r'_step\d+\d+.*', name) and callable(getattr(self, name))]
        self._steps = sorted(steps)

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self):
        """
        Transforms all source rows.

        :rtype: None
        """
        self._find_all_step_methods()

        for row in self._source_reader.next():
            self._transform_row_wrapper(row)

    # ------------------------------------------------------------------------------------------------------------------
    def pre_park_row(self, park_info, in_row):
        """
        This method will be called just before sending an input row to be parked to the parked writer. Child classes
        can override this method; this implementation does nothing.

        :param str park_info: The park info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def pre_ignore_row(self, ignore_info, in_row):
        """
        This method will be called just before sending an input row to be ignored to the ignored writer. Child
        classes can override this method; this implementation does nothing.

        :param str ignore_info: The ignore info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row):
        """
        Transforms a single source row and sends it to the transformed, parked or ignored writer.

        Any exception raised while transforming the row is logged and counted, and the row is parked.

        :param dict[str,str] row: The source row.

        :rtype: None
        """
        self._count_total += 1

        out_row = {}
        try:
            # Transform the natural keys in the row to technical keys. The steps work on a shallow copy such that the
            # original row is available for parking or ignoring.
            in_row = copy.copy(row)
            park_info, ignore_info = self._transform_row(in_row, out_row)

        except Exception as e:
            # Deliberately broad: a single bad row must not abort the transformation of all other rows.
            # Log the exception.
            self._handle_exception(row, e)
            # Keep track of the number of errors.
            self._count_error += 1
            # This row must be parked.
            park_info = 'Exception'
            ignore_info = None

        if park_info:
            # Park the (original) row.
            self.pre_park_row(park_info, row)
            self._parked_writer.writerow(row)
            self._count_park += 1
        elif ignore_info:
            # Ignore the (original) row.
            self.pre_ignore_row(ignore_info, row)
            self._ignored_writer.writerow(row)
            self._count_ignore += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.writerow(out_row)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row(self, in_row, out_row):
        """
        Transforms an input row to an output row (i.e. (partial) dimensional data) by running all steps found by
        _find_all_step_methods().

        :param dict[str,str] in_row: The input row.
        :param dict[str,T] out_row: The output row. This row is filled by the steps.

        :rtype: (str,str)
        :return: The park info and ignore info of the first step that returned one of them, (None, None) otherwise.
        """
        # Scratch row shared by all steps of this row.
        tmp_row = {}

        for step in self._steps:
            park_info, ignore_info = getattr(self, step)(in_row, tmp_row, out_row)
            if park_info or ignore_info:
                return park_info, ignore_info

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step00(self, in_row, tmp_row, out_row):
        """
        Cleans the whitespace of all fields in the input row using WhitespaceCleaner. The input row is modified in
        place.

        :param dict in_row: The input row.
        :param dict tmp_row: Not used.
        :param dict out_row: Not used.

        :rtype: (None,None)
        """
        # Only the values of existing keys are replaced, hence the size of the dict does not change while iterating.
        for key, value in in_row.items():
            in_row[key] = WhitespaceCleaner.clean(value)

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step99(self, in_row, tmp_row, out_row):
        """
        Validates all mandatory fields are in the output row and are filled.

        NOTE(review): a field counts as not filled when its value is falsy; this includes 0 -- confirm that 0 is
        never a valid value for a mandatory field.

        :param dict in_row: Not used.
        :param dict tmp_row: Not used.
        :param dict out_row: The output row.

        :rtype: (str,None)
        :return: The park info, i.e. the names of the missing mandatory fields separated by a space (empty when all
                 mandatory fields are filled), and None as ignore info.
        """
        missing = [field for field in self._mandatory_fields if not out_row.get(field)]

        return ' '.join(missing), None

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self):
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self):
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self):
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _rows_per_second(count, seconds):
        """
        Returns the number of rows per second, truncated to an integer.

        :param int count: The number of rows.
        :param float seconds: The elapsed time in seconds.

        :rtype: int
        :return: The rate, or 0 when no (positive) time has elapsed, e.g. when the timers have not been set.
        """
        if seconds <= 0.0:
            return 0

        return int(count / seconds)

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self):
        """
        Log statistics about the number of rows and number of rows per second.

        :rtype: None
        """
        rows_per_second_trans = self._rows_per_second(self._count_total, self._time1 - self._time0)
        rows_per_second_load = self._rows_per_second(self._count_transform, self._time2 - self._time1)
        rows_per_second_overall = self._rows_per_second(self._count_total, self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(rows_per_second_trans))
        self._log('Number of rows per second loaded    : {0:d}'.format(rows_per_second_load))
        self._log('Number of rows per second overall   : {0:d}'.format(rows_per_second_overall))

    # ------------------------------------------------------------------------------------------------------------------
    def pre_transform_source_rows(self):
        """
        This method will be called just before transforming the source rows. Child classes can override this method;
        this implementation does nothing.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self):
        """
        Transforms the rows for the source system into (partial) dimensional data, loads the transformed, ignored
        and parked rows, and logs statistics.

        :rtype: None
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        self.pre_transform_source_rows()

        # Transform all source rows. The reader and writers are entered from left to right and exited in reverse
        # order.
        with self._source_reader, self._transformed_writer, self._parked_writer, self._ignored_writer:
            self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load the ignored and parked rows.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()
390
391
# ----------------------------------------------------------------------------------------------------------------------
392