Completed
Push — master ( 0c702e...f32b73 )
by P.R.
01:15
created

Transformer._step00()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 12
rs 9.4285
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
import copy
10
import re
11
import time
12
import traceback
13
14
from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
0 ignored issues
show
Bug introduced by
The name cleaner does not seem to exist in module etlt.
Loading history...
Configuration introduced by
The import etlt.cleaner.WhitespaceCleaner could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a Pylint configuration issue. Make sure your libraries are available by adding the necessary install commands to your configuration, for example:

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: Pylint is currently not run inside a virtualenv, so when installing your modules make sure to use the pip command for the correct Python version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your package folders. Make sure that every package sub-folder contains an __init__.py file.

Loading history...
15
16
17
class Transformer(metaclass=abc.ABCMeta):
    """
    Abstract parent class for transforming source data in (partial) dimensional data.

    Subclasses must implement _load_ignored_rows, _load_parked_rows, and _load_transformed_rows. Transformation steps
    are methods named _step<n> where n consists of at least two digits; the steps are executed in ascending numeric
    order of n (e.g. _step99 runs before _step100).
    """

    _STEP_METHOD_PATTERN = re.compile(r'_step(\d{2,})')
    """
    Pattern for names of step methods: '_step' followed by at least two digits. Used with re.match, i.e. anchored at
    the start of the name only, so any text may follow the digits.

    :type: re.Pattern
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, source_reader, transformed_writer, parked_writer, ignored_writer):
        """
        Object constructor.

        :param etlt.Reader.Reader source_reader: Object for reading source rows.
        :param etlt.Writer.Writer transformed_writer: Object for writing successfully transformed rows.
        :param etlt.Writer.Writer parked_writer: Object for writing parked rows.
        :param etlt.Writer.Writer ignored_writer: Object for writing ignored rows.
        """

        self._count_total = 0
        """
        The number of rows processed.

        :type: int
        """

        self._count_transform = 0
        """
        The number of rows successfully transformed.

        :type: int
        """

        self._count_park = 0
        """
        The number of rows parked.

        :type: int
        """

        self._count_error = 0
        """
        The number of rows processed with errors.

        :type: int
        """

        self._count_ignore = 0
        """
        The number of rows ignored.

        :type: int
        """

        self._time0 = 0.0
        """
        Start time of the whole process and start of transforming rows.

        :type: float
        """

        self._time1 = 0.0
        """
        End time of transforming rows and start time of the loading transformed rows.

        :type: float
        """

        self._time2 = 0.0
        """
        End time of the loading transformed rows and start time of loading parked rows.

        :type: float
        """

        self._time3 = 0.0
        """
        End time of the loading parked rows and end time of the whole process.

        :type: float
        """

        self._source_reader = source_reader
        """
        Object for reading source rows.

        :type: etlt.Reader.Reader
        """

        self._transformed_writer = transformed_writer
        """
        Object for writing successfully transformed rows.

        :type: etlt.Writer.Writer
        """

        self._parked_writer = parked_writer
        """
        Object for writing parked rows.

        :type: etlt.Writer.Writer
        """

        self._ignored_writer = ignored_writer
        """
        Object for writing ignored rows.

        :type: etlt.Writer.Writer
        """

        self._mandatory_fields = []
        """
        The mandatory fields (or columns) in the output row.

        :type: list[str]
        """

        self._steps = []
        """
        All _step<n> methods where n is an integer in this class sorted by n.

        :type: list[str]
        """

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def _ignored_write(self):
        """
        Deprecated alias of _ignored_writer. Kept so subclasses that use the old (misspelled) attribute name keep
        working.

        :rtype: etlt.Writer.Writer
        """
        return self._ignored_writer

    # ------------------------------------------------------------------------------------------------------------------
    @_ignored_write.setter
    def _ignored_write(self, ignored_writer):
        """
        Sets the object for writing ignored rows through the deprecated alias.

        :param etlt.Writer.Writer ignored_writer: Object for writing ignored rows.
        """
        self._ignored_writer = ignored_writer

    # ------------------------------------------------------------------------------------------------------------------
    def _log(self, message):
        """
        Logs a message to stdout, prefixed with the current local date and time.

        :param str message: The log message.

        :rtype: None
        """
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message))

    # ------------------------------------------------------------------------------------------------------------------
    def _log_exception(self, row, exception):
        """
        Logs an exception occurred during transformation of a row.

        Must be called from within an except clause, because the traceback of the exception currently being
        handled is logged.

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _find_all_step_methods(self):
        """
        Finds all _step<n> methods of this object and stores their names in self._steps, sorted by n.

        A step method is a callable attribute whose name is '_step' followed by at least two digits. The steps are
        sorted by the numeric value of n (so _step99 precedes _step100); ties are broken by the method name.
        """
        steps = []
        for name in dir(self):
            match = self._STEP_METHOD_PATTERN.match(name)
            # Check the name first so that attributes unrelated to steps (e.g. properties) are not evaluated.
            if match and callable(getattr(self, name)):
                steps.append((int(match.group(1)), name))

        self._steps = [name for _, name in sorted(steps)]

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self):
        """
        Transforms all source rows.
        """
        self._find_all_step_methods()

        for row in self._source_reader.next():
            self._transform_row_wrapper(row)

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row):
        """
        Transforms a single source row and updates the row counters.

        The row is parked when a step returns park info or raises an exception. It is ignored when a step returns
        ignore info. Otherwise the output row is written with the transformed writer.

        :param dict[str,str] row: The source row.
        """
        self._count_total += 1

        try:
            # Steps work on a shallow copy, so the original row is still available for logging on failure.
            in_row = copy.copy(row)
            out_row = {}
            park_info, ignore_info = self._transform_row(in_row, out_row)

        except Exception as exception:  # Any failure in a step must park the row instead of aborting the whole run.
            self._log_exception(row, exception)
            self._count_error += 1
            park_info = 'Exception'
            ignore_info = None
            out_row = {}

        if park_info:
            # TODO: parked rows are only counted; they are not yet written with self._parked_writer.
            self._count_park += 1
        elif ignore_info:
            # TODO: ignored rows are only counted; they are not yet written with self._ignored_writer.
            self._count_ignore += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.write(out_row)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row(self, in_row, out_row):
        """
        Transforms an input row to an output row (i.e. (partial) dimensional data) by running all steps in order.

        Stops at the first step that returns park info or ignore info.

        :param dict[str,str] in_row: The input row.
        :param dict[str,T] out_row: The output row.

        :rtype: (str,str)
        """
        tmp_row = {}

        for step in self._steps:
            park_info, ignore_info = getattr(self, step)(in_row, tmp_row, out_row)
            if park_info or ignore_info:
                return park_info, ignore_info

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step00(self, in_row, tmp_row, out_row):
        """
        Prunes whitespace for all fields in the input row (in place).

        :param dict in_row: The input row.
        :param dict tmp_row: Not used.
        :param dict out_row: Not used.

        :rtype: (None,None)
        """
        for key, value in in_row.items():
            # Replacing the value of an existing key is safe while iterating: the size of the dict does not change.
            in_row[key] = WhitespaceCleaner.clean(value)

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step99(self, in_row, tmp_row, out_row):
        """
        Validates all mandatory fields are in the output row and are filled.

        :param dict in_row: Not used.
        :param dict tmp_row: Not used.
        :param dict out_row: The output row.

        :return: A space separated list of the missing or empty mandatory fields (empty string if none) and None.
        :rtype: (str,None)
        """
        park_info = ' '.join(field for field in self._mandatory_fields if not out_row.get(field))

        return park_info, None

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self):
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self):
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self):
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _rows_per_second(count, seconds):
        """
        Returns the (truncated) number of rows per second.

        Returns 0 when the elapsed time is not positive, e.g. when the timers have not been set.

        :param int count: The number of rows.
        :param float seconds: The elapsed time in seconds.

        :rtype: int
        """
        if seconds <= 0.0:
            return 0

        return int(count / seconds)

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self):
        """
        Log statistics about the number of rows and number of rows per second.
        """
        rows_per_second_trans = self._rows_per_second(self._count_total, self._time1 - self._time0)
        rows_per_second_load = self._rows_per_second(self._count_transform, self._time2 - self._time1)
        rows_per_second_overall = self._rows_per_second(self._count_total, self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(rows_per_second_trans))
        self._log('Number of rows per second loaded    : {0:d}'.format(rows_per_second_load))
        self._log('Number of rows per second overall   : {0:d}'.format(rows_per_second_overall))

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self):
        """
        Transforms the rows for the source system into (partial) dimensional data.

        Transforms all source rows, loads the transformed, ignored, and parked rows, and logs statistics.
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        # Transform all source rows. The reader and all writers are open (as context managers) while transforming.
        with self._source_reader, self._transformed_writer, self._parked_writer, self._ignored_writer:
            self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load parked and ignored rows into the parked and ignored rows.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()
# ----------------------------------------------------------------------------------------------------------------------
354