acquisition   F
last analyzed

Complexity

Total Complexity 78

Size/Duplication

Total Lines 418
Duplicated Lines 38.28 %

Importance

Changes 0
Metric Value
wmc 78
eloc 285
dl 160
loc 418
rs 2.16
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
F process() 160 396 78

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like acquisition often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import json
2
import mysql.connector
3
import time
4
import math
5
from datetime import datetime
6
import telnetlib
7
from modbus_tk import modbus_tcp
8
import config
9
from decimal import Decimal
10
from byte_swap import byte_swap_32_bit, byte_swap_64_bit
11
12
13
########################################################################################################################
14
# Acquisition Procedures
15
# Step 1: telnet hosts
16
# Step 2: Get point list
17
# Step 3: Read point values from Modbus slaves
18
# Step 4: Bulk insert point values and update latest values in historical database
19
########################################################################################################################
20
21
22
def process(logger, data_source_id, host, port):
23
24
    while True:
25
        # the outermost while loop
26
27
        ################################################################################################################
28
        # Step 1: telnet hosts
29
        ################################################################################################################
30
        try:
31
            telnetlib.Telnet(host, port, 10)
32
            print("Succeeded to telnet %s:%s in acquisition process ", host, port)
33
        except Exception as e:
34
            logger.error("Failed to telnet %s:%s in acquisition process: %s  ", host, port, str(e))
35
            time.sleep(300)
36
            continue
37
38
        ################################################################################################################
39
        # Step 2: Get point list
40
        ################################################################################################################
41
        cnx_system_db = None
42
        cursor_system_db = None
43
        try:
44
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
45
            cursor_system_db = cnx_system_db.cursor()
46
        except Exception as e:
47
            logger.error("Error in step 2.1 of acquisition process " + str(e))
48
            if cursor_system_db:
49
                cursor_system_db.close()
50
            if cnx_system_db:
51
                cnx_system_db.close()
52
            # sleep and then continue the outermost loop to reload points
53
            time.sleep(60)
54
            continue
55
56
        try:
57
            query = (" SELECT id, name, object_type, is_trend, ratio, address "
58
                     " FROM tbl_points "
59
                     " WHERE data_source_id = %s "
60
                     " ORDER BY id ")
61
            cursor_system_db.execute(query, (data_source_id, ))
62
            rows_point = cursor_system_db.fetchall()
63
        except Exception as e:
64
            logger.error("Error in step 2.2 of acquisition process: " + str(e))
65
            # sleep several minutes and continue the outer loop to reload points
66
            time.sleep(60)
67
            continue
68
        finally:
69
            if cursor_system_db:
70
                cursor_system_db.close()
71
            if cnx_system_db:
72
                cnx_system_db.close()
73
74
        if rows_point is None or len(rows_point) == 0:
0 ignored issues
show
introduced by
The variable rows_point does not seem to be defined for all execution paths.
Loading history...
75
            # there is no points for this data source
76
            logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
77
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
78
            time.sleep(60)
79
            continue
80
81
        # There are points for this data source
82
        point_list = list()
83
        for row_point in rows_point:
84
            point_list.append({"id": row_point[0],
85
                               "name": row_point[1],
86
                               "object_type": row_point[2],
87
                               "is_trend": row_point[3],
88
                               "ratio": row_point[4],
89
                               "address": row_point[5]})
90
91
        ################################################################################################################
92
        # Step 3: Read point values from Modbus slaves
93
        ################################################################################################################
94
        # connect to historical database
95
        cnx_historical_db = None
96
        cursor_historical_db = None
97
        try:
98
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
99
            cursor_historical_db = cnx_historical_db.cursor()
100
        except Exception as e:
101
            logger.error("Error in step 3.1 of acquisition process " + str(e))
102
            if cursor_historical_db:
103
                cursor_historical_db.close()
104
            if cnx_historical_db:
105
                cnx_historical_db.close()
106
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
107
            time.sleep(60)
