Issues (1588)

myems-modbus-tcp/acquisition.py (3 issues)

1
import os
2
import json
3
import math
4
import telnetlib3
5
import asyncio
6
import time
7
from datetime import datetime
8
from decimal import Decimal
9
import mysql.connector
10
from modbus_tk import modbus_tcp
11
import config
12
from byte_swap import byte_swap_32_bit, byte_swap_64_bit
13
14
15
########################################################################################################################
16
# Check connectivity to the host and port
17
########################################################################################################################
18
async def check_connectivity(host, port):
    """Probe host:port by opening a telnetlib3 connection and closing it right away.

    Nothing is returned. Any exception raised while opening the connection
    propagates to the caller, which is how a failed probe is reported.
    """
    streams = await telnetlib3.open_connection(host, port)
    # open_connection yields a (reader, writer) pair; only the writer is needed to hang up
    _, writer = streams
    writer.close()
22
23
########################################################################################################################
24
# Acquisition Procedures
25
# Step 1: Update process id in database
26
# Step 2: Check connectivity to the host and port
27
# Step 3: Get point list
28
# Step 4: Read point values from Modbus slaves
29
# Step 5: Bulk insert point values and update latest values in historical database
30
########################################################################################################################
31
32
33
# Inclusive bounds of a DECIMAL(18, 3) column. They are built from strings on purpose:
# the float literal 999999999999999.999 is not exactly representable and rounds up to 1e15,
# which would let values just outside the column range slip through the check.
_DECIMAL_18_3_MIN = Decimal('-999999999999999.999')
_DECIMAL_18_3_MAX = Decimal('999999999999999.999')

# Maximum number of rows written by a single INSERT / DELETE statement
_BULK_SIZE = 100

# Keys every point address must provide
_ADDRESS_KEYS = ('slave_id', 'function_code', 'offset', 'number_of_registers', 'format', 'byte_swap')


def _close_db(cursor, cnx):
    """Close a cursor and its connection, skipping whichever is falsy (e.g. None)."""
    if cursor:
        cursor.close()
    if cnx:
        cnx.close()


def _is_valid_address(address):
    """Return True when a decoded point address has every required field with a usable value.

    Anything that is not a dict, or whose fields cannot be compared or measured
    (TypeError), is reported as invalid instead of raising.
    """
    if not isinstance(address, dict):
        return False
    if any(key not in address for key in _ADDRESS_KEYS):
        return False
    try:
        invalid = (address['slave_id'] < 1
                   or address['function_code'] not in (1, 2, 3, 4)
                   or address['offset'] < 0
                   or address['number_of_registers'] < 0
                   or len(address['format']) < 1
                   or not isinstance(address['byte_swap'], bool))
    except TypeError:
        return False
    return not invalid


