Issues (1751)

myems-modbus-tcp/acquisition.py (3 issues)

1
"""
2
MyEMS Modbus TCP Gateway Service - Data Acquisition Module
3
4
This module handles the core data acquisition functionality for Modbus TCP devices.
5
It connects to Modbus TCP servers (slave devices), reads data from configured points,
6
and stores the collected data in the MyEMS historical database.
7
8
The acquisition process performs the following functions:
9
1. Updates process ID in database for monitoring
10
2. Checks connectivity to the Modbus TCP host and port
11
3. Retrieves point configuration from system database
12
4. Reads point values from Modbus TCP slaves using configured parameters
13
5. Processes and validates the collected data
14
6. Bulk inserts point values and updates latest values in historical database
15
16
The module supports multiple data types:
17
- ANALOG_VALUE: Continuous sensor readings (temperature, pressure, flow, etc.)
18
- DIGITAL_VALUE: Binary states (on/off, open/closed, alarm status, etc.)
19
- ENERGY_VALUE: Cumulative energy consumption data (kWh readings)
20
21
Data processing includes:
22
- Ratio and offset calculations for calibration
23
- Byte swapping for devices with non-standard byte order
24
- Data validation and range checking
25
- Trend data storage and latest value updates
26
"""
27
28
import os
29
import json
30
import math
31
import telnetlib3
32
import asyncio
33
import time
34
from datetime import datetime
35
from decimal import Decimal
36
import mysql.connector
37
from modbus_tk import modbus_tcp
38
import config
39
from byte_swap import byte_swap_32_bit, byte_swap_64_bit
40
41
42
########################################################################################################################
43
# Check connectivity to the host and port
44
########################################################################################################################
45
async def check_connectivity(host, port):
46
    """
47
    Test basic TCP connectivity to a Modbus TCP host and port.
48
49
    This function attempts to establish a TCP connection to verify that the
50
    target Modbus TCP server is reachable before attempting Modbus communication.
51
52
    Args:
53
        host: Target hostname or IP address of the Modbus TCP server
54
        port: Target port number (typically 502 for Modbus TCP)
55
56
    Raises:
57
        Exception: If connection fails
58
    """
59
    reader, writer = await telnetlib3.open_connection(host, port)
60
    # Close the connection immediately after establishing it
61
    writer.close()
62
63
########################################################################################################################
64
# Data Acquisition Procedures
65
# Step 1: Update process ID in database for monitoring and management
66
# Step 2: Check connectivity to the Modbus TCP host and port
67
# Step 3: Get point list from system database for this data source
68
# Step 4: Read point values from Modbus TCP slaves using configured parameters
69
# Step 5: Bulk insert point values and update latest values in historical database
70
########################################################################################################################
71
72
73
def process(logger, data_source_id, host, port, interval_in_seconds):
74
    """
75
    Main data acquisition process function.
76
77
    This function manages the complete data acquisition lifecycle for a Modbus TCP data source.
78
    It runs continuously, connecting to the Modbus device, reading configured points,
79
    and storing the data in the historical database.
80
81
    Args:
82
        logger: Logger instance for recording acquisition activities and errors
83
        data_source_id: Unique identifier for the data source in the system database
84
        host: Hostname or IP address of the Modbus TCP server
85
        port: Port number of the Modbus TCP server (typically 502)
86
        interval_in_seconds: Time interval between data acquisition cycles
87
    """
88
    ####################################################################################################################
89
    # Step 1: Update process ID in database for monitoring and management
90
    ####################################################################################################################
91
    cnx_system_db = None
92
    cursor_system_db = None
93
94
    # Connect to system database to register this acquisition process
95
    try:
96
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
97
        cursor_system_db = cnx_system_db.cursor()
98
    except Exception as e:
99
        logger.error("Error in step 1.1 of acquisition process " + str(e))
100
        # Clean up database connections in case of error
101
        if cursor_system_db:
102
            cursor_system_db.close()
103
        if cnx_system_db:
104
            cnx_system_db.close()
105
        return
106
107
    # Update the data source record with the current process ID for monitoring
108
    update_row = (" UPDATE tbl_data_sources "
109
                  " SET process_id = %s "
110
                  " WHERE id = %s ")