108
            continue
109
110
        # connect to the Modbus data source
111
        master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
112
        master.set_timeout(5.0)
113
        print("Ready to connect to %s:%s ", host, port)
114
115
        # inner loop to read all point values within a configurable period
116
        while True:
117
            is_modbus_tcp_timed_out = False
118
            energy_value_list = list()
119
            analog_value_list = list()
120
            digital_value_list = list()
121
122
            # foreach point loop
123
            for point in point_list:
124
                try:
125
                    address = json.loads(point['address'])
126
                except Exception as e:
127
                    logger.error("Error in step 3.2 of acquisition process: \n"
128
                                 "Invalid point address in JSON " + str(e))
129
                    continue
130
131
                if 'slave_id' not in address.keys() \
132
                    or 'function_code' not in address.keys() \
133
                    or 'offset' not in address.keys() \
134
                    or 'number_of_registers' not in address.keys() \
135
                    or 'format' not in address.keys() \
136
                    or 'byte_swap' not in address.keys() \
137
                    or address['slave_id'] < 1 \
138
                    or address['function_code'] not in (1, 2, 3, 4) \
139
                    or address['offset'] < 0 \
140
                    or address['number_of_registers'] < 0 \
141
                    or len(address['format']) < 1 \
142
                        or not isinstance(address['byte_swap'], bool):
143
144
                    logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
145
                                 data_source_id, point['id'])
146
                    # invalid point is found, and go on the foreach point loop to process next point
147
                    continue
148
149
                # read register value for valid point
150
                try:
151
                    result = master.execute(slave=address['slave_id'],
152
                                            function_code=address['function_code'],
153
                                            starting_address=address['offset'],
154
                                            quantity_of_x=address['number_of_registers'],
155
                                            data_format=address['format'])
156
                except Exception as e:
157
                    logger.error(str(e) +
158
                                 " host:" + host + " port:" + str(port) +
159
                                 " slave_id:" + str(address['slave_id']) +
160
                                 " function_code:" + str(address['function_code']) +
161
                                 " starting_address:" + str(address['offset']) +
162
                                 " quantity_of_x:" + str(address['number_of_registers']) +
163
                                 " data_format:" + str(address['format']) +
164
                                 " byte_swap:" + str(address['byte_swap']))
165
166
                    if 'timed out' in str(e):
167
                        is_modbus_tcp_timed_out = True
168
                        # timeout error, break the foreach point loop
169
                        break
170
                    else:
171
                        # exception occurred when read register value, go on the foreach point loop
172
                        continue
173
174
                if result is None or not isinstance(result, tuple) or len(result) == 0:
175
                    # invalid result, and go on the foreach point loop to process next point
176
                    logger.error("Error in step 3.3 of acquisition process: \n"
177
                                 " invalid result: None "
178
                                 " for point_id: " + str(point['id']))
179
                    continue
180
181
                if not isinstance(result[0], float) and not isinstance(result[0], int) or math.isnan(result[0]):
182
                    # invalid result, and go on the foreach point loop to process next point
183
                    logger.error(" Error in step 3.4 of acquisition process:\n"
184
                                 " invalid result: not float and not int or not a number "
185
                                 " for point_id: " + str(point['id']))
186
                    continue
187
188
                if address['byte_swap']:
189
                    if address['number_of_registers'] == 2:
190
                        value = byte_swap_32_bit(result[0])
191
                    elif address['number_of_registers'] == 4:
192
                        value = byte_swap_64_bit(result[0])
193
                    else:
194
                        value = result[0]
195
                else:
196
                    value = result[0]
197
198
                if point['object_type'] == 'ANALOG_VALUE':
199
                    analog_value_list.append({'data_source_id': data_source_id,
200
                                              'point_id': point['id'],
201
                                              'is_trend': point['is_trend'],
202
                                              'value': Decimal(value) * point['ratio']})
203
                elif point['object_type'] == 'ENERGY_VALUE':
