Passed
Branch master (17b603)
by P.R.
01:31
created

etlt.Transformer.Transformer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 111
Code Lines 16

Duplication

Lines 111
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 16
nop 5
dl 111
loc 111
ccs 0
cts 17
cp 0
crap 2
rs 9.6
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
import copy
10
import re
11
import time
12
import traceback
13
14
from etlt.cleaner.WhitespaceCleaner import WhitespaceCleaner
15
16
17 View Code Duplication
class Transformer(metaclass=abc.ABCMeta):
    """
    Abstract parent class for transforming source data in (partial) dimensional data.

    Each source row is passed through all _step<n> methods (ordered by n). A step may park or ignore the row; otherwise
    the resulting output row is written to the transformed writer.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, source_reader, transformed_writer, parked_writer, ignored_writer):
        """
        Object constructor.

        :param etlt.reader.Reader.Reader source_reader: Object for reading source rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter transformed_writer: Object for writing successfully
                                                                               transformed rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter parked_writer: Object for writing parked rows.
        :param etlt.writer.SqlLoaderWriter.SqlLoaderWriter ignored_writer: Object for writing ignored rows.
        """

        self._count_total = 0
        """
        The number of rows processed.

        :type: int
        """

        self._count_transform = 0
        """
        The number of rows successfully transformed.

        :type: int
        """

        self._count_park = 0
        """
        The number of rows parked.

        :type: int
        """

        self._count_error = 0
        """
        The number of rows processed with errors.

        :type: int
        """

        self._count_ignore = 0
        """
        The number of rows ignored.

        :type: int
        """

        self._time0 = 0.0
        """
        Start time of the whole process and start of transforming rows.

        :type: float
        """

        self._time1 = 0.0
        """
        End time of transforming rows and start time of the loading transformed rows.

        :type: float
        """

        self._time2 = 0.0
        """
        End time of the loading transformed rows and start time of loading parked rows.

        :type: float
        """

        self._time3 = 0.0
        """
        End time of the loading parked rows and end time of the whole process.

        :type: float
        """

        self._source_reader = source_reader
        """
        Object for reading source rows.

        :type: etlt.reader.Reader.Reader
        """

        self._transformed_writer = transformed_writer
        """
        Object for writing successfully transformed rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._parked_writer = parked_writer
        """
        Object for writing parked rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._ignored_writer = ignored_writer
        """
        Object for writing ignored rows.

        :type: etlt.writer.SqlLoaderWriter.SqlLoaderWriter
        """

        self._mandatory_fields = []
        """
        The mandatory fields (or columns) in the output row.

        :type: list[str]
        """

        self._steps = []
        """
        All _step<n> methods where n is an integer in this class sorted by n.

        :type: list[callable]
        """

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _log(message):
        """
        Logs a message.

        :param str message: The log message.

        :rtype: None
        """
        #  @todo Replace with log package.
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message), flush=True)

    # ------------------------------------------------------------------------------------------------------------------
    def _handle_exception(self, row, exception):
        """
        Logs an exception occurred during transformation of a row.

        Must be called from within an except clause, because the traceback is taken from traceback.format_exc().

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _find_all_step_methods(self):
        """
        Finds all _step<n> methods in this class, where n is an integer of at least two digits. They are stored in
        self._steps, ordered numerically by n. Ties are broken by the full method name.

        The list of steps is replaced, not extended. Running the transformer more than once therefore does not execute
        each step multiple times.

        :rtype: None
        """
        pattern = re.compile(r'_step(\d{2,})')

        found = []
        for name in dir(self):
            match = pattern.match(name)
            # Only inspect names that look like steps; avoids evaluating unrelated attributes (e.g. properties).
            if match and callable(getattr(self, name)):
                # Numeric sort key, so that e.g. _step100 runs after _step99 (a plain string sort would not).
                found.append((int(match.group(1)), name))

        self._steps = [getattr(self, name) for _, name in sorted(found)]

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self):
        """
        Transforms all source rows.
        """
        self._find_all_step_methods()

        for row in self._source_reader.next():
            self._transform_row_wrapper(row)

    # ------------------------------------------------------------------------------------------------------------------
    def pre_park_row(self, park_info, in_row):
        """
        This method will be called just before sending an input row to be parked to the parked writer.

        :param str park_info: The park info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def pre_ignore_row(self, ignore_info, in_row):
        """
        This method will be called just before sending an input row to be ignored to the ignored writer.

        :param str ignore_info: The ignore info.
        :param dict[str,str] in_row: The original input row.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row):
        """
        Transforms a single source row and writes it to the parked, ignored, or transformed writer.

        A row is parked when a step returns park info or when the transformation raises an exception. It is ignored
        when a step returns ignore info. Otherwise the output row is written to the transformed writer.

        :param dict[str,str] row: The source row.
        """
        self._count_total += 1

        try:
            # Transform the natural keys in the row to technical keys. Work on a copy, so the original row can be
            # written unchanged to the parked or ignored writer.
            in_row = copy.copy(row)
            out_row = {}
            park_info, ignore_info = self._transform_row(in_row, out_row)

        except Exception as e:
            # Deliberately broad: one bad row must not abort the whole run. The error is logged and the row parked.
            self._handle_exception(row, e)
            self._count_error += 1
            park_info = 'Exception'
            ignore_info = None
            out_row = {}

        if park_info:
            # Park the row.
            self.pre_park_row(park_info, row)
            self._parked_writer.writerow(row)
            self._count_park += 1
        elif ignore_info:
            # Ignore the row.
            self.pre_ignore_row(ignore_info, row)
            self._ignored_writer.writerow(row)
            self._count_ignore += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.writerow(out_row)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row(self, in_row, out_row):
        """
        Transforms an input row to an output row (i.e. (partial) dimensional data).

        The steps are run in order. Each step receives the input row, a scratch row shared between the steps of this
        row only, and the output row. The first step that returns park info or ignore info stops the pipeline.

        :param dict[str,str] in_row: The input row.
        :param dict[str,T] out_row: The output row.

        :rtype: (str|None,str|None)
        """
        tmp_row = {}

        for step in self._steps:
            park_info, ignore_info = step(in_row, tmp_row, out_row)
            if park_info or ignore_info:
                return park_info, ignore_info

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step00(self, in_row, tmp_row, out_row):
        """
        Prunes whitespace for all fields in the input row.

        :param dict in_row: The input row.
        :param dict tmp_row: Not used.
        :param dict out_row: Not used.
        """
        # Only values of existing keys are replaced, so modifying the dict while iterating is safe here.
        for key, value in in_row.items():
            in_row[key] = WhitespaceCleaner.clean(value)

        return None, None

    # ------------------------------------------------------------------------------------------------------------------
    def _step99(self, in_row, tmp_row, out_row):
        """
        Validates all mandatory fields are in the output row and are filled.

        :param dict in_row: Not used.
        :param dict tmp_row: Not used.
        :param dict out_row: The output row.

        :return: The space-separated names of missing or empty mandatory fields as park info (an empty string when
                 all are present), and no ignore info.
        :rtype: (str,None)
        """
        missing = [field for field in self._mandatory_fields if not out_row.get(field)]

        return ' '.join(missing), None

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self):
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self):
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self):
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _rows_per_second(count, seconds):
        """
        Returns the (truncated) number of rows per second.

        Returns 0 when the elapsed time is not positive, e.g. when the timers were never set or the clock could not
        measure the duration.

        :param int count: The number of rows.
        :param float seconds: The elapsed time in seconds.

        :rtype: int
        """
        if seconds <= 0.0:
            return 0

        return int(count / seconds)

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self):
        """
        Log statistics about the number of rows and number of rows per second.
        """
        rows_per_second_trans = self._rows_per_second(self._count_total, self._time1 - self._time0)
        rows_per_second_load = self._rows_per_second(self._count_transform, self._time2 - self._time1)
        rows_per_second_overall = self._rows_per_second(self._count_total, self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(rows_per_second_trans))
        self._log('Number of rows per second loaded    : {0:d}'.format(rows_per_second_load))
        self._log('Number of rows per second overall   : {0:d}'.format(rows_per_second_overall))

    # ------------------------------------------------------------------------------------------------------------------
    def pre_transform_source_rows(self):
        """
        This method will be called just before transforming the source rows.

        :rtype: None
        """
        pass

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self):
        """
        Transforms the rows for the source system into (partial) dimensional data.
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        self.pre_transform_source_rows()

        # Transform all source rows. The reader and writers are entered in this order and exited in reverse order.
        with self._source_reader, self._transformed_writer, self._parked_writer, self._ignored_writer:
            self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load parked and ignored rows into the parked and ignored rows.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()
# ----------------------------------------------------------------------------------------------------------------------
396