Completed
Push — master ( 5ebf4b...5b2e5b )
by P.R.
01:14
created

Transformer.pre_park_row()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
import copy
10
import re
11
import time
12
import traceback
13
14
from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
15
16
17
class Transformer:
    """
    Abstract parent class for transforming source data in (partial) dimensional data.

    A source row is passed through all ``_step<nn>`` methods (sorted by name). Each step returns a tuple
    ``(park_info, ignore_info)``; the first step returning a truthy value for either of them ends the processing of
    the row, which is then parked or ignored respectively. Rows passing all steps are written to the transformed
    writer.

    Note: the loader methods are decorated with ``abc.abstractmethod`` but this class does not use an ABC metaclass,
    hence the decorators are not enforced at instantiation; the loaders raise NotImplementedError when not overridden.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, source_reader, transformed_writer, parked_writer, ignored_writer):
        """
        Object constructor.

        :param etlt.reader.Reader.Reader source_reader: Object for reading source rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter transformed_writer: Object for writing successfully
                                                                               transformed rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter parked_writer: Object for writing parked rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter ignored_writer: Object for writing ignored rows.
        """

        self._count_total = 0
        """
        The number of rows processed.

        :type: int
        """

        self._count_transform = 0
        """
        The number of rows successfully transformed.

        :type: int
        """

        self._count_park = 0
        """
        The number of rows parked.

        :type: int
        """

        self._count_error = 0
        """
        The number of rows processed with errors.

        :type: int
        """

        self._count_ignore = 0
        """
        The number of rows ignored.

        :type: int
        """

        self._time0 = 0.0
        """
        Start time of the whole process and start of transforming rows.

        :type: float
        """

        self._time1 = 0.0
        """
        End time of transforming rows and start time of the loading transformed rows.

        :type: float
        """

        self._time2 = 0.0
        """
        End time of the loading transformed rows and start time of loading parked rows.

        :type: float
        """

        self._time3 = 0.0
        """
        End time of the loading parked rows and end time of the whole process.

        :type: float
        """

        self._source_reader = source_reader
        """
        Object for reading source rows.

        :type: etlt.reader.Reader.Reader
        """

        self._transformed_writer = transformed_writer
        """
        Object for writing successfully transformed rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._parked_writer = parked_writer
        """
        Object for writing parked rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._ignored_writer = ignored_writer
        """
        Object for writing ignored rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._mandatory_fields = []
        """
        The mandatory fields (or columns) in the output row.

        :type: list[str]
        """

        self._steps = []
        """
        The names of all _step<nn> methods in this class sorted by name.

        :type: list[str]
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _log(self, message):
        """
        Logs a message, prefixed with the current local time, to standard output.

        :param str message: The log message (non-string values are converted with str()).

        :rtype: None
        """
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message))

    # ------------------------------------------------------------------------------------------------------------------
    def _log_exception(self, row, exception):
        """
        Logs an exception occurred during transformation of a row.

        Must be called from within an except clause: the traceback of the exception currently being handled is logged.

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.

        :rtype: None
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _find_all_step_methods(self):
        """
        Finds all step methods in this class and stores their names, sorted, in self._steps.

        A step method is a callable attribute whose name starts with ``_step`` followed by at least two digits (e.g.
        ``_step00``, ``_step42_lookup``). Names are sorted as strings, not numerically.

        :rtype: None
        """
        # Test the name first such that getattr is only evaluated for candidate step methods.
        steps = [name for name in dir(self)
                 if re.match(r'_step\d{2,}', name) and callable(getattr(self, name))]
        self._steps = sorted(steps)

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self):
        """
        Transforms all source rows.

        :rtype: None
        """
        self._find_all_step_methods()

        for row in self._source_reader.next():
            self._transform_row_wrapper(row)

    # ------------------------------------------------------------------------------------------------------------------
    def pre_park_row(self, park_info, in_row):
        """
        This method will be called just before sending an input row to be parked to the parked writer.

        The default implementation does nothing; child classes may override this method.

        :param str park_info: The park info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def pre_ignore_row(self, ignore_info, in_row):
        """
        This method will be called just before sending an input row to be ignored to the ignored writer.

        The default implementation does nothing; child classes may override this method.

        :param str ignore_info: The ignore info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row):
        """
        Transforms a single source row and writes it to the transformed, parked, or ignored writer.

        Any exception raised during the transformation is logged and counted, and the row is parked.

        :param dict[str,str] row: The source row.

        :rtype: None
        """
        self._count_total += 1

        try:
            # Transform the natural keys in the row to technical keys. The steps work on a (shallow) copy such that
            # the original row can be parked or ignored unmodified.
            in_row = copy.copy(row)
            out_row = {}
            park_info, ignore_info = self._transform_row(in_row, out_row)

        except Exception as e:
            # Deliberately broad: a single bad row must not abort the whole run.
            self._log_exception(row, e)
            # Keep track of the number of errors.
            self._count_error += 1
            # This row must be parked.
            park_info = 'Exception'
            ignore_info = None
            out_row = {}

        if park_info:
            # Park the row.
            self.pre_park_row(park_info, row)
            self._parked_writer.write(row)
            self._count_park += 1
        elif ignore_info:
            # Ignore the row.
            self.pre_ignore_row(ignore_info, row)
            self._ignored_writer.write(row)
            self._count_ignore += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.write(out_row)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row(self, in_row, out_row):
        """
        Transforms an input row to an output row (i.e. (partial) dimensional data).

        All step methods in self._steps are called in order with a shared temporary row. The first step returning
        park info or ignore info ends the transformation.

        :param dict[str,str] in_row: The input row.
        :param dict[str,T] out_row: The output row.

        :rtype: (str,str)
        """
        tmp_row = {}

        for step in self._steps:
            park_info, ignore_info = getattr(self, step)(in_row, tmp_row, out_row)
            if park_info or ignore_info:
                return park_info, ignore_info

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step00(self, in_row, tmp_row, out_row):
        """
        Prunes whitespace for all fields in the input row (the input row is modified in place).

        :param dict in_row: The input row.
        :param dict tmp_row: Not used.
        :param dict out_row: Not used.

        :rtype: (None,None)
        """
        # Only values of existing keys are replaced, hence the size of the dict does not change during iteration.
        for key, value in in_row.items():
            in_row[key] = WhitespaceCleaner.clean(value)

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step99(self, in_row, tmp_row, out_row):
        """
        Validates all mandatory fields are in the output row and are filled.

        :param dict in_row: Not used.
        :param dict tmp_row: Not used.
        :param dict out_row: The output row.

        :rtype: (str,None)
        :return: The names of the missing or empty mandatory fields separated by a space (an empty string when all
                 mandatory fields are filled) and None.
        """
        missing = [field for field in self._mandatory_fields if field not in out_row or not out_row[field]]

        return ' '.join(missing), None

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self):
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self):
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self):
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self):
        """
        Log statistics about the number of rows and number of rows per second.

        A rate over an interval of zero seconds is reported as 0 instead of raising a ZeroDivisionError.

        :rtype: None
        """

        def rows_per_second(count, elapsed):
            # Guard against a zero interval (e.g. timers never started or insufficient clock resolution).
            return int(count / elapsed) if elapsed else 0

        rows_per_second_trans = rows_per_second(self._count_total, self._time1 - self._time0)
        rows_per_second_load = rows_per_second(self._count_transform, self._time2 - self._time1)
        rows_per_second_overall = rows_per_second(self._count_total, self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(rows_per_second_trans))
        self._log('Number of rows per second loaded    : {0:d}'.format(rows_per_second_load))
        self._log('Number of rows per second overall   : {0:d}'.format(rows_per_second_overall))

    # ------------------------------------------------------------------------------------------------------------------
    def pre_transform_source_rows(self):
        """
        This method will be called just before transforming the source rows.

        The default implementation does nothing; child classes may override this method.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self):
        """
        Transforms the rows for the source system into (partial) dimensional data.

        Transforms all source rows, loads the transformed, ignored, and parked rows, and logs statistics.

        :rtype: None
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        self.pre_transform_source_rows()

        # Transform all source rows. The reader and writers are entered in this order and exited in reverse order.
        with self._source_reader, self._transformed_writer, self._parked_writer, self._ignored_writer:
            self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load the ignored and parked rows.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()
389
390
# ----------------------------------------------------------------------------------------------------------------------
391