204
                    energy_value_list.append({'data_source_id': data_source_id,
205
                                              'point_id': point['id'],
206
                                              'is_trend': point['is_trend'],
207
                                              'value': Decimal(value) * point['ratio']})
208
                elif point['object_type'] == 'DIGITAL_VALUE':
209
                    digital_value_list.append({'data_source_id': data_source_id,
210
                                               'point_id': point['id'],
211
                                               'is_trend': point['is_trend'],
212
                                               'value': int(value) * int(point['ratio'])})
213
214
            # end of foreach point loop
215
216
            if is_modbus_tcp_timed_out:
217
                # Modbus TCP connection timeout error
218
219
                # destroy the Modbus master
220
                del master
221
222
                # close the connection to database
223
                if cursor_historical_db:
224
                    cursor_historical_db.close()
225
                if cnx_historical_db:
226
                    cnx_historical_db.close()
227
228
                # break the inner while loop to reconnect the Modbus device
229
                time.sleep(60)
230
                break
231
232
            ############################################################################################################
233
            # Step 4: Bulk insert point values and update latest values in historical database
234
            ############################################################################################################
235
            # check the connection to the Historical Database
236
            if not cnx_historical_db.is_connected():
237
                try:
238
                    cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
239
                    cursor_historical_db = cnx_historical_db.cursor()
240
                except Exception as e:
241
                    logger.error("Error in step 4.1 of acquisition process: " + str(e))
242
                    if cursor_historical_db:
243
                        cursor_historical_db.close()
244
                    if cnx_historical_db:
245
                        cnx_historical_db.close()
246
                    # sleep some seconds
247
                    time.sleep(60)
248
                    continue
249
250
            current_datetime_utc = datetime.utcnow()
251
            # bulk insert values into historical database within a period
252
            # update latest values in the meanwhile
253 View Code Duplication
            if len(analog_value_list) > 0:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
254
                add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
255
                              " VALUES  ")
256
                trend_value_count = 0
257
258
                for point_value in analog_value_list:
259
                    if point_value['is_trend']:
260
                        add_values += " (" + str(point_value['point_id']) + ","
261
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
262
                        add_values += str(point_value['value']) + "), "
263
                        trend_value_count += 1
264
265
                if trend_value_count > 0:
266
                    try:
267
                        # trim ", " at the end of string and then execute
268
                        cursor_historical_db.execute(add_values[:-2])
269
                        cnx_historical_db.commit()
270
                    except Exception as e:
271
                        logger.error("Error in step 4.2.1 of acquisition process " + str(e))
272
                        # ignore this exception
273
                        pass
274
275
                # update tbl_analog_value_latest
276
                delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
277
                latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
278
                                 " VALUES  ")
279
                latest_value_count = 0
280
281
                for point_value in analog_value_list:
282
                    delete_values += str(point_value['point_id']) + ","
283
                    latest_values += " (" + str(point_value['point_id']) + ","
284
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
285
                    latest_values += str(point_value['value']) + "), "
286
                    latest_value_count += 1
287
288
                if latest_value_count > 0:
289
                    try:
290
                        # replace "," at the end of string with ")"
291
                        cursor_historical_db.execute(delete_values[:-1] + ")")
292
                        cnx_historical_db.commit()
293
                    except Exception as e:
294
                        logger.error("Error in step 4.2.2 of acquisition process " + str(e))
295
                        # ignore this exception
296
                        pass
297
298
                    try:
299
                        # trim ", " at the end of string and then execute
300
                        cursor_historical_db.execute(latest_values[:-2])
301
                        cnx_historical_db.commit()
302
                    except Exception as e:
303
                        logger.error("Error in step 4.2.3 of acquisition process " + str(e))
304
                        # ignore this exception
305
                        pass
306
307 View Code Duplication
            if len(energy_value_list) > 0:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
308
                add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
309
                              " VALUES  ")
310
                trend_value_count = 0