111
    try:
112
        cursor_system_db.execute(update_row, (os.getpid(), data_source_id,))
113
        cnx_system_db.commit()
114
    except Exception as e:
115
        logger.error("Error in step 1.2 of acquisition process " + str(e))
116
        return
117
    finally:
118
        # Always clean up database connections
119
        if cursor_system_db:
120
            cursor_system_db.close()
121
        if cnx_system_db:
122
            cnx_system_db.close()
123
124
    # Main acquisition loop - runs continuously until process termination
125
    while True:
126
        # Begin of the outermost while loop
127
        ################################################################################################################
128
        # Step 2: Check connectivity to the Modbus TCP host and port
129
        ################################################################################################################
130
        try:
131
            # Test basic TCP connectivity before attempting Modbus communication
132
            asyncio.run(check_connectivity(host, port))
133
            print("Succeeded to connect %s:%s in acquisition process ", host, port)
134
        except Exception as e:
135
            logger.error("Failed to connect %s:%s in acquisition process: %s  ", host, port, str(e))
136
            # Go to begin of the outermost while loop and wait before retrying
137
            time.sleep(300)  # Wait 5 minutes before retrying connection
138
            continue
139
140
        ################################################################################################################
141
        # Step 3: Get point list from system database for this data source
142
        ################################################################################################################
143
        cnx_system_db = None
144
        cursor_system_db = None
145
146
        # Connect to system database to retrieve point configuration
147
        try:
148
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
149
            cursor_system_db = cnx_system_db.cursor()
150
        except Exception as e:
151
            logger.error("Error in step 3.1 of acquisition process " + str(e))
152
            # Clean up database connections in case of error
153
            if cursor_system_db:
154
                cursor_system_db.close()
155
            if cnx_system_db:
156
                cnx_system_db.close()
157
            # Go to begin of the outermost while loop
158
            time.sleep(60)
159
            continue
160
161
        # Retrieve all configured points for this data source
162
        try:
163
            query = (" SELECT id, name, object_type, is_trend, ratio, offset_constant, address "
164
                     " FROM tbl_points "
165
                     " WHERE data_source_id = %s AND is_virtual = 0 "
166
                     " ORDER BY id ")
167
            cursor_system_db.execute(query, (data_source_id,))
168
            rows_point = cursor_system_db.fetchall()
169
        except Exception as e:
170
            logger.error("Error in step 3.2 of acquisition process: " + str(e))
171
            # Clean up database connections in case of error
172
            if cursor_system_db:
173
                cursor_system_db.close()
174
            if cnx_system_db:
175
                cnx_system_db.close()
176
            # Go to begin of the outermost while loop
177
            time.sleep(60)
178
            continue
179
180
        # Validate that points were found for this data source
181
        if rows_point is None or len(rows_point) == 0:
182
            # There are no points configured for this data source
183
            logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
184
            # Clean up database connections
185
            if cursor_system_db:
186
                cursor_system_db.close()
187
            if cnx_system_db:
188
                cnx_system_db.close()
189
            # Go to begin of the outermost while loop
190
            time.sleep(60)
191
            continue
192
193
        # Build point list from database results
194
        # There are points configured for this data source
195
        point_list = list()
196
        for row_point in rows_point:
197
            point_list.append({"id": row_point[0],
198
                               "name": row_point[1],
199
                               "object_type": row_point[2],
200
                               "is_trend": row_point[3],
201
                               "ratio": row_point[4],
202
                               "offset_constant": row_point[5],
203
                               "address": row_point[6]})
204
205
        ################################################################################################################
206
        # Step 4: Read point values from Modbus TCP slaves using configured parameters
207
        ################################################################################################################
208
        # Connect to historical database for storing collected data
209
        cnx_historical_db = None
210
        cursor_historical_db = None
211
        try:
212
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
213
            cursor_historical_db = cnx_historical_db.cursor()
214
        except Exception as e:
215
            logger.error("Error in step 4.1 of acquisition process " + str(e))
216
            # Clean up database connections in case of error
217
            if cursor_historical_db:
218
                cursor_historical_db.close()
219
            if cnx_historical_db:
220
                cnx_historical_db.close()
221
222
            if cursor_system_db:
223
                cursor_system_db.close()
224
            if cnx_system_db:
225
                cnx_system_db.close()
226
            # Go to begin of the outermost while loop
227
            time.sleep(60)
228
            continue
229
230
        # Connect to the Modbus TCP data source (slave device)
231
        master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
232
        master.set_timeout(5.0)
233
        print("Ready to connect to %s:%s ", host, port)
234
235
        # Inner while loop to read all point values periodically
236
        while True:
237
            # Begin of the inner while loop
238
            is_modbus_tcp_timed_out = False
239
            energy_value_list = list()
240
            analog_value_list = list()
241
            digital_value_list = list()
242
243
            # TODO: Update point list in another thread for dynamic configuration changes
244
            # Process each configured point
245
            for point in point_list:
246
                # Begin of foreach point loop
247
                try:
248
                    # Parse the point address configuration from JSON
249
                    address = json.loads(point['address'])
250
                except Exception as e:
251
                    logger.error("Error in step 4.2 of acquisition process: Invalid point address in JSON " + str(e))
252
                    continue
253
254
                # Validate point address configuration
255
                if 'slave_id' not in address.keys() \
256
                        or 'function_code' not in address.keys() \
257
                        or 'offset' not in address.keys() \
258
                        or 'number_of_registers' not in address.keys() \
259
                        or 'format' not in address.keys() \
260
                        or 'byte_swap' not in address.keys() \
261
                        or address['slave_id'] < 1 \
262
                        or address['function_code'] not in (1, 2, 3, 4) \
263
                        or address['offset'] < 0 \
264
                        or address['number_of_registers'] < 0 \
265
                        or len(address['format']) < 1 \
266
                        or not isinstance(address['byte_swap'], bool):
267
                    logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
268
                                 data_source_id, point['id'])
269
                    # Invalid point configuration found
270
                    # Go to begin of foreach point loop to process next point
271
                    continue
272
273
                # Read point value from Modbus TCP slave
274
                try:
275
                    result = master.execute(slave=address['slave_id'],
276
                                            function_code=address['function_code'],
277
                                            starting_address=address['offset'],
278
                                            quantity_of_x=address['number_of_registers'],
279
                                            data_format=address['format'])
280
                except Exception as e:
281
                    logger.error(str(e) +
282
                                 " host:" + host + " port:" + str(port) +
283
                                 " slave_id:" + str(address['slave_id']) +
284
                                 " function_code:" + str(address['function_code']) +
285
                                 " starting_address:" + str(address['offset']) +
286
                                 " quantity_of_x:" + str(address['number_of_registers']) +
287
                                 " data_format:" + str(address['format']) +
288
                                 " byte_swap:" + str(address['byte_swap']))
289
290
                    if 'timed out' in str(e):
291
                        is_modbus_tcp_timed_out = True
292
                        # Timeout error - break the foreach point loop
293
                        break
294
                    else:
295
                        # Exception occurred when reading register value
296
                        # Go to begin of foreach point loop to process next point
297
                        continue
298
299
                # Validate the read result
300
                if result is None or not isinstance(result, tuple) or len(result) == 0:
301
                    logger.error("Error in step 4.3 of acquisition process: \n"
302
                                 " invalid result: None "
303
                                 " for point_id: " + str(point['id']))
304
                    # Invalid result
305
                    # Go to begin of foreach point loop to process next point
306
                    continue
307
308
                # Validate the result value
309
                if not isinstance(result[0], float) and not isinstance(result[0], int) or math.isnan(result[0]):
310
                    logger.error(" Error in step 4.4 of acquisition process:\n"
311
                                 " invalid result: not float and not int or not a number "
312
                                 " for point_id: " + str(point['id']))
313
                    # Invalid result
314
                    # Go to begin of foreach point loop to process next point
315
                    continue
316
317
                # Apply byte swapping if configured
318
                if address['byte_swap']:
319
                    if address['number_of_registers'] == 2:
320
                        # 32-bit data (2 registers) - swap adjacent bytes
321
                        value = byte_swap_32_bit(result[0])
322
                    elif address['number_of_registers'] == 4:
323
                        # 64-bit data (4 registers) - swap adjacent bytes
324
                        value = byte_swap_64_bit(result[0])
325
                    else:
326
                        # No byte swapping for other register counts
327
                        value = result[0]
328
                else:
329
                    # No byte swapping required
330
                    value = result[0]
331
332
                # Process the value based on point type and apply ratio/offset
333
                if point['object_type'] == 'ANALOG_VALUE':
334
                    # Standard SQL requires that DECIMAL(18, 3) be able to store any value with 18 digits and
335
                    # 3 decimals, so values that can be stored in the column range
336
                    # from -999999999999999.999 to 999999999999999.999.
337
                    if Decimal(-999999999999999.999) <= Decimal(value) <= Decimal(999999999999999.999):
338
                        analog_value_list.append({'point_id': point['id'],
339
                                                  'is_trend': point['is_trend'],
340
                                                  'value': Decimal(value) * point['ratio'] + point['offset_constant']})
341
                elif point['object_type'] == 'ENERGY_VALUE':
342
                    # Standard SQL requires that DECIMAL(18, 3) be able to store any value with 18 digits and
343
                    # 3 decimals, so values that can be stored in the column range
344
                    # from -999999999999999.999 to 999999999999999.999.
345
                    if Decimal(-999999999999999.999) <= Decimal(value) <= Decimal(999999999999999.999):
346
                        energy_value_list.append({'point_id': point['id'],
347
                                                  'is_trend': point['is_trend'],
348
                                                  'value': Decimal(value) * point['ratio'] + point['offset_constant']})
349
                elif point['object_type'] == 'DIGITAL_VALUE':
350
                    digital_value_list.append({'point_id': point['id'],
351
                                               'is_trend': point['is_trend'],
352
                                               'value': int(value) * int(point['ratio']) + int(point['offset_constant'])
353
                                               })
354
355
            # End of foreach point loop
356
357
            if is_modbus_tcp_timed_out:
358
                # Modbus TCP connection timeout occurred
359
                # Clean up connections and restart the acquisition process
360
361
                # Destroy the Modbus master connection
362
                del master
363
364
                # Close all database connections
365
                if cursor_historical_db:
366
                    cursor_historical_db.close()
367
                if cnx_historical_db:
368
                    cnx_historical_db.close()
369
                if cursor_system_db:
370
                    cursor_system_db.close()
371
                if cnx_system_db:
372
                    cnx_system_db.close()
373
374
                # Break the inner while loop and go to begin of the outermost while loop
375
                time.sleep(60)
376
                break
377
378
            ############################################################################################################
379
            # Step 5: Bulk insert point values and update latest values in historical database
380
            ############################################################################################################
381
            # Check the connection to the Historical Database
382
            if not cnx_historical_db.is_connected():
383
                try:
384
                    cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
385
                    cursor_historical_db = cnx_historical_db.cursor()
386
                except Exception as e:
387
                    logger.error("Error in step 5.1 of acquisition process: " + str(e))
388
                    # Clean up database connections in case of error
389
                    if cursor_historical_db:
390
                        cursor_historical_db.close()
391
                    if cnx_historical_db:
392
                        cnx_historical_db.close()
393
                    # Go to begin of the inner while loop
394
                    time.sleep(60)
395
                    continue
396
397
            # Check the connection to the System Database
398
            if not cnx_system_db.is_connected():
399
                try:
400
                    cnx_system_db = mysql.connector.connect(**config.myems_system_db)
401
                    cursor_system_db = cnx_system_db.cursor()
402
                except Exception as e:
403
                    logger.error("Error in step 5.2 of acquisition process: " + str(e))
404
                    # Clean up database connections in case of error
405
                    if cursor_system_db:
406
                        cursor_system_db.close()
407
                    if cnx_system_db:
408
                        cnx_system_db.close()
409
                    # Go to begin of the inner while loop
410
                    time.sleep(60)
411
                    continue
412
413
            # Get current UTC timestamp for data storage
414
            current_datetime_utc = datetime.utcnow()
415
416
            # Bulk insert analog values into historical database and update latest values
417
            # Process in batches of 100 to avoid overwhelming the database
