"""
MyEMS Modbus TCP Gateway Service - Data Acquisition Module

This module handles the core data acquisition functionality for Modbus TCP devices.
It connects to Modbus TCP servers (slave devices), reads data from configured points,
and stores the collected data in the MyEMS historical database.

The acquisition process performs the following functions:
1. Updates process ID in database for monitoring
2. Checks connectivity to the Modbus TCP host and port
3. Retrieves point configuration from system database
4. Reads point values from Modbus TCP slaves using configured parameters
5. Processes and validates the collected data
6. Bulk inserts point values and updates latest values in historical database

The module supports multiple data types:
- ANALOG_VALUE: Continuous sensor readings (temperature, pressure, flow, etc.)
- DIGITAL_VALUE: Binary states (on/off, open/closed, alarm status, etc.)
- ENERGY_VALUE: Cumulative energy consumption data (kWh readings)

Data processing includes:
- Ratio and offset calculations for calibration
- Byte swapping for devices with non-standard byte order
- Data validation and range checking
- Trend data storage and latest value updates
"""

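# Example point address configuration, stored as JSON in tbl_points.address.
# The keys are the ones validated in step 4.2 below; the concrete values are
# illustrative only:
#
#     {"slave_id": 1,             # Modbus unit identifier, >= 1
#      "function_code": 3,        # 1/2/3/4 = coils / discrete inputs / holding / input registers
#      "offset": 0,               # starting register address, >= 0
#      "number_of_registers": 2,  # quantity of registers to read
#      "format": ">f",            # data format string passed through to modbus-tk
#      "byte_swap": false}        # whether to reorder bytes after reading
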
import os
import json
import math
import telnetlib3
import asyncio
import time
from datetime import datetime
from decimal import Decimal
import mysql.connector
from modbus_tk import modbus_tcp
import config
from byte_swap import byte_swap_32_bit, byte_swap_64_bit


########################################################################################################################
# Check connectivity to the host and port
########################################################################################################################
async def check_connectivity(host, port):
    """
    Test basic TCP connectivity to a Modbus TCP host and port.

    This function attempts to establish a TCP connection to verify that the
    target Modbus TCP server is reachable before attempting Modbus communication.

    Args:
        host: Target hostname or IP address of the Modbus TCP server
        port: Target port number (typically 502 for Modbus TCP)

    Raises:
        Exception: If connection fails
    """
    reader, writer = await telnetlib3.open_connection(host, port)
    # Close the connection immediately after establishing it
    writer.close()
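
# A minimal usage sketch (hypothetical host and port, for illustration only):
#
#     asyncio.run(check_connectivity('192.168.0.10', 502))
#
# Step 2 of process() below performs exactly this check before each Modbus session.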

########################################################################################################################
# Data Acquisition Procedures
# Step 1: Update process ID in database for monitoring and management
# Step 2: Check connectivity to the Modbus TCP host and port
# Step 3: Get point list from system database for this data source
# Step 4: Read point values from Modbus TCP slaves using configured parameters
# Step 5: Bulk insert point values and update latest values in historical database
########################################################################################################################


def process(logger, data_source_id, host, port, interval_in_seconds):
    """
    Main data acquisition process function.

    This function manages the complete data acquisition lifecycle for a Modbus TCP data source.
    It runs continuously, connecting to the Modbus device, reading configured points,
    and storing the data in the historical database.

    Args:
        logger: Logger instance for recording acquisition activities and errors
        data_source_id: Unique identifier for the data source in the system database
        host: Hostname or IP address of the Modbus TCP server
        port: Port number of the Modbus TCP server (typically 502)
        interval_in_seconds: Time interval between data acquisition cycles
    """
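    # This function typically runs as a dedicated worker per data source, e.g. spawned
    # with multiprocessing (hypothetical wiring, not part of this module):
    #     Process(target=process, args=(logger, data_source_id, host, port, 60)).start()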
    ####################################################################################################################
    # Step 1: Update process ID in database for monitoring and management
    ####################################################################################################################
    cnx_system_db = None
    cursor_system_db = None

    # Connect to system database to register this acquisition process
    try:
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
        cursor_system_db = cnx_system_db.cursor()
    except Exception as e:
        logger.error("Error in step 1.1 of acquisition process " + str(e))
        # Clean up database connections in case of error
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        return

    # Update the data source record with the current process ID for monitoring
    update_row = (" UPDATE tbl_data_sources "
                  " SET process_id = %s "
                  " WHERE id = %s ")
    try:
        cursor_system_db.execute(update_row, (os.getpid(), data_source_id,))
        cnx_system_db.commit()
    except Exception as e:
        logger.error("Error in step 1.2 of acquisition process " + str(e))
        return
    finally:
        # Always clean up database connections
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()

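    # The stored process_id lets operators locate and manage this worker, e.g. with
    # a query along these lines (illustrative only; this module never runs it):
    #     SELECT id, name, process_id FROM tbl_data_sources WHERE id = 1
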
    # Main acquisition loop - runs continuously until process termination
    while True:
        # Begin of the outermost while loop
        ################################################################################################################
        # Step 2: Check connectivity to the Modbus TCP host and port
        ################################################################################################################
        try:
            # Test basic TCP connectivity before attempting Modbus communication
            asyncio.run(check_connectivity(host, port))
            print("Succeeded to connect %s:%s in acquisition process" % (host, port))
        except Exception as e:
            logger.error("Failed to connect %s:%s in acquisition process: %s ", host, port, str(e))
            # Go to begin of the outermost while loop and wait before retrying
            time.sleep(300)  # Wait 5 minutes before retrying connection
            continue

        ################################################################################################################
        # Step 3: Get point list from system database for this data source
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None

        # Connect to system database to retrieve point configuration
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 3.1 of acquisition process " + str(e))
            # Clean up database connections in case of error
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # Go to begin of the outermost while loop
            time.sleep(60)
            continue

        # Retrieve all configured points for this data source
        try:
            query = (" SELECT id, name, object_type, is_trend, ratio, offset_constant, address "
                     " FROM tbl_points "
                     " WHERE data_source_id = %s AND is_virtual = 0 "
                     " ORDER BY id ")
            cursor_system_db.execute(query, (data_source_id,))
            rows_point = cursor_system_db.fetchall()
        except Exception as e:
            logger.error("Error in step 3.2 of acquisition process: " + str(e))
            # Clean up database connections in case of error
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # Go to begin of the outermost while loop
            time.sleep(60)
            continue

        # Validate that points were found for this data source
        if rows_point is None or len(rows_point) == 0:
            # There are no points configured for this data source
            logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
            # Clean up database connections
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # Go to begin of the outermost while loop
            time.sleep(60)
            continue

        # Build point list from database results
        # There are points configured for this data source
        point_list = list()
        for row_point in rows_point:
            point_list.append({"id": row_point[0],
                               "name": row_point[1],
                               "object_type": row_point[2],
                               "is_trend": row_point[3],
                               "ratio": row_point[4],
                               "offset_constant": row_point[5],
                               "address": row_point[6]})

        ################################################################################################################
        # Step 4: Read point values from Modbus TCP slaves using configured parameters
        ################################################################################################################
        # Connect to historical database for storing collected data
        cnx_historical_db = None
        cursor_historical_db = None
        try:
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
            cursor_historical_db = cnx_historical_db.cursor()
        except Exception as e:
            logger.error("Error in step 4.1 of acquisition process " + str(e))
            # Clean up database connections in case of error
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()

            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # Go to begin of the outermost while loop
            time.sleep(60)
            continue

        # Connect to the Modbus TCP data source (slave device)
        master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
        master.set_timeout(5.0)
        print("Ready to connect to %s:%s" % (host, port))

        # Inner while loop to read all point values periodically
        while True:
            # Begin of the inner while loop
            is_modbus_tcp_timed_out = False
            energy_value_list = list()
            analog_value_list = list()
            digital_value_list = list()

            # TODO: Update point list in another thread for dynamic configuration changes
            # Process each configured point
            for point in point_list:
                # Begin of foreach point loop
                try:
                    # Parse the point address configuration from JSON
                    address = json.loads(point['address'])
                except Exception as e:
                    logger.error("Error in step 4.2 of acquisition process: Invalid point address in JSON " + str(e))
                    continue

                # Validate point address configuration
                if 'slave_id' not in address.keys() \
                        or 'function_code' not in address.keys() \
                        or 'offset' not in address.keys() \
                        or 'number_of_registers' not in address.keys() \
                        or 'format' not in address.keys() \
                        or 'byte_swap' not in address.keys() \
                        or address['slave_id'] < 1 \
                        or address['function_code'] not in (1, 2, 3, 4) \
                        or address['offset'] < 0 \
                        or address['number_of_registers'] < 0 \
                        or len(address['format']) < 1 \
                        or not isinstance(address['byte_swap'], bool):
                    logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
                                 data_source_id, point['id'])
                    # Invalid point configuration found
                    # Go to begin of foreach point loop to process next point
                    continue

                # Read point value from Modbus TCP slave
                try:
                    result = master.execute(slave=address['slave_id'],
                                            function_code=address['function_code'],
                                            starting_address=address['offset'],
                                            quantity_of_x=address['number_of_registers'],
                                            data_format=address['format'])
                except Exception as e:
                    logger.error(str(e) +
                                 " host:" + host + " port:" + str(port) +
                                 " slave_id:" + str(address['slave_id']) +
                                 " function_code:" + str(address['function_code']) +
                                 " starting_address:" + str(address['offset']) +
                                 " quantity_of_x:" + str(address['number_of_registers']) +
                                 " data_format:" + str(address['format']) +
                                 " byte_swap:" + str(address['byte_swap']))

                    if 'timed out' in str(e):
                        is_modbus_tcp_timed_out = True
                        # Timeout error - break the foreach point loop
                        break
                    else:
                        # Exception occurred when reading register value
                        # Go to begin of foreach point loop to process next point
                        continue

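                # With data_format '>f' (a single 32-bit float, illustrative), a successful
                # read is assumed to return a one-element tuple such as (12.34,); the checks
                # below guard against anything else.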
                # Validate the read result
                if result is None or not isinstance(result, tuple) or len(result) == 0:
                    logger.error("Error in step 4.3 of acquisition process: \n"
                                 " invalid result: None "
                                 " for point_id: " + str(point['id']))
                    # Invalid result
                    # Go to begin of foreach point loop to process next point
                    continue

                # Validate the result value
                if not isinstance(result[0], (float, int)) or math.isnan(result[0]):
                    logger.error(" Error in step 4.4 of acquisition process:\n"
                                 " invalid result: not float and not int or not a number "
                                 " for point_id: " + str(point['id']))
                    # Invalid result
                    # Go to begin of foreach point loop to process next point
                    continue

                # Apply byte swapping if configured
                if address['byte_swap']:
                    if address['number_of_registers'] == 2:
                        # 32-bit data (2 registers) - swap adjacent bytes
                        value = byte_swap_32_bit(result[0])
                    elif address['number_of_registers'] == 4:
                        # 64-bit data (4 registers) - swap adjacent bytes
                        value = byte_swap_64_bit(result[0])
                    else:
                        # No byte swapping for other register counts
                        value = result[0]
                else:
                    # No byte swapping required
                    value = result[0]

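                # Illustration (assumed semantics, example values only): a word-swapped device
                # would deliver the float 12.34, big-endian registers [0x4145, 0x70A4], as
                # [0x70A4, 0x4145]; byte_swap_32_bit is expected to restore the standard order.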
                # Process the value based on point type and apply ratio/offset
                if point['object_type'] == 'ANALOG_VALUE':
                    # Standard SQL requires that DECIMAL(18, 3) be able to store any value with 18 digits and
                    # 3 decimals, so values that can be stored in the column range
                    # from -999999999999999.999 to 999999999999999.999.
                    if Decimal('-999999999999999.999') <= Decimal(value) <= Decimal('999999999999999.999'):
                        analog_value_list.append({'point_id': point['id'],
                                                  'is_trend': point['is_trend'],
                                                  'value': Decimal(value) * point['ratio'] + point['offset_constant']})
                elif point['object_type'] == 'ENERGY_VALUE':
                    # Standard SQL requires that DECIMAL(18, 3) be able to store any value with 18 digits and
                    # 3 decimals, so values that can be stored in the column range
                    # from -999999999999999.999 to 999999999999999.999.
                    if Decimal('-999999999999999.999') <= Decimal(value) <= Decimal('999999999999999.999'):
                        energy_value_list.append({'point_id': point['id'],
                                                  'is_trend': point['is_trend'],
                                                  'value': Decimal(value) * point['ratio'] + point['offset_constant']})
                elif point['object_type'] == 'DIGITAL_VALUE':
                    digital_value_list.append({'point_id': point['id'],
                                               'is_trend': point['is_trend'],
                                               'value': int(value) * int(point['ratio']) + int(point['offset_constant'])
                                               })

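                # Calibration example (illustrative numbers): a raw register value of 235 with
                # ratio 0.1 and offset_constant 0 yields 235 * 0.1 + 0 = 23.5, e.g. a sensor
                # that reports temperature in tenths of a degree.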
            # End of foreach point loop

            if is_modbus_tcp_timed_out:
                # Modbus TCP connection timeout occurred
                # Clean up connections and restart the acquisition process

                # Destroy the Modbus master connection
                del master

                # Close all database connections
                if cursor_historical_db:
                    cursor_historical_db.close()
                if cnx_historical_db:
                    cnx_historical_db.close()
                if cursor_system_db:
                    cursor_system_db.close()
                if cnx_system_db:
                    cnx_system_db.close()

                # Break the inner while loop and go to begin of the outermost while loop
                time.sleep(60)
                break

            ############################################################################################################
            # Step 5: Bulk insert point values and update latest values in historical database
            ############################################################################################################
            # Check the connection to the Historical Database
            if not cnx_historical_db.is_connected():
                try:
                    cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
                    cursor_historical_db = cnx_historical_db.cursor()
                except Exception as e:
                    logger.error("Error in step 5.1 of acquisition process: " + str(e))
                    # Clean up database connections in case of error
                    if cursor_historical_db:
                        cursor_historical_db.close()
                    if cnx_historical_db:
                        cnx_historical_db.close()
                    # Go to begin of the inner while loop
                    time.sleep(60)
                    continue

            # Check the connection to the System Database
            if not cnx_system_db.is_connected():
                try:
                    cnx_system_db = mysql.connector.connect(**config.myems_system_db)
                    cursor_system_db = cnx_system_db.cursor()
                except Exception as e:
                    logger.error("Error in step 5.2 of acquisition process: " + str(e))
                    # Clean up database connections in case of error
                    if cursor_system_db:
                        cursor_system_db.close()
                    if cnx_system_db:
                        cnx_system_db.close()
                    # Go to begin of the inner while loop
                    time.sleep(60)
                    continue

            # Get current UTC timestamp for data storage
            current_datetime_utc = datetime.utcnow()

            # Bulk insert analog values into historical database and update latest values
            # Process in batches of 100 to avoid overwhelming the database
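            # For two illustrative points (IDs 1 and 2) sharing one timestamp, the statements
            # assembled below reduce to (values are examples only):
            #     INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value)
            #     VALUES (1,'2020-01-01T00:00:00',12.3), (2,'2020-01-01T00:00:00',45.6)
            #     DELETE FROM tbl_analog_value_latest WHERE point_id IN (1,2)
            #     INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value)
            #     VALUES (1,'2020-01-01T00:00:00',12.3), (2,'2020-01-01T00:00:00',45.6)
            # The same pattern repeats for the energy and digital batches below.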
            while len(analog_value_list) > 0:
                analog_value_list_100 = analog_value_list[:100]  # Take first 100 items
                analog_value_list = analog_value_list[100:]  # Remove processed items

                # Build INSERT statement for trend data (historical values)
                add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
                              " VALUES ")
                trend_value_count = 0

                # Add trend values to INSERT statement
                for point_value in analog_value_list_100:
                    if point_value['is_trend']:
                        add_values += " (" + str(point_value['point_id']) + ","
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
                        add_values += str(point_value['value']) + "), "
                        trend_value_count += 1

                # Execute trend data insertion if there are trend values
                if trend_value_count > 0:
                    try:
                        # Trim ", " at the end of string and then execute
                        cursor_historical_db.execute(add_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.3.1 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

                # Update latest values table for analog values
                delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
                latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
                                 " VALUES ")
                latest_value_count = 0

                # Build DELETE and INSERT statements for latest values
                for point_value in analog_value_list_100:
                    delete_values += str(point_value['point_id']) + ","
                    latest_values += " (" + str(point_value['point_id']) + ","
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
                    latest_values += str(point_value['value']) + "), "
                    latest_value_count += 1

                # Execute latest values update if there are values to process
                if latest_value_count > 0:
                    try:
                        # Replace "," at the end of string with ")" and execute DELETE
                        cursor_historical_db.execute(delete_values[:-1] + ")")
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.3.2 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

                    try:
                        # Trim ", " at the end of string and then execute INSERT
                        cursor_historical_db.execute(latest_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.3.3 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

            # Bulk insert energy values into historical database and update latest values
            # Process in batches of 100 to avoid overwhelming the database
            while len(energy_value_list) > 0:
                energy_value_list_100 = energy_value_list[:100]  # Take first 100 items
                energy_value_list = energy_value_list[100:]  # Remove processed items

                # Build INSERT statement for trend data (historical values)
                add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
                              " VALUES ")
                trend_value_count = 0

                # Add trend values to INSERT statement
                for point_value in energy_value_list_100:
                    if point_value['is_trend']:
                        add_values += " (" + str(point_value['point_id']) + ","
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
                        add_values += str(point_value['value']) + "), "
                        trend_value_count += 1

                # Execute trend data insertion if there are trend values
                if trend_value_count > 0:
                    try:
                        # Trim ", " at the end of string and then execute
                        cursor_historical_db.execute(add_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.4.1 of acquisition process: " + str(e))
                        # Ignore this exception and continue processing

                # Update latest values table for energy values
                delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
                latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
                                 " VALUES ")
                latest_value_count = 0

                # Build DELETE and INSERT statements for latest values
                for point_value in energy_value_list_100:
                    delete_values += str(point_value['point_id']) + ","
                    latest_values += " (" + str(point_value['point_id']) + ","
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
                    latest_values += str(point_value['value']) + "), "
                    latest_value_count += 1

                # Execute latest values update if there are values to process
                if latest_value_count > 0:
                    try:
                        # Replace "," at the end of string with ")" and execute DELETE
                        cursor_historical_db.execute(delete_values[:-1] + ")")
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.4.2 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

                    try:
                        # Trim ", " at the end of string and then execute INSERT
                        cursor_historical_db.execute(latest_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.4.3 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

            # Bulk insert digital values into historical database and update latest values
            # Process in batches of 100 to avoid overwhelming the database
            while len(digital_value_list) > 0:
                digital_value_list_100 = digital_value_list[:100]  # Take first 100 items
                digital_value_list = digital_value_list[100:]  # Remove processed items

                # Build INSERT statement for trend data (historical values)
                add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
                              " VALUES ")
                trend_value_count = 0

                # Add trend values to INSERT statement
                for point_value in digital_value_list_100:
                    if point_value['is_trend']:
                        add_values += " (" + str(point_value['point_id']) + ","
                        add_values += "'" + current_datetime_utc.isoformat() + "',"
                        add_values += str(point_value['value']) + "), "
                        trend_value_count += 1

                # Execute trend data insertion if there are trend values
                if trend_value_count > 0:
                    try:
                        # Trim ", " at the end of string and then execute
                        cursor_historical_db.execute(add_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.5.1 of acquisition process: " + str(e))
                        # Ignore this exception and continue processing

                # Update latest values table for digital values
                delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
                latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
                                 " VALUES ")
                latest_value_count = 0

                # Build DELETE and INSERT statements for latest values
                for point_value in digital_value_list_100:
                    delete_values += str(point_value['point_id']) + ","
                    latest_values += " (" + str(point_value['point_id']) + ","
                    latest_values += "'" + current_datetime_utc.isoformat() + "',"
                    latest_values += str(point_value['value']) + "), "
                    latest_value_count += 1

                # Execute latest values update if there are values to process
                if latest_value_count > 0:
                    try:
                        # Replace "," at the end of string with ")" and execute DELETE
                        cursor_historical_db.execute(delete_values[:-1] + ")")
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.5.2 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

                    try:
                        # Trim ", " at the end of string and then execute INSERT
                        cursor_historical_db.execute(latest_values[:-2])
                        cnx_historical_db.commit()
                    except Exception as e:
                        logger.error("Error in step 5.5.3 of acquisition process " + str(e))
                        # Ignore this exception and continue processing

            # Update data source last seen datetime to indicate successful data collection
            # The timestamp is passed as a query parameter rather than concatenated into the SQL text
            update_row = (" UPDATE tbl_data_sources "
                          " SET last_seen_datetime_utc = %s "
                          " WHERE id = %s ")
            try:
                cursor_system_db.execute(update_row, (current_datetime_utc.isoformat(), data_source_id,))
                cnx_system_db.commit()
            except Exception as e:
                logger.error("Error in step 5.6 of acquisition process " + str(e))
                # Clean up database connections in case of error
                if cursor_system_db:
                    cursor_system_db.close()
                if cnx_system_db:
                    cnx_system_db.close()
                # Go to begin of the inner while loop
                time.sleep(60)
                continue

            # Sleep for the configured interval before starting the next data collection cycle
            # This argument may be a floating point number for subsecond precision
            time.sleep(interval_in_seconds)

        # End of the inner while loop

    # End of the outermost while loop