311
312
                for point_value in energy_value_list:
313
                    if point_value['is_trend']:
314
                        add_values += " (" + str(point_value['point_id']) + ","
315
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
316
                        add_values += str(point_value['value']) + "), "
317
                        trend_value_count += 1
318
319
                if trend_value_count > 0:
320
                    try:
321
                        # trim ", " at the end of string and then execute
322
                        cursor_historical_db.execute(add_values[:-2])
323
                        cnx_historical_db.commit()
324
                    except Exception as e:
325
                        logger.error("Error in step 4.3.1 of acquisition process: " + str(e))
326
                        # ignore this exception
327
                        pass
328
329
                # update tbl_energy_value_latest
330
                delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
331
                latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
332
                                 " VALUES  ")
333
334
                latest_value_count = 0
335
                for point_value in energy_value_list:
336
                    delete_values += str(point_value['point_id']) + ","
337
                    latest_values += " (" + str(point_value['point_id']) + ","
338
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
339
                    latest_values += str(point_value['value']) + "), "
340
                    latest_value_count += 1
341
342
                if latest_value_count > 0:
343
                    try:
344
                        # replace "," at the end of string with ")"
345
                        cursor_historical_db.execute(delete_values[:-1] + ")")
346
                        cnx_historical_db.commit()
347
348
                    except Exception as e:
349
                        logger.error("Error in step 4.3.2 of acquisition process " + str(e))
350
                        # ignore this exception
351
                        pass
352
353
                    try:
354
                        # trim ", " at the end of string and then execute
355
                        cursor_historical_db.execute(latest_values[:-2])
356
                        cnx_historical_db.commit()
357
358
                    except Exception as e:
359
                        logger.error("Error in step 4.3.3 of acquisition process " + str(e))
360
                        # ignore this exception
361
                        pass
362
363 View Code Duplication
            if len(digital_value_list) > 0:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
364
                add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
365
                              " VALUES  ")
366
                trend_value_count = 0
367
368
                for point_value in digital_value_list:
369
                    if point_value['is_trend']:
370
                        add_values += " (" + str(point_value['point_id']) + ","
371
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
372
                        add_values += str(point_value['value']) + "), "
373
                        trend_value_count += 1
374
375
                if trend_value_count > 0:
376
                    try:
377
                        # trim ", " at the end of string and then execute
378
                        cursor_historical_db.execute(add_values[:-2])
379
                        cnx_historical_db.commit()
380
                    except Exception as e:
381
                        logger.error("Error in step 4.4.1 of acquisition process: " + str(e))
382
                        # ignore this exception
383
                        pass
384
385
                # update tbl_digital_value_latest
386
                delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
387
                latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
388
                                 " VALUES  ")
389
                latest_value_count = 0
390
                for point_value in digital_value_list:
391
                    delete_values += str(point_value['point_id']) + ","
392
                    latest_values += " (" + str(point_value['point_id']) + ","
393
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
394
                    latest_values += str(point_value['value']) + "), "
395
                    latest_value_count += 1
396
397
                if latest_value_count > 0:
398
                    try:
399
                        # replace "," at the end of string with ")"
400
                        cursor_historical_db.execute(delete_values[:-1] + ")")
401
                        cnx_historical_db.commit()
402
                    except Exception as e:
403
                        logger.error("Error in step 4.4.2 of acquisition process " + str(e))
404
                        # ignore this exception
405
                        pass
406
407
                    try:
408
                        # trim ", " at the end of string and then execute
409
                        cursor_historical_db.execute(latest_values[:-2])
410
                        cnx_historical_db.commit()
411
                    except Exception as e:
412
                        logger.error("Error in step 4.4.3 of acquisition process " + str(e))
413
                        # ignore this exception
414
                        pass
415
416
            # sleep some seconds
417
            time.sleep(config.interval_in_seconds)
418
419
        # end of inner while loop
420
421
    # end of outermost while loop
422