def _read_point_values(logger, master, data_source_id, host, port, point_list):
    """Read every point once from the Modbus master.

    Returns a tuple (timed_out, analog_value_list, energy_value_list, digital_value_list).
    Reading stops at the first error whose text contains 'timed out'; in that case
    timed_out is True and the lists hold only what was read before the timeout.
    Points with a bad address, a failed read or an unusable result are logged and skipped.
    """
    analog_value_list = []
    energy_value_list = []
    digital_value_list = []

    for point in point_list:
        try:
            address = json.loads(point['address'])
        except Exception as e:
            logger.error("Error in step 4.2 of acquisition process: Invalid point address in JSON " + str(e))
            continue

        if not _is_valid_address(address):
            logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
                         data_source_id, point['id'])
            continue

        # read point value
        try:
            result = master.execute(slave=address['slave_id'],
                                    function_code=address['function_code'],
                                    starting_address=address['offset'],
                                    quantity_of_x=address['number_of_registers'],
                                    data_format=address['format'])
        except Exception as e:
            logger.error(str(e) +
                         " host:" + host + " port:" + str(port) +
                         " slave_id:" + str(address['slave_id']) +
                         " function_code:" + str(address['function_code']) +
                         " starting_address:" + str(address['offset']) +
                         " quantity_of_x:" + str(address['number_of_registers']) +
                         " data_format:" + str(address['format']) +
                         " byte_swap:" + str(address['byte_swap']))
            if 'timed out' in str(e):
                # the connection is considered lost: give up on the remaining points
                return True, analog_value_list, energy_value_list, digital_value_list
            # any other read error only affects this point
            continue

        if result is None or not isinstance(result, tuple) or len(result) == 0:
            logger.error("Error in step 4.3 of acquisition process: \n"
                         " invalid result: None "
                         " for point_id: " + str(point['id']))
            continue

        if not isinstance(result[0], (float, int)) or math.isnan(result[0]):
            logger.error(" Error in step 4.4 of acquisition process:\n"
                         " invalid result: not float and not int or not a number "
                         " for point_id: " + str(point['id']))
            continue

        # byte swapping is only defined for 2-register (32-bit) and 4-register (64-bit) reads
        value = result[0]
        if address['byte_swap']:
            if address['number_of_registers'] == 2:
                value = byte_swap_32_bit(result[0])
            elif address['number_of_registers'] == 4:
                value = byte_swap_64_bit(result[0])

        object_type = point['object_type']
        if object_type in ('ANALOG_VALUE', 'ENERGY_VALUE'):
            # values outside the DECIMAL(18, 3) range are dropped silently
            if _DECIMAL_18_3_MIN <= Decimal(value) <= _DECIMAL_18_3_MAX:
                target_list = analog_value_list if object_type == 'ANALOG_VALUE' else energy_value_list
                target_list.append({'point_id': point['id'],
                                    'is_trend': point['is_trend'],
                                    'value': Decimal(value) * point['ratio'] + point['offset_constant']})
        elif object_type == 'DIGITAL_VALUE':
            digital_value_list.append({'point_id': point['id'],
                                       'is_trend': point['is_trend'],
                                       'value': int(value) * int(point['ratio']) + int(point['offset_constant'])})

    return False, analog_value_list, energy_value_list, digital_value_list


def _sql_row(point_value, utc_iso):
    """Format one point value as a ' (point_id,'timestamp',value)' VALUES tuple."""
    return " (" + str(point_value['point_id']) + "," + "'" + utc_iso + "'," + str(point_value['value']) + ")"


def _store_point_values(logger, cnx, cursor, table, point_values, utc_iso, error_prefixes):
    """Write point values to `table` and refresh `table`_latest, _BULK_SIZE rows at a time.

    Only values whose 'is_trend' is truthy are inserted into `table`; every value
    replaces the row of its point in `table`_latest (delete, then insert).
    error_prefixes is a 3-tuple of log prefixes for the history insert, the latest
    delete and the latest insert. Database errors are logged and otherwise ignored
    so that one failing statement does not stop the acquisition.
    """
    trend_error, delete_error, latest_error = error_prefixes
    latest_table = table + "_latest"

    for start in range(0, len(point_values), _BULK_SIZE):
        batch = point_values[start:start + _BULK_SIZE]

        trend_rows = [_sql_row(point_value, utc_iso) for point_value in batch if point_value['is_trend']]
        if trend_rows:
            try:
                cursor.execute(" INSERT INTO " + table + " (point_id, utc_date_time, actual_value) "
                               " VALUES  " + ", ".join(trend_rows))
                cnx.commit()
            except Exception as e:
                logger.error(trend_error + str(e))
                # ignore this exception

        # point ids come from the tbl_points query and values are computed numbers,
        # which is why the statements are assembled as text here
        point_ids = ",".join(str(point_value['point_id']) for point_value in batch)
        try:
            cursor.execute(" DELETE FROM " + latest_table + " WHERE point_id IN ( " + point_ids + ")")
            cnx.commit()
        except Exception as e:
            logger.error(delete_error + str(e))
            # ignore this exception

        try:
            cursor.execute(" INSERT INTO " + latest_table + " (point_id, utc_date_time, actual_value) "
                           " VALUES  " + ", ".join(_sql_row(point_value, utc_iso) for point_value in batch))
            cnx.commit()
        except Exception as e:
            logger.error(latest_error + str(e))
            # ignore this exception


