Passed
Push — master ( 34ec8c...79490f )
by Guangyu
19:00 queued 11s
created

acquisition   F

Complexity

Total Complexity 81

Size/Duplication

Total Lines 443
Duplicated Lines 36.12 %

Importance

Changes 0
Metric Value
wmc 81
eloc 301
dl 160
loc 443
rs 2
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
F process() 160 421 81


Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule is to restructure code once it is duplicated in three or more places.
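As an illustration for this file, the three flagged blocks in `process()` (analog, energy, digital) differ only in the table name, so they could collapse into one helper. The following is a minimal sketch, not the project's code: `store_values` and its parameters are hypothetical names, it assumes a DB-API cursor with `executemany` (which `mysql.connector` cursors provide), and it uses parameterized statements instead of the string concatenation in the listing. Unlike the listing, it does not commit or swallow exceptions; that is left to the caller.

```python
def store_values(cursor, kind, point_values, utc_now):
    """Insert trend rows and refresh the latest-value table for one value kind.

    `kind` selects tbl_<kind>_value and tbl_<kind>_value_latest, the table
    names that appear in the listing. Committing is left to the caller.
    """
    if kind not in ("analog", "energy", "digital"):
        raise ValueError("unknown value kind: " + repr(kind))
    if not point_values:
        return

    timestamp = utc_now.isoformat()
    insert_sql = ("INSERT INTO tbl_{}_value{} "
                  "(point_id, utc_date_time, actual_value) "
                  "VALUES (%s, %s, %s)")

    # Only points flagged as trend go into the history table.
    trend_rows = [(v["point_id"], timestamp, v["value"])
                  for v in point_values if v["is_trend"]]
    if trend_rows:
        cursor.executemany(insert_sql.format(kind, ""), trend_rows)

    # Every point replaces its row in the latest-value table.
    latest_rows = [(v["point_id"], timestamp, v["value"]) for v in point_values]
    cursor.executemany(
        "DELETE FROM tbl_{}_value_latest WHERE point_id = %s".format(kind),
        [(v["point_id"],) for v in point_values])
    cursor.executemany(insert_sql.format(kind, "_latest"), latest_rows)
```

With such a helper, each of the three blocks would shrink to a single call, for example `store_values(cursor_historical_db, "analog", analog_value_list, current_datetime_utc)`, followed by a commit and whatever error handling the caller wants to keep.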

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like acquisition often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
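In this file, the `address` keys (`slave_id`, `function_code`, `offset`, `number_of_registers`, `format`, `byte_swap`) always travel together, which makes them a candidate for Extract Class. The sketch below is one possible shape, not the project's design: `ModbusAddress`, `from_json` and the `data_format` field name are hypothetical. It applies the same range checks as the long conditional in `process()` and reports every failure as `ValueError`.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ModbusAddress:
    """Validated Modbus address of one point (hypothetical extracted class)."""
    slave_id: int
    function_code: int
    offset: int
    number_of_registers: int
    data_format: str
    byte_swap: bool

    @classmethod
    def from_json(cls, text):
        """Parse a point's `address` column; raise ValueError if it is invalid."""
        raw = json.loads(text)  # json.JSONDecodeError is a ValueError subclass
        keys = ("slave_id", "function_code", "offset",
                "number_of_registers", "format", "byte_swap")
        if not isinstance(raw, dict) or any(key not in raw for key in keys):
            raise ValueError("address must be a JSON object with keys: "
                             + ", ".join(keys))
        try:
            # Same range checks as the conditional in the listing.
            valid = (raw["slave_id"] >= 1
                     and raw["function_code"] in (1, 2, 3, 4)
                     and raw["offset"] >= 0
                     and raw["number_of_registers"] >= 0
                     and len(raw["format"]) >= 1
                     and isinstance(raw["byte_swap"], bool))
        except TypeError:
            # A field has the wrong type, e.g. a string where a number is expected.
            valid = False
        if not valid:
            raise ValueError("invalid address data")
        return cls(raw["slave_id"], raw["function_code"], raw["offset"],
                   raw["number_of_registers"], raw["format"], raw["byte_swap"])
```

The point loop could then call `ModbusAddress.from_json(point['address'])` inside a single `try`/`except ValueError`, replacing both the JSON parsing block and the twelve-clause `if`.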

import json
import mysql.connector
import time
import math
from datetime import datetime
import telnetlib
from modbus_tk import modbus_tcp
import config
from decimal import Decimal
from byte_swap import byte_swap_32_bit, byte_swap_64_bit


########################################################################################################################
# Acquisition Procedures
# Step 1: telnet the host
# Step 2: Get point list
# Step 3: Read point values from Modbus slaves
# Step 4: Bulk insert point values and update latest values in historical database
########################################################################################################################


def process(logger, data_source_id, host, port):

    while True:
        # the outermost while loop

        ################################################################################################################
        # Step 1: telnet the host
        ################################################################################################################
        try:
            telnetlib.Telnet(host, port, 10)
            print("Succeeded to telnet %s:%s in acquisition process " % (host, port))
        except Exception as e:
            logger.error("Failed to telnet %s:%s in acquisition process: %s  ", host, port, str(e))
            time.sleep(300)
            continue

        ################################################################################################################
        # Step 2: Get point list
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 2.1 of acquisition process " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and then continue the outermost loop to reload points
            time.sleep(60)
            continue

        try:
            query = (" SELECT id, name, object_type, is_trend, ratio, address "
                     " FROM tbl_points "
                     " WHERE data_source_id = %s AND is_virtual = FALSE "
                     " ORDER BY id ")
            cursor_system_db.execute(query, (data_source_id, ))
            rows_point = cursor_system_db.fetchall()
        except Exception as e:
            logger.error("Error in step 2.2 of acquisition process: " + str(e))
            # sleep 60 seconds and continue the outer loop to reload points
            time.sleep(60)
            continue

        if rows_point is None or len(rows_point) == 0:
            # there are no points for this data source
            logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
            # sleep 60 seconds and go back to the beginning of the outermost while loop to reload points
            time.sleep(60)
            continue

        # There are points for this data source
        point_list = list()
        for row_point in rows_point:
            point_list.append({"id": row_point[0],
                               "name": row_point[1],
                               "object_type": row_point[2],
                               "is_trend": row_point[3],
                               "ratio": row_point[4],
                               "address": row_point[5]})

        ################################################################################################################
        # Step 3: Read point values from Modbus slaves
        ################################################################################################################
        # connect to historical database
        cnx_historical_db = None
        cursor_historical_db = None
        try:
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
            cursor_historical_db = cnx_historical_db.cursor()
        except Exception as e:
            logger.error("Error in step 3.1 of acquisition process " + str(e))
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            # sleep 60 seconds and go back to the beginning of the outermost while loop to reload points
            time.sleep(60)
            continue

        # connect to the Modbus data source
        master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
        master.set_timeout(5.0)
        print("Ready to connect to %s:%s " % (host, port))

        # inner loop to read all point values within a configurable period
        while True:
            is_modbus_tcp_timed_out = False
            energy_value_list = list()
            analog_value_list = list()
            digital_value_list = list()

            # foreach point loop
            for point in point_list:
                try:
                    address = json.loads(point['address'])
                except Exception as e:
                    logger.error("Error in step 3.2 of acquisition process: \n"
                                 "Invalid point address in JSON " + str(e))
                    continue

                if 'slave_id' not in address.keys() \
                    or 'function_code' not in address.keys() \
                    or 'offset' not in address.keys() \
                    or 'number_of_registers' not in address.keys() \
                    or 'format' not in address.keys() \
                    or 'byte_swap' not in address.keys() \
                    or address['slave_id'] < 1 \
                    or address['function_code'] not in (1, 2, 3, 4) \
                    or address['offset'] < 0 \
                    or address['number_of_registers'] < 0 \
                    or len(address['format']) < 1 \
                        or not isinstance(address['byte_swap'], bool):

                    logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
                                 data_source_id, point['id'])
                    # invalid point is found,
                    # and go on the foreach point loop to process next point
                    continue

                # read register value for valid point
                try:
                    result = master.execute(slave=address['slave_id'],
                                            function_code=address['function_code'],
                                            starting_address=address['offset'],
                                            quantity_of_x=address['number_of_registers'],
                                            data_format=address['format'])
                except Exception as e:
                    logger.error(str(e) +
                                 " host:" + host + " port:" + str(port) +
                                 " slave_id:" + str(address['slave_id']) +
                                 " function_code:" + str(address['function_code']) +
                                 " starting_address:" + str(address['offset']) +
                                 " quantity_of_x:" + str(address['number_of_registers']) +
                                 " data_format:" + str(address['format']) +
                                 " byte_swap:" + str(address['byte_swap']))

                    if 'timed out' in str(e):
                        is_modbus_tcp_timed_out = True
                        # timeout error, break the foreach point loop
                        break
                    else:
                        # exception occurred when read register value,
                        # and go on the foreach point loop to process next point
                        continue

                if result is None or not isinstance(result, tuple) or len(result) == 0:
                    # invalid result,
                    # and go on the foreach point loop to process next point
                    logger.error("Error in step 3.3 of acquisition process: \n"
                                 " invalid result: None "
                                 " for point_id: " + str(point['id']))
                    continue

                if not isinstance(result[0], float) and not isinstance(result[0], int) or math.isnan(result[0]):
                    # invalid result, and go on the foreach point loop to process next point
                    logger.error(" Error in step 3.4 of acquisition process:\n"
                                 " invalid result: not float and not int or not a number "
                                 " for point_id: " + str(point['id']))
                    continue

                if address['byte_swap']:
                    if address['number_of_registers'] == 2:
                        value = byte_swap_32_bit(result[0])
                    elif address['number_of_registers'] == 4:
                        value = byte_swap_64_bit(result[0])
                    else:
                        value = result[0]
                else:
                    value = result[0]

                if point['object_type'] == 'ANALOG_VALUE':
                    analog_value_list.append({'data_source_id': data_source_id,
                                              'point_id': point['id'],
                                              'is_trend': point['is_trend'],
                                              'value': Decimal(value) * point['ratio']})
                elif point['object_type'] == 'ENERGY_VALUE':
                    energy_value_list.append({'data_source_id': data_source_id,
                                              'point_id': point['id'],
                                              'is_trend': point['is_trend'],
                                              'value': Decimal(value) * point['ratio']})
                elif point['object_type'] == 'DIGITAL_VALUE':
                    digital_value_list.append({'data_source_id': data_source_id,
                                               'point_id': point['id'],
                                               'is_trend': point['is_trend'],
                                               'value': int(value) * int(point['ratio'])})

            # end of foreach point loop

            if is_modbus_tcp_timed_out:
                # Modbus TCP connection timeout error

                # destroy the Modbus master
                del master

                # close the connection to database
                if cursor_historical_db:
                    cursor_historical_db.close()
                if cnx_historical_db:
                    cnx_historical_db.close()

                # break the inner while loop to reconnect the Modbus device
                time.sleep(60)
                break

            ############################################################################################################
            # Step 4: Bulk insert point values and update latest values in historical database
            ############################################################################################################
            # check the connection to the Historical Database
            if not cnx_historical_db.is_connected():
                try:
                    cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
                    cursor_historical_db = cnx_historical_db.cursor()
                except Exception as e:
                    logger.error("Error in step 4.1 of acquisition process: " + str(e))
                    if cursor_historical_db:
                        cursor_historical_db.close()
                    if cnx_historical_db:
                        cnx_historical_db.close()
                    # sleep some seconds
                    time.sleep(60)
                    continue

            # check the connection to the System Database
            if not cnx_system_db.is_connected():
                try:
                    cnx_system_db = mysql.connector.connect(**config.myems_system_db)
                    cursor_system_db = cnx_system_db.cursor()
                except Exception as e:
                    logger.error("Error in step 4.2 of acquisition process: " + str(e))
                    if cursor_system_db:
                        cursor_system_db.close()
                    if cnx_system_db:
                        cnx_system_db.close()
                    # sleep some seconds
                    time.sleep(60)
                    continue

            current_datetime_utc = datetime.utcnow()
            # bulk insert values into historical database within a period
            # update latest values in the meanwhile
            # Duplication: this code seems to be duplicated in your project.
            if len(analog_value_list) > 0:
                add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
                              " VALUES  ")
                trend_value_count = 0

                for point_value in analog_value_list:
                    if point_value['is_trend']:
                        add_values += " (" + str(point_value['point_id']) + ","
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
                        add_values += str(point_value['value']) + "), "
                        trend_value_count += 1

                if trend_value_count > 0:
                    try:
                        # trim ", " at the end of string and then execute
                        cursor_historical_db.execute(add_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.3.1 of acquisition process " + str(e))
                        # ignore this exception
                        pass

                # update tbl_analog_value_latest
                delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
                latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
                                 " VALUES  ")
                latest_value_count = 0

                for point_value in analog_value_list:
                    delete_values += str(point_value['point_id']) + ","
                    latest_values += " (" + str(point_value['point_id']) + ","
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
                    latest_values += str(point_value['value']) + "), "
                    latest_value_count += 1

                if latest_value_count > 0:
                    try:
                        # replace "," at the end of string with ")"
                        cursor_historical_db.execute(delete_values[:-1] + ")")
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.3.2 of acquisition process " + str(e))
                        # ignore this exception
                        pass

                    try:
                        # trim ", " at the end of string and then execute
                        cursor_historical_db.execute(latest_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.3.3 of acquisition process " + str(e))
                        # ignore this exception
                        pass

            # Duplication: this code seems to be duplicated in your project.
            if len(energy_value_list) > 0:
                add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
                              " VALUES  ")
                trend_value_count = 0

                for point_value in energy_value_list:
                    if point_value['is_trend']:
                        add_values += " (" + str(point_value['point_id']) + ","
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
                        add_values += str(point_value['value']) + "), "
                        trend_value_count += 1

                if trend_value_count > 0:
                    try:
                        # trim ", " at the end of string and then execute
                        cursor_historical_db.execute(add_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.4.1 of acquisition process: " + str(e))
                        # ignore this exception
                        pass

                # update tbl_energy_value_latest
                delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
                latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
                                 " VALUES  ")

                latest_value_count = 0
                for point_value in energy_value_list:
                    delete_values += str(point_value['point_id']) + ","
                    latest_values += " (" + str(point_value['point_id']) + ","
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
                    latest_values += str(point_value['value']) + "), "
                    latest_value_count += 1

                if latest_value_count > 0:
                    try:
                        # replace "," at the end of string with ")"
                        cursor_historical_db.execute(delete_values[:-1] + ")")
                        cnx_historical_db.commit()

                    except Exception as e:
                        logger.error("Error in step 4.4.2 of acquisition process " + str(e))
                        # ignore this exception
                        pass

                    try:
                        # trim ", " at the end of string and then execute
                        cursor_historical_db.execute(latest_values[:-2])
                        cnx_historical_db.commit()

                    except Exception as e:
                        logger.error("Error in step 4.4.3 of acquisition process " + str(e))
                        # ignore this exception
                        pass

            # Duplication: this code seems to be duplicated in your project.
            if len(digital_value_list) > 0:
                add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
                              " VALUES  ")
                trend_value_count = 0

                for point_value in digital_value_list:
                    if point_value['is_trend']:
                        add_values += " (" + str(point_value['point_id']) + ","
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
                        add_values += str(point_value['value']) + "), "
                        trend_value_count += 1

                if trend_value_count > 0:
                    try:
                        # trim ", " at the end of string and then execute
                        cursor_historical_db.execute(add_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.5.1 of acquisition process: " + str(e))
                        # ignore this exception
                        pass

                # update tbl_digital_value_latest
                delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
                latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
                                 " VALUES  ")
                latest_value_count = 0
                for point_value in digital_value_list:
                    delete_values += str(point_value['point_id']) + ","
                    latest_values += " (" + str(point_value['point_id']) + ","
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
                    latest_values += str(point_value['value']) + "), "
                    latest_value_count += 1

                if latest_value_count > 0:
                    try:
                        # replace "," at the end of string with ")"
                        cursor_historical_db.execute(delete_values[:-1] + ")")
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.5.2 of acquisition process " + str(e))
                        # ignore this exception
                        pass

                    try:
                        # trim ", " at the end of string and then execute
                        cursor_historical_db.execute(latest_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 4.5.3 of acquisition process " + str(e))
                        # ignore this exception
                        pass

            # update data source last seen datetime
            update_row = (" UPDATE tbl_data_sources "
                          " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' "
                          " WHERE id = %s ")
            try:
                cursor_system_db.execute(update_row, (data_source_id, ))
                cnx_system_db.commit()
            except Exception as e:
                logger.error("Error in step 4.6 of acquisition process " + str(e))
                # ignore this exception
                pass

            # sleep and continue the next iteration of the inner while loop
            time.sleep(config.interval_in_seconds)

        # end of inner while loop

    # end of outermost while loop