Completed
Push — master ( 36b944...d294be )
by P.R.
01:12
created

Transformer._load_transformed_rows()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 8
rs 9.4285
"""
ETLT

Copyright 2016 Set Based IT Consultancy

Licence MIT
"""
8
import abc
9
import time
10
import traceback
11
12
13
class Transformer:
    """
    Abstract parent class for transforming source data in (partial) dimensional data.

    Child classes implement _transform_row() and the three _load_*_rows() methods. The public entry point is
    transform_source_rows(), which transforms all source rows, loads the results and logs statistics.
    """

    # NOTE(review): this class does not use abc.ABCMeta as metaclass, hence @abc.abstractmethod is not enforced at
    # instantiation time. The abstract methods raise NotImplementedError when they are called instead.

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, source_reader, transformed_writer, parked_writer, ignored_writer):
        """
        Object constructor.

        :param Reader source_reader: Object for reading source rows.
        :param Writer transformed_writer: Object for writing successfully transformed rows.
        :param Writer parked_writer: Object for writing parked rows.
        :param Writer ignored_writer: Object for writing ignored rows.
        """

        self._count_total = 0
        """
        The number of rows processed.

        :type: int
        """

        self._count_transform = 0
        """
        The number of rows successfully transformed.

        :type: int
        """

        self._count_park = 0
        """
        The number of rows parked.

        :type: int
        """

        self._count_error = 0
        """
        The number of rows processed with errors.

        :type: int
        """

        self._count_ignore = 0
        """
        The number of rows ignored. This class only reports this counter; it is never incremented here.

        :type: int
        """

        self._time0 = 0.0
        """
        Start time of the whole process and start of transforming rows.

        :type: float
        """

        self._time1 = 0.0
        """
        End time of transforming rows and start time of the loading transformed rows.

        :type: float
        """

        self._time2 = 0.0
        """
        End time of the loading transformed rows and start time of loading parked rows.

        :type: float
        """

        self._time3 = 0.0
        """
        End time of the loading parked rows and end time of the whole process.

        :type: float
        """

        self._source_reader = source_reader
        """
        Object for reading source rows.

        :type: etlt.Reader.Reader
        """

        self._transformed_writer = transformed_writer
        """
        Object for writing successfully transformed rows.

        :type: etlt.Writer.Writer
        """

        self._parked_writer = parked_writer
        """
        Object for writing parked rows.

        :type: etlt.Writer.Writer
        """

        # The attribute name (without the trailing 'r') is kept as is: child classes may rely on it.
        self._ignored_write = ignored_writer
        """
        Object for writing ignored rows.

        :type: etlt.Writer.Writer
        """

        # The dimension keys in the output row. The first len(keys) fields of a transformed row are the dimension
        # keys, in this order.
        self.keys = ()

    # ------------------------------------------------------------------------------------------------------------------
    def _log(self, message):
        """
        Logs a message, prefixed with the current local date and time, to standard output.

        :param str message: The log message. Any other object is converted with str().

        :rtype None:
        """
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + ' ' + str(message))

    # ------------------------------------------------------------------------------------------------------------------
    def _test_dimension_keys(self, output):
        """
        Tests whether all dimension keys are found. Returns a tuple with a bool indicating the row must be parked and
        the names (separated by a space) of the missing dimension keys.

        :param list output: The transformed row. Must have at least len(self.keys) fields.

        :rtype: (bool, str)
        """
        # Index output by position (rather than zip) such that a too short output row still raises an IndexError.
        missing = [key for index, key in enumerate(self.keys) if output[index] is None]

        return bool(missing), ' '.join(missing)

    # ------------------------------------------------------------------------------------------------------------------
    def _log_exception(self, row, exception):
        """
        Logs an exception occurred during transformation of a row. Must be called from within an except block since
        the current traceback is logged too.

        :param list|dict|() row: The source row.
        :param Exception exception: The exception.
        """
        self._log('Error during processing of line {0:d}.'.format(self._source_reader.row_number))
        self._log(row)
        self._log(str(exception))
        self._log(traceback.format_exc())

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_row_wrapper(self, row):
        """
        Transforms a single source row. The row is written to the transformed writer when all dimension keys are
        found; otherwise (missing keys or an exception during the transformation) the row is parked.

        :param list|dict|() row: The source row.
        """
        self._count_total += 1

        try:
            # Transform the naturals keys in line to technical keys.
            output = self._transform_row(row)

            # Test all dimension keys are found.
            must_park, park_info = self._test_dimension_keys(output)

        except Exception as e:
            # Deliberately broad: a failure of a single row must not abort the whole run.
            # Log the exception.
            self._log_exception(row, e)
            # Keep track of the number of errors.
            self._count_error += 1
            # This row must be parked.
            must_park = True
            park_info = 'Exception'
            # Keep our IDE happy.
            output = []

        if must_park:
            # Resolve the name of the source: call get_source_name when it is a method, use it as is when it is a
            # property or plain attribute.
            source_name = self._source_reader.get_source_name
            if callable(source_name):
                source_name = source_name()

            # The source row may be a list, a tuple or a dict; the parked row is always a list.
            # NOTE(review): for a dict the values are parked in iteration order -- confirm this is the desired layout.
            if isinstance(row, dict):
                row_values = list(row.values())
            else:
                row_values = list(row)

            # Park the row.
            parked_row = [park_info, source_name, self._source_reader.row_number] + row_values
            self._parked_writer.put_row(parked_row)
            self._count_park += 1
        else:
            # Write the technical keys and measures to the output file.
            self._transformed_writer.put_row(output)
            self._count_transform += 1

    # ------------------------------------------------------------------------------------------------------------------
    def _transform_rows(self):
        """
        Transforms all source rows. Reading stops at the first falsy value (e.g. None or an empty row) returned by
        the source reader.
        """
        row = self._source_reader.get_row()
        while row:
            self._transform_row_wrapper(row)
            row = self._source_reader.get_row()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _transform_row(self, row):
        """
        Transforms the natural keys of dimensions to technical keys. Returns the transformed row with technical keys.

        :param list|dict|() row: The source row.

        :rtype list:
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_ignored_rows(self):
        """
        Loads the ignored rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_parked_rows(self):
        """
        Loads the parked rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def _load_transformed_rows(self):
        """
        Loads the successfully transformed rows into the database.

        :rtype: None
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _rows_per_second(count, seconds):
        """
        Returns the number of rows per second rounded down to an integer, or 0 when no time has elapsed.

        :param int count: The number of rows.
        :param float seconds: The elapsed time in seconds.

        :rtype: int
        """
        if seconds > 0:
            return int(count / seconds)

        return 0

    # ------------------------------------------------------------------------------------------------------------------
    def _log_statistics(self):
        """
        Log statistics about the number of rows and number of rows per second. A rate over an interval in which no
        time has elapsed is reported as 0 (instead of raising a ZeroDivisionError).
        """
        rows_per_second_trans = self._rows_per_second(self._count_total, self._time1 - self._time0)
        rows_per_second_load = self._rows_per_second(self._count_transform, self._time2 - self._time1)
        rows_per_second_overall = self._rows_per_second(self._count_total, self._time3 - self._time0)

        self._log('Number of rows processed            : {0:d}'.format(self._count_total))
        self._log('Number of rows transformed          : {0:d}'.format(self._count_transform))
        self._log('Number of rows ignored              : {0:d}'.format(self._count_ignore))
        self._log('Number of rows parked               : {0:d}'.format(self._count_park))
        self._log('Number of errors                    : {0:d}'.format(self._count_error))
        self._log('Number of rows per second processed : {0:d}'.format(rows_per_second_trans))
        self._log('Number of rows per second loaded    : {0:d}'.format(rows_per_second_load))
        self._log('Number of rows per second overall   : {0:d}'.format(rows_per_second_overall))

    # ------------------------------------------------------------------------------------------------------------------
    def transform_source_rows(self):
        """
        Transforms the rows for the source system into (partial) dimensional data.

        The reader and the writers are used as context managers during the transformation only; the loading steps
        run after they have been exited.
        """
        # Start timer for overall progress.
        self._time0 = time.perf_counter()

        # Transform all source rows.
        with self._source_reader, self._transformed_writer, self._parked_writer, self._ignored_write:
            self._transform_rows()

        # Time end of transformation.
        self._time1 = time.perf_counter()

        # Load transformed rows into the fact table.
        self._load_transformed_rows()

        # Time end of loading transformed rows.
        self._time2 = time.perf_counter()

        # Load parked and ignored rows into the parked and ignored rows.
        self._load_ignored_rows()
        self._load_parked_rows()

        # Time end of loading parked and ignored rows.
        self._time3 = time.perf_counter()

        # Show statistics about number of rows and performance.
        self._log_statistics()
310
311
# ----------------------------------------------------------------------------------------------------------------------
312