418 View Code Duplication
            while len(analog_value_list) > 0:
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
419
                analog_value_list_100 = analog_value_list[:100]  # Take first 100 items
420
                analog_value_list = analog_value_list[100:]      # Remove processed items
421
422
                # Build INSERT statement for trend data (historical values)
423
                add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
424
                              " VALUES  ")
425
                trend_value_count = 0
426
427
                # Add trend values to INSERT statement
428
                for point_value in analog_value_list_100:
429
                    if point_value['is_trend']:
430
                        add_values += " (" + str(point_value['point_id']) + ","
431
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
432
                        add_values += str(point_value['value']) + "), "
433
                        trend_value_count += 1
434
435
                # Execute trend data insertion if there are trend values
436
                if trend_value_count > 0:
437
                    try:
438
                        # Trim ", " at the end of string and then execute
439
                        cursor_historical_db.execute(add_values[:-2])
440
                        cnx_historical_db.commit()
441
                    except Exception as e:
442
                        logger.error("Error in step 5.3.1 of acquisition process " + str(e))
443
                        # Ignore this exception and continue processing
444
445
                # Update latest values table for analog values
446
                delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
447
                latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
448
                                 " VALUES  ")
449
                latest_value_count = 0
450
451
                # Build DELETE and INSERT statements for latest values
452
                for point_value in analog_value_list_100:
453
                    delete_values += str(point_value['point_id']) + ","
454
                    latest_values += " (" + str(point_value['point_id']) + ","
455
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
456
                    latest_values += str(point_value['value']) + "), "
457
                    latest_value_count += 1
458
459
                # Execute latest values update if there are values to process
460
                if latest_value_count > 0:
461
                    try:
462
                        # Replace "," at the end of string with ")" and execute DELETE
463
                        cursor_historical_db.execute(delete_values[:-1] + ")")
464
                        cnx_historical_db.commit()
465
                    except Exception as e:
466
                        logger.error("Error in step 5.3.2 of acquisition process " + str(e))
467
                        # Ignore this exception and continue processing
468
469
                    try:
470
                        # Trim ", " at the end of string and then execute INSERT
471
                        cursor_historical_db.execute(latest_values[:-2])
472
                        cnx_historical_db.commit()
473
                    except Exception as e:
474
                        logger.error("Error in step 5.3.3 of acquisition process " + str(e))
475
                        # Ignore this exception and continue processing
476
477
            # Bulk insert energy values into historical database and update latest values
478
            # Process in batches of 100 to avoid overwhelming the database
479 View Code Duplication
            while len(energy_value_list) > 0:
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
480
                energy_value_list_100 = energy_value_list[:100]  # Take first 100 items
481
                energy_value_list = energy_value_list[100:]      # Remove processed items
482
483
                # Build INSERT statement for trend data (historical values)
484
                add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
485
                              " VALUES  ")
486
                trend_value_count = 0
487
488
                # Add trend values to INSERT statement
489
                for point_value in energy_value_list_100:
490
                    if point_value['is_trend']:
491
                        add_values += " (" + str(point_value['point_id']) + ","
492
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
493
                        add_values += str(point_value['value']) + "), "
494
                        trend_value_count += 1
495
496
                # Execute trend data insertion if there are trend values
497
                if trend_value_count > 0:
498
                    try:
499
                        # Trim ", " at the end of string and then execute
500
                        cursor_historical_db.execute(add_values[:-2])
501
                        cnx_historical_db.commit()
502
                    except Exception as e:
503
                        logger.error("Error in step 5.4.1 of acquisition process: " + str(e))
504
                        # Ignore this exception and continue processing
505
506
                # Update latest values table for energy values
507
                delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
508
                latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
509
                                 " VALUES  ")
510
                latest_value_count = 0
511
512
                # Build DELETE and INSERT statements for latest values
513
                for point_value in energy_value_list_100:
514
                    delete_values += str(point_value['point_id']) + ","
515
                    latest_values += " (" + str(point_value['point_id']) + ","
516
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
517
                    latest_values += str(point_value['value']) + "), "
518
                    latest_value_count += 1
519
520
                # Execute latest values update if there are values to process
521
                if latest_value_count > 0:
522
                    try:
523
                        # Replace "," at the end of string with ")" and execute DELETE
524
                        cursor_historical_db.execute(delete_values[:-1] + ")")
525
                        cnx_historical_db.commit()
526
                    except Exception as e:
527
                        logger.error("Error in step 5.4.2 of acquisition process " + str(e))
528
                        # Ignore this exception and continue processing
529
530
                    try:
531
                        # Trim ", " at the end of string and then execute INSERT
532
                        cursor_historical_db.execute(latest_values[:-2])
533
                        cnx_historical_db.commit()
534
                    except Exception as e:
535
                        logger.error("Error in step 5.4.3 of acquisition process " + str(e))
536
                        # Ignore this exception and continue processing
537
538
            # Bulk insert digital values into historical database and update latest values
539
            # Process in batches of 100 to avoid overwhelming the database
540 View Code Duplication
            while len(digital_value_list) > 0:
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
541
                digital_value_list_100 = digital_value_list[:100]  # Take first 100 items
542
                digital_value_list = digital_value_list[100:]      # Remove processed items
543
544
                # Build INSERT statement for trend data (historical values)
545
                add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
546
                              " VALUES  ")
547
                trend_value_count = 0
548
549
                # Add trend values to INSERT statement
550
                for point_value in digital_value_list_100:
551
                    if point_value['is_trend']:
552
                        add_values += " (" + str(point_value['point_id']) + ","
553
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
554
                        add_values += str(point_value['value']) + "), "
555
                        trend_value_count += 1
556
557
                # Execute trend data insertion if there are trend values
558
                if trend_value_count > 0:
559
                    try:
560
                        # Trim ", " at the end of string and then execute
561
                        cursor_historical_db.execute(add_values[:-2])
562
                        cnx_historical_db.commit()
563
                    except Exception as e:
564
                        logger.error("Error in step 5.5.1 of acquisition process: " + str(e))
565
                        # Ignore this exception and continue processing
566
567
                # Update latest values table for digital values
568
                delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
569
                latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
570
                                 " VALUES  ")
571
                latest_value_count = 0
572
573
                # Build DELETE and INSERT statements for latest values
574
                for point_value in digital_value_list_100:
575
                    delete_values += str(point_value['point_id']) + ","
576
                    latest_values += " (" + str(point_value['point_id']) + ","
577
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
578
                    latest_values += str(point_value['value']) + "), "
579
                    latest_value_count += 1
580
581
                # Execute latest values update if there are values to process
582
                if latest_value_count > 0:
583
                    try:
584
                        # Replace "," at the end of string with ")" and execute DELETE
585
                        cursor_historical_db.execute(delete_values[:-1] + ")")
586
                        cnx_historical_db.commit()
587
                    except Exception as e:
588
                        logger.error("Error in step 5.5.2 of acquisition process " + str(e))
589
                        # Ignore this exception and continue processing
590
591
                    try:
592
                        # Trim ", " at the end of string and then execute INSERT
593
                        cursor_historical_db.execute(latest_values[:-2])
594
                        cnx_historical_db.commit()
595
                    except Exception as e:
596
                        logger.error("Error in step 5.5.3 of acquisition process " + str(e))
597
                        # Ignore this exception and continue processing
598
599
            # Update data source last seen datetime to indicate successful data collection
600
            update_row = (" UPDATE tbl_data_sources "
601
                          " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' "
602
                          " WHERE id = %s ")
603
            try:
604
                cursor_system_db.execute(update_row, (data_source_id,))
605
                cnx_system_db.commit()
606
            except Exception as e:
607
                logger.error("Error in step 5.6 of acquisition process " + str(e))
608
                # Clean up database connections in case of error
609
                if cursor_system_db:
610
                    cursor_system_db.close()
611
                if cnx_system_db:
612
                    cnx_system_db.close()
613
                # Go to begin of the inner while loop
614
                time.sleep(60)
615
                continue
616
617
            # Sleep for the configured interval before starting the next data collection cycle
618
            # This argument may be a floating point number for subsecond precision
619
            time.sleep(interval_in_seconds)
620
621
        # End of the inner while loop
622
623
    # End of the outermost while loop
624