def process(logger, data_source_id, host, port, interval_in_seconds):
    """Acquire point values of one Modbus TCP data source forever and store them.

    Steps:
      1. Record this process id in tbl_data_sources (returns early if that fails).
      2. Check connectivity to host:port (retry after 300 seconds on failure).
      3. Load the non-virtual points of the data source from the system database.
      4. Read every point from the Modbus device.
      5. Bulk insert the values, refresh the *_latest tables and update
         last_seen_datetime_utc of the data source.
    Steps 4 and 5 repeat every interval_in_seconds; most failures wait 60 seconds
    and retry, and a Modbus timeout restarts from step 2.

    :param logger: logger used for error reporting
    :param data_source_id: id of the row in tbl_data_sources to acquire
    :param host: host of the Modbus TCP device
    :param port: port of the Modbus TCP device
    :param interval_in_seconds: pause between two acquisition rounds, passed to time.sleep
    :return: None, and only when step 1 fails; otherwise the function loops forever
    """
    ####################################################################################################################
    # Step 1: Update process id in database
    ####################################################################################################################
    cnx_system_db = None
    cursor_system_db = None
    try:
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
        cursor_system_db = cnx_system_db.cursor()
    except Exception as e:
        logger.error("Error in step 1.1 of acquisition process " + str(e))
        _close_db(cursor_system_db, cnx_system_db)
        return

    update_row = (" UPDATE tbl_data_sources "
                  " SET process_id = %s "
                  " WHERE id = %s ")
    try:
        cursor_system_db.execute(update_row, (os.getpid(), data_source_id,))
        cnx_system_db.commit()
    except Exception as e:
        logger.error("Error in step 1.2 of acquisition process " + str(e))
        return
    finally:
        _close_db(cursor_system_db, cnx_system_db)

    while True:
        # begin of the outermost while loop
        ################################################################################################################
        # Step 2: Check connectivity to the host and port
        ################################################################################################################
        try:
            asyncio.run(check_connectivity(host, port))
            # print() does not interpolate by itself, so format explicitly
            print("Succeeded to connect %s:%s in acquisition process " % (host, port))
        except Exception as e:
            logger.error("Failed to connect %s:%s in acquisition process: %s  ", host, port, str(e))
            # go to begin of the outermost while loop
            time.sleep(300)
            continue

        ################################################################################################################
        # Step 3: Get point list
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 3.1 of acquisition process " + str(e))
            _close_db(cursor_system_db, cnx_system_db)
            # go to begin of the outermost while loop
            time.sleep(60)
            continue

        try:
            query = (" SELECT id, name, object_type, is_trend, ratio, offset_constant, address "
                     " FROM tbl_points "
                     " WHERE data_source_id = %s AND is_virtual = 0 "
                     " ORDER BY id ")
            cursor_system_db.execute(query, (data_source_id,))
            rows_point = cursor_system_db.fetchall()
        except Exception as e:
            logger.error("Error in step 3.2 of acquisition process: " + str(e))
            _close_db(cursor_system_db, cnx_system_db)
            # go to begin of the outermost while loop
            time.sleep(60)
            continue

        if rows_point is None or len(rows_point) == 0:
            # there is no points for this data source
            logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
            _close_db(cursor_system_db, cnx_system_db)
            # go to begin of the outermost while loop
            time.sleep(60)
            continue

        # There are points for this data source
        point_list = [{"id": row_point[0],
                       "name": row_point[1],
                       "object_type": row_point[2],
                       "is_trend": row_point[3],
                       "ratio": row_point[4],
                       "offset_constant": row_point[5],
                       "address": row_point[6]}
                      for row_point in rows_point]

        ################################################################################################################
        # Step 4: Read point values from Modbus slaves
        ################################################################################################################
        # connect to historical database
        cnx_historical_db = None
        cursor_historical_db = None
        try:
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
            cursor_historical_db = cnx_historical_db.cursor()
        except Exception as e:
            logger.error("Error in step 4.1 of acquisition process " + str(e))
            _close_db(cursor_historical_db, cnx_historical_db)
            _close_db(cursor_system_db, cnx_system_db)
            # go to begin of the outermost while loop
            time.sleep(60)
            continue

        # connect to the Modbus data source
        master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
        master.set_timeout(5.0)
        print("Ready to connect to %s:%s " % (host, port))

        # inner while loop to read all point values periodically
        while True:
            # begin of the inner while loop
            # TODO: update point list in another thread
            is_modbus_tcp_timed_out, analog_value_list, energy_value_list, digital_value_list = \
                _read_point_values(logger, master, data_source_id, host, port, point_list)

            if is_modbus_tcp_timed_out:
                # Modbus TCP connection timeout: values read in this round are discarded

                # destroy the Modbus master
                del master

                # close the connection to database
                _close_db(cursor_historical_db, cnx_historical_db)
                _close_db(cursor_system_db, cnx_system_db)

                # break the inner while loop
                # go to begin of the outermost while loop
                time.sleep(60)
                break

            ############################################################################################################
            # Step 5: Bulk insert point values and update latest values in historical database
            ############################################################################################################
            # check the connection to the Historical Database
            if not cnx_historical_db.is_connected():
                try:
                    cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
                    cursor_historical_db = cnx_historical_db.cursor()
                except Exception as e:
                    logger.error("Error in step 5.1 of acquisition process: " + str(e))
                    _close_db(cursor_historical_db, cnx_historical_db)
                    # go to begin of the inner while loop
                    time.sleep(60)
                    continue

            # check the connection to the System Database
            if not cnx_system_db.is_connected():
                try:
                    cnx_system_db = mysql.connector.connect(**config.myems_system_db)
                    cursor_system_db = cnx_system_db.cursor()
                except Exception as e:
                    logger.error("Error in step 5.2 of acquisition process: " + str(e))
                    _close_db(cursor_system_db, cnx_system_db)
                    # go to begin of the inner while loop
                    time.sleep(60)
                    continue

            # one timestamp is shared by every value of this acquisition round
            current_datetime_utc = datetime.utcnow()
            utc_iso = current_datetime_utc.isoformat()

            # bulk insert values into historical database within a period
            # and then update latest values
            _store_point_values(logger, cnx_historical_db, cursor_historical_db,
                                "tbl_analog_value", analog_value_list, utc_iso,
                                ("Error in step 5.3.1 of acquisition process ",
                                 "Error in step 5.3.2 of acquisition process ",
                                 "Error in step 5.3.3 of acquisition process "))
            _store_point_values(logger, cnx_historical_db, cursor_historical_db,
                                "tbl_energy_value", energy_value_list, utc_iso,
                                ("Error in step 5.4.1 of acquisition process: ",
                                 "Error in step 5.4.2 of acquisition process ",
                                 "Error in step 5.4.3 of acquisition process "))
            _store_point_values(logger, cnx_historical_db, cursor_historical_db,
                                "tbl_digital_value", digital_value_list, utc_iso,
                                ("Error in step 5.5.1 of acquisition process: ",
                                 "Error in step 5.5.2 of acquisition process ",
                                 "Error in step 5.5.3 of acquisition process "))

            # update data source last seen datetime
            update_row = (" UPDATE tbl_data_sources "
                          " SET last_seen_datetime_utc = %s "
                          " WHERE id = %s ")
            try:
                cursor_system_db.execute(update_row, (utc_iso, data_source_id,))
                cnx_system_db.commit()
            except Exception as e:
                logger.error("Error in step 5.6 of acquisition process " + str(e))
                _close_db(cursor_system_db, cnx_system_db)
                # go to begin of the inner while loop
                time.sleep(60)
                continue

            # Sleep interval in seconds and continue the inner while loop
            # this argument may be a floating point number for subsecond precision
            time.sleep(interval_in_seconds)

        # end of the inner while loop

    # end of the outermost while loop
521