| Total Complexity | 143 |
| Total Lines | 710 |
| Duplicated Lines | 10.14 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex modules like virtualpoint often do a lot of different things. To break such a module down, identify a cohesive component within it. A common approach to finding such a component is to look for fields or functions that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """ |
||
| 2 | MyEMS Normalization Service - Virtual Point Processing Module |
||
| 3 | |||
| 4 | This module handles the calculation of virtual point values using mathematical expressions. |
||
| 5 | Virtual points are computed points that derive their values from combinations of other points |
||
| 6 | (analog points, digital points, and other virtual points) using algebraic equations and piecewise functions. |
||
| 7 | |||
| 8 | The virtual point processing performs the following functions: |
||
| 9 | 1. Retrieves all virtual points and their mathematical expressions from the system database |
||
| 10 | 2. Uses multiprocessing to process virtual points in parallel for efficiency |
||
| 11 | 3. Parses mathematical expressions and identifies dependent points |
||
| 12 | 4. Retrieves latest values from dependent points in the historical database |
||
| 13 | 5. Evaluates mathematical expressions using SymPy library |
||
| 14 | 6. Stores calculated virtual point values in the historical database |
||
| 15 | |||
| 16 | Key features: |
||
| 17 | - Supports complex mathematical expressions with multiple variables |
||
| 18 | - Handles different point types (analog, digital, virtual) in expressions |
||
| 19 | - Uses SymPy for robust mathematical expression evaluation including piecewise functions |
||
| 20 | - Maintains data integrity through comprehensive error handling |
||
| 21 | - Processes virtual points continuously to provide real-time calculated values |
||
| 22 | """ |
||
| 23 | |||
| 24 | import json |
||
| 25 | import random |
||
| 26 | import re |
||
| 27 | import time |
||
| 28 | from datetime import datetime |
||
| 29 | from decimal import Decimal |
||
| 30 | from multiprocessing import Pool |
||
| 31 | import mysql.connector |
||
| 32 | from sympy import sympify, Piecewise, symbols, parse_expr |
||
| 33 | import config |
||
| 34 | |||
# Upper bound on the number of distinct timestamps one virtual point may
# evaluate in a single worker run; larger batches are refused to cap CPU/memory use.
MAX_DATETIME_POINTS = 100_000

# Upper bound, in characters, on an expression string; longer expressions are
# rejected before any parsing happens, limiting denial-of-service exposure.
MAX_EXPRESSION_LENGTH = 10_000

# Upper bound on how many variable -> point substitutions one expression may declare.
MAX_SUBSTITUTIONS = 100
| 43 | |||
| 44 | |||
| 45 | ######################################################################################################################## |
||
| 46 | # Security Validation Functions |
||
| 47 | ######################################################################################################################## |
||
| 48 | |||
def validate_variable_name(name):
    """
    Validate that a variable name is a plain ASCII identifier.

    A valid name starts with an ASCII letter or underscore and continues with
    ASCII letters, digits or underscores only.

    Args:
        name: Variable name to validate

    Raises:
        ValueError: If the name is not a string or is not a valid identifier
    """
    if not isinstance(name, str):
        raise ValueError(f"Variable name must be a string, got {type(name)}")
    # re.fullmatch instead of re.match(...$): "$" also matches just before a
    # trailing newline, which would let names such as "x\n" slip through.
    if not re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', name):
        raise ValueError(f"Invalid variable name: {name}. Must start with letter or underscore and contain only alphanumeric characters and underscores.")
| 63 | |||
| 64 | |||
def validate_point_id(point_id):
    """
    Validate that point_id denotes a positive integer point ID.

    Accepted inputs are ints, exactly-integral numbers such as 3.0, and numeric
    strings such as "42". Booleans are rejected even though bool is a subclass
    of int, so a JSON `true` cannot silently become point 1. Non-integral
    numbers such as 2.5 are rejected instead of being truncated by int().

    Args:
        point_id: Point ID to validate

    Returns:
        The point ID converted to int. Callers that ignore the return value are
        unaffected; callers may use it to normalise string IDs before lookups.

    Raises:
        ValueError: If point_id is not a positive integer
    """
    not_integer_message = f"Invalid point_id: {point_id}. Must be an integer."
    if isinstance(point_id, bool):
        raise ValueError(not_integer_message)
    if not isinstance(point_id, int):
        try:
            converted = int(point_id)
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers float('inf'); ValueError covers NaN and bad strings.
            raise ValueError(not_integer_message) from None
        # int() truncates numbers (2.5 -> 2); only accept values that were exactly integral.
        if not isinstance(point_id, (str, bytes)) and converted != point_id:
            raise ValueError(not_integer_message)
        point_id = converted
    if point_id <= 0:
        raise ValueError(f"Invalid point_id: {point_id}. Must be a positive integer.")
    return point_id
| 83 | |||
| 84 | |||
# Denylist of constructs that have no place in an algebraic or piecewise
# expression but could be abused once the string reaches sympify(), which
# evaluates its input with Python's eval(). The \b anchors make keyword checks
# match whole words only, so legitimate variable names such as "grid_import" or
# "reopen_count" are not rejected by accident. Compiled once at import time.
# NOTE(review): a denylist can only reduce, not eliminate, eval() risk; the
# expressions should still come from trusted configuration.
_DANGEROUS_EXPRESSION_PATTERNS = [
    (re.compile(pattern, re.IGNORECASE), description)
    for pattern, description in (
        (r'__\w+__', 'Double underscore pattern (magic methods)'),
        (r'\bimport\s+', 'Import statement'),
        (r'\bexec\s*\(', 'exec() call'),
        (r'\beval\s*\(', 'eval() call'),
        (r'\bopen\s*\(', 'File open operation'),
        (r'\bfile\s*\(', 'File operation'),
        (r'__import__', 'Direct __import__ call'),
        (r'\bcompile\s*\(', 'compile() call'),
        (r'\bglobals\s*\(', 'globals() call'),
        (r'\blocals\s*\(', 'locals() call'),
        (r'\b(?:getattr|setattr|delattr|vars)\s*\(', 'Attribute/namespace access call'),
        (r'\blambda\b', 'lambda expression'),
    )
]


def validate_expression_safe(expression):
    """
    Validate an expression string before it is handed to SymPy.

    Checks, in order: the value is a str, its length does not exceed
    MAX_EXPRESSION_LENGTH, and it contains none of the denylisted constructs
    in _DANGEROUS_EXPRESSION_PATTERNS.

    Args:
        expression: Expression string to validate

    Raises:
        ValueError: If expression is not a string, is too long, or contains a
            dangerous pattern
    """
    if not isinstance(expression, str):
        raise ValueError(f"Expression must be a string, got {type(expression)}")

    if len(expression) > MAX_EXPRESSION_LENGTH:
        raise ValueError(f"Expression too long: {len(expression)} characters. Maximum allowed: {MAX_EXPRESSION_LENGTH}")

    for pattern, description in _DANGEROUS_EXPRESSION_PATTERNS:
        if pattern.search(expression):
            raise ValueError(f"Dangerous pattern detected in expression: {description}")
| 118 | |||
| 119 | |||
def _split_top_level_commas(text):
    """Split text on commas that are not nested inside (), [] or {}; segments are stripped.

    Raises:
        ValueError: If the brackets in text are unbalanced or mismatched.
    """
    closer_for = {'(': ')', '[': ']', '{': '}'}
    expected_closers = []
    segments = []
    segment_start = 0
    for index, char in enumerate(text):
        if char in closer_for:
            expected_closers.append(closer_for[char])
        elif char in ')]}':
            if not expected_closers or expected_closers.pop() != char:
                raise ValueError(f"Unbalanced brackets in piecewise expression near position {index}")
        elif char == ',' and not expected_closers:
            segments.append(text[segment_start:index].strip())
            segment_start = index + 1
    if expected_closers:
        raise ValueError("Unbalanced brackets in piecewise expression: missing closing bracket")
    segments.append(text[segment_start:].strip())
    return segments


def _strip_wrapping_parens(text):
    """Return (inner, True) when text is exactly one "( ... )" group, else (text, False).

    "(a) & (b)" is not a single group: its first parenthesis closes before the
    end of the string. text is assumed to be bracket-balanced already.
    """
    if not (text.startswith('(') and text.endswith(')')):
        return text, False
    depth = 0
    for index, char in enumerate(text):
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                if index == len(text) - 1:
                    return text[1:-1].strip(), True
                return text, False
    return text, False


def _split_value_condition(tuple_body):
    """Split the inside of one "(value, condition)" tuple into (value_str, condition_str).

    Redundant extra parentheses around the whole tuple, e.g. "((1, x<5))", are tolerated.
    """
    segments = _split_top_level_commas(tuple_body)
    while len(segments) == 1:
        inner, wrapped = _strip_wrapping_parens(segments[0])
        if not wrapped:
            raise ValueError(f"Invalid piecewise tuple format: {tuple_body}. Missing comma separator.")
        segments = _split_top_level_commas(inner)
    if len(segments) != 2:
        raise ValueError(f"Invalid piecewise tuple format: {tuple_body}. Expected exactly one value and one condition.")
    value_str, condition_str = segments
    if not value_str or not condition_str:
        raise ValueError(f"Invalid piecewise tuple format: {tuple_body}. Empty value or condition.")
    return value_str, condition_str


def _split_piecewise_pairs(expression):
    """Split a piecewise expression string into a list of (value_str, condition_str) pairs.

    Accepted shapes:
        "(v1, c1), (v2, c2), ..."   several tuples
        "(v, c)" or "v, c"          a single tuple, with or without parentheses
        "((v1, c1), (v2, c2))"      the tuple list wrapped in one more pair of parentheses
    A bare "v, c" pair whose value AND condition are both fully parenthesised is
    ambiguous with two tuples and is rejected; drop one redundant pair of parentheses.
    """
    segments = _split_top_level_commas(expression.strip())
    # Peel parentheses that wrap the entire expression: "(v, c)" or "((v1, c1), ...)".
    while len(segments) == 1:
        inner, wrapped = _strip_wrapping_parens(segments[0])
        if not wrapped:
            raise ValueError(f"Invalid piecewise tuple format: {segments[0]}. Missing comma separator.")
        segments = _split_top_level_commas(inner)

    groups = [_strip_wrapping_parens(segment) for segment in segments]
    if all(wrapped for _, wrapped in groups):
        # Every top-level segment is a "( ... )" group: each one is a tuple.
        return [_split_value_condition(inner) for inner, _ in groups]
    if len(segments) == 2:
        # A single bare "value, condition" pair.
        value_str, condition_str = segments
        if not value_str or not condition_str:
            raise ValueError(f"Invalid piecewise tuple format: {expression}. Empty value or condition.")
        return [(value_str, condition_str)]
    raise ValueError("Invalid piecewise expression format: expected '(value, condition)' tuples separated by commas")


def parse_piecewise_safe(expression, substitutions):
    """
    Parse a piecewise function expression into (value, condition) SymPy pairs without eval().

    Piecewise format: "(value1, condition1), (value2, condition2), (default_value, True)"

    Values and conditions may themselves contain parentheses and commas, e.g.
    "(2, (x >= 200) & (x < 500))" or "(Max(x, 1), x > 0)": tuples are split on
    commas at bracket depth zero only, and each tuple's surrounding parentheses
    are removed as one matched pair rather than character-stripped.

    Args:
        expression: Piecewise function expression string
        substitutions: Dictionary mapping variable names to point IDs. Not used
            by the parser; kept for interface compatibility.

    Returns:
        List of tuples (value_expr, condition_expr) for Piecewise construction

    Raises:
        ValueError: If expression fails validate_expression_safe or cannot be parsed
    """
    # Validation errors propagate unchanged (not wrapped in "Failed to parse").
    validate_expression_safe(expression)

    try:
        # NOTE(review): sympify() itself parses via eval(); validate_expression_safe's
        # denylist is the only guard against hostile input here.
        piecewise_parts = [
            (sympify(value_str), sympify(condition_str))
            for value_str, condition_str in _split_piecewise_pairs(expression)
        ]
        if not piecewise_parts:
            raise ValueError("No valid piecewise parts found in expression")
        return piecewise_parts
    except Exception as e:
        raise ValueError(f"Failed to parse piecewise expression: {str(e)}") from e
| 209 | |||
| 210 | |||
| 211 | ######################################################################################################################## |
||
| 212 | # Virtual Point Calculation Procedures: |
||
| 213 | # Step 1: Query all virtual points and their mathematical expressions from system database |
||
| 214 | # Step 2: Create multiprocessing pool to call worker processes in parallel |
||
| 215 | ######################################################################################################################## |
||
| 216 | |||
def _load_virtual_points(logger):
    """
    Read the configuration of every virtual point from the system database.

    Args:
        logger: Logger used to record connection and query failures

    Returns:
        A list of dicts with keys id, name, data_source_id, object_type,
        high_limit, low_limit and address (one per tbl_points row with
        is_virtual = 1); an empty list when there are no virtual points; or
        None when connecting or querying failed (the failure is already logged).
    """
    cnx_system_db = None
    cursor_system_db = None
    try:
        # Step 0: connect to the system database
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 0 of virtual point calculate " + str(e))
            return None

        print("Connected to MyEMS System Database")

        # Step 1: retrieve all virtual points with their configuration data
        try:
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
                                     " FROM tbl_points "
                                     " WHERE is_virtual = 1 ")
            rows_virtual_points = cursor_system_db.fetchall() or []
            return [{"id": row[0],
                     "name": row[1],
                     "data_source_id": row[2],
                     "object_type": row[3],
                     "high_limit": row[4],
                     "low_limit": row[5],
                     "address": row[6]}
                    for row in rows_virtual_points]
        except Exception as e:
            logger.error("Error in step 1 of virtual point calculate " + str(e))
            return None
    finally:
        # Release the connection on every path before the caller goes to sleep
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()


def calculate(logger):
    """
    Main function for virtual point calculation using mathematical expressions.

    Runs forever. Each cycle loads all virtual points from the system database,
    shuffles them, evaluates them in a process pool via worker(), logs every
    error string the workers return, and then sleeps for 60 seconds. Database
    failures, an empty virtual point list, and exceptions escaping the process
    pool are logged or tolerated and retried on the next cycle instead of
    terminating the loop.

    Args:
        logger: Logger instance for recording calculation activities and errors
    """
    while True:
        virtual_point_list = _load_virtual_points(logger)
        if not virtual_point_list:
            # Either the system database could not be read (already logged) or
            # there are no virtual points yet: wait, then reconnect and retry
            time.sleep(60)
            continue

        # Shuffle so that virtual points are processed in a different order each cycle
        random.shuffle(virtual_point_list)

        print("Got all virtual points in MyEMS System Database")
        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker processes in parallel
        ################################################################################################################
        error_list = []
        try:
            # The context manager guarantees the pool is torn down even if map() raises
            with Pool(processes=config.pool_size) as pool:
                error_list = pool.map(worker, virtual_point_list)
                pool.close()
                pool.join()
        except Exception as e:
            # An exception escaping a worker must not end the service loop
            logger.error("Error in step 2 of virtual point calculate " + str(e))

        # worker() returns None on success or an error string on failure
        for error in error_list:
            if error:
                logger.error(error)

        print("go to sleep ")
        time.sleep(60)  # Sleep for 1 minute before next processing cycle
        print("wake from sleep, and continue to work")
| 307 | |||
| 308 | |||
| 309 | ######################################################################################################################## |
||
| 310 | # Worker Process Procedures for Individual Virtual Point Processing: |
||
| 311 | # Step 1: Get start datetime and end datetime for processing |
||
| 312 | # Step 2: Parse the expression and get all points in substitutions |
||
| 313 | # Step 3: Query points type from system database |
||
| 314 | # Step 4: Query points value from historical database |
||
| 315 | # Step 5: Evaluate the equation with points values and store results |
||
| 316 | # Returns the error string for logging or returns None on success |
||
| 317 | ######################################################################################################################## |
||
| 318 | |||
| 319 | def worker(virtual_point): |
||
| 320 | """ |
||
| 321 | Worker function to process a single virtual point's calculation. |
||
| 322 | |||
| 323 | This function processes one virtual point at a time, evaluating its mathematical |
||
| 324 | expression using data from dependent points and storing the calculated results. |
||
| 325 | |||
| 326 | Args: |
||
| 327 | virtual_point: Dictionary containing virtual point configuration (id, name, object_type, address, etc.) |
||
| 328 | |||
| 329 | Returns: |
||
| 330 | None on success, error string on failure |
||
| 331 | """ |
||
| 332 | cnx_historical_db = None |
||
| 333 | cursor_historical_db = None |
||
| 334 | |||
| 335 | # Connect to historical database to check existing processed data |
||
| 336 | try: |
||
| 337 | cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) |
||
| 338 | cursor_historical_db = cnx_historical_db.cursor() |
||
| 339 | except Exception as e: |
||
| 340 | if cursor_historical_db: |
||
| 341 | cursor_historical_db.close() |
||
| 342 | if cnx_historical_db: |
||
| 343 | cnx_historical_db.close() |
||
| 344 | # Return generic error message to avoid information disclosure |
||
| 345 | return f"Error connecting to historical database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 346 | |||
| 347 | print("Start to process virtual point: " + "'" + virtual_point['name'] + "'") |
||
| 348 | |||
| 349 | #################################################################################################################### |
||
| 350 | # Step 1: Get start datetime and end datetime for processing |
||
| 351 | #################################################################################################################### |
||
| 352 | # Determine the appropriate table based on virtual point object type |
||
| 353 | if virtual_point['object_type'] == 'ANALOG_VALUE': |
||
| 354 | table_name = "tbl_analog_value" |
||
| 355 | elif virtual_point['object_type'] == 'ENERGY_VALUE': |
||
| 356 | table_name = "tbl_energy_value" |
||
| 357 | else: |
||
| 358 | if cursor_historical_db: |
||
| 359 | cursor_historical_db.close() |
||
| 360 | if cnx_historical_db: |
||
| 361 | cnx_historical_db.close() |
||
| 362 | return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'" |
||
| 363 | |||
| 364 | try: |
||
| 365 | query = (" SELECT MAX(utc_date_time) " |
||
| 366 | " FROM " + table_name + |
||
| 367 | " WHERE point_id = %s ") |
||
| 368 | cursor_historical_db.execute(query, (virtual_point['id'],)) |
||
| 369 | row = cursor_historical_db.fetchone() |
||
| 370 | except Exception as e: |
||
| 371 | if cursor_historical_db: |
||
| 372 | cursor_historical_db.close() |
||
| 373 | if cnx_historical_db: |
||
| 374 | cnx_historical_db.close() |
||
| 375 | # Return generic error message to avoid information disclosure |
||
| 376 | return f"Error querying historical database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 377 | |||
| 378 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None) |
||
| 379 | |||
| 380 | if row is not None and len(row) > 0 and isinstance(row[0], datetime): |
||
| 381 | start_datetime_utc = row[0].replace(tzinfo=None) |
||
| 382 | |||
| 383 | end_datetime_utc = datetime.utcnow().replace(tzinfo=None) |
||
| 384 | |||
| 385 | if end_datetime_utc <= start_datetime_utc: |
||
| 386 | if cursor_historical_db: |
||
| 387 | cursor_historical_db.close() |
||
| 388 | if cnx_historical_db: |
||
| 389 | cnx_historical_db.close() |
||
| 390 | return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'" |
||
| 391 | |||
| 392 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
||
| 393 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
||
| 394 | |||
| 395 | ############################################################################################################ |
||
| 396 | # Step 2: parse the expression and get all points in substitutions |
||
| 397 | ############################################################################################################ |
||
| 398 | point_list = list() |
||
| 399 | expression = None |
||
| 400 | substitutions = None |
||
| 401 | try: |
||
| 402 | ######################################################################################################## |
||
| 403 | # parse the expression and get all points in substitutions |
||
| 404 | ######################################################################################################## |
||
| 405 | address = json.loads(virtual_point['address']) |
||
| 406 | # algebraic expression example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}' |
||
| 407 | # piecewise function example: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}' |
||
| 408 | if 'expression' not in address.keys() \ |
||
| 409 | or 'substitutions' not in address.keys() \ |
||
| 410 | or len(address['expression']) == 0 \ |
||
| 411 | or len(address['substitutions']) == 0: |
||
| 412 | if cursor_historical_db: |
||
| 413 | cursor_historical_db.close() |
||
| 414 | if cnx_historical_db: |
||
| 415 | cnx_historical_db.close() |
||
| 416 | return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'" |
||
| 417 | |||
| 418 | expression = address['expression'] |
||
| 419 | substitutions = address['substitutions'] |
||
| 420 | |||
| 421 | # Security: Validate expression and substitutions |
||
| 422 | try: |
||
| 423 | validate_expression_safe(expression) |
||
| 424 | except ValueError as e: |
||
| 425 | if cursor_historical_db: |
||
| 426 | cursor_historical_db.close() |
||
| 427 | if cnx_historical_db: |
||
| 428 | cnx_historical_db.close() |
||
| 429 | return f"Error in step 2.1.1: Invalid expression for '{virtual_point['name']}': {str(e)}" |
||
| 430 | |||
| 431 | # Security: Validate substitutions count |
||
| 432 | if len(substitutions) > MAX_SUBSTITUTIONS: |
||
| 433 | if cursor_historical_db: |
||
| 434 | cursor_historical_db.close() |
||
| 435 | if cnx_historical_db: |
||
| 436 | cnx_historical_db.close() |
||
| 437 | return f"Error in step 2.1.2: Too many substitutions ({len(substitutions)}) for '{virtual_point['name']}'" |
||
| 438 | |||
| 439 | # Security: Validate variable names and point IDs |
||
| 440 | for variable_name, point_id in substitutions.items(): |
||
| 441 | try: |
||
| 442 | validate_variable_name(variable_name) |
||
| 443 | validate_point_id(point_id) |
||
| 444 | except ValueError as e: |
||
| 445 | if cursor_historical_db: |
||
| 446 | cursor_historical_db.close() |
||
| 447 | if cnx_historical_db: |
||
| 448 | cnx_historical_db.close() |
||
| 449 | return f"Error in step 2.1.3: Invalid variable or point_id for '{virtual_point['name']}': {str(e)}" |
||
| 450 | point_list.append({"variable_name": variable_name, "point_id": point_id}) |
||
| 451 | except json.JSONDecodeError as e: |
||
| 452 | if cursor_historical_db: |
||
| 453 | cursor_historical_db.close() |
||
| 454 | if cnx_historical_db: |
||
| 455 | cnx_historical_db.close() |
||
| 456 | return "Error in step 2.2: Invalid JSON in address for '" + virtual_point['name'] + "'" |
||
| 457 | except Exception as e: |
||
| 458 | if cursor_historical_db: |
||
| 459 | cursor_historical_db.close() |
||
| 460 | if cnx_historical_db: |
||
| 461 | cnx_historical_db.close() |
||
| 462 | return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
| 463 | |||
| 464 | ############################################################################################################ |
||
| 465 | # Step 3: query points type from system database |
||
| 466 | ############################################################################################################ |
||
| 467 | print("getting points type ") |
||
| 468 | cnx_system_db = None |
||
| 469 | cursor_system_db = None |
||
| 470 | try: |
||
| 471 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
||
| 472 | cursor_system_db = cnx_system_db.cursor() |
||
| 473 | except Exception as e: |
||
| 474 | if cursor_system_db: |
||
| 475 | cursor_system_db.close() |
||
| 476 | if cnx_system_db: |
||
| 477 | cnx_system_db.close() |
||
| 478 | print("Error in step 3 of virtual point worker " + str(e)) |
||
| 479 | # Return generic error message to avoid information disclosure |
||
| 480 | return f"Error connecting to system database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 481 | |||
| 482 | print("Connected to MyEMS System Database") |
||
| 483 | |||
| 484 | all_point_dict = dict() |
||
| 485 | try: |
||
| 486 | cursor_system_db.execute(" SELECT id, object_type " |
||
| 487 | " FROM tbl_points ") |
||
| 488 | rows_points = cursor_system_db.fetchall() |
||
| 489 | |||
| 490 | if rows_points is None or len(rows_points) == 0: |
||
| 491 | return f"Error: No points found in system database for virtual point '{virtual_point['name']}'" |
||
| 492 | |||
| 493 | for row in rows_points: |
||
| 494 | all_point_dict[row[0]] = row[1] |
||
| 495 | except Exception as e: |
||
| 496 | # Return generic error message to avoid information disclosure |
||
| 497 | return f"Error querying points from system database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 498 | finally: |
||
| 499 | if cursor_system_db: |
||
| 500 | cursor_system_db.close() |
||
| 501 | if cnx_system_db: |
||
| 502 | cnx_system_db.close() |
||
| 503 | ############################################################################################################ |
||
| 504 | # Step 4: query points value from historical database |
||
| 505 | ############################################################################################################ |
||
| 506 | |||
| 507 | print("getting point values ") |
||
| 508 | point_values_dict = dict() |
||
| 509 | if point_list is not None and len(point_list) > 0: |
||
| 510 | try: |
||
| 511 | for point in point_list: |
||
| 512 | point_object_type = all_point_dict.get(point['point_id']) |
||
| 513 | if point_object_type is None: |
||
| 514 | return "variable point type should not be None " + " for '" + virtual_point['name'] + "'" |
||
| 515 | if point_object_type == 'ANALOG_VALUE': |
||
| 516 | query = (" SELECT utc_date_time, actual_value " |
||
| 517 | " FROM tbl_analog_value " |
||
| 518 | " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s " |
||
| 519 | " ORDER BY utc_date_time ") |
||
| 520 | cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,)) |
||
| 521 | rows = cursor_historical_db.fetchall() |
||
| 522 | if rows is not None and len(rows) > 0: |
||
| 523 | point_values_dict[point['point_id']] = dict() |
||
| 524 | for row in rows: |
||
| 525 | point_values_dict[point['point_id']][row[0]] = row[1] |
||
| 526 | elif point_object_type == 'ENERGY_VALUE': |
||
| 527 | query = (" SELECT utc_date_time, actual_value " |
||
| 528 | " FROM tbl_energy_value " |
||
| 529 | " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s " |
||
| 530 | " ORDER BY utc_date_time ") |
||
| 531 | cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,)) |
||
| 532 | rows = cursor_historical_db.fetchall() |
||
| 533 | if rows is not None and len(rows) > 0: |
||
| 534 | point_values_dict[point['point_id']] = dict() |
||
| 535 | for row in rows: |
||
| 536 | point_values_dict[point['point_id']][row[0]] = row[1] |
||
| 537 | else: |
||
| 538 | point_values_dict[point['point_id']] = None |
||
| 539 | else: |
||
| 540 | # point type should not be DIGITAL_VALUE |
||
| 541 | return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'" |
||
| 542 | except Exception as e: |
||
| 543 | if cursor_historical_db: |
||
| 544 | cursor_historical_db.close() |
||
| 545 | if cnx_historical_db: |
||
| 546 | cnx_historical_db.close() |
||
| 547 | # Return generic error message to avoid information disclosure |
||
| 548 | return f"Error querying point values from historical database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 549 | |||
| 550 | ############################################################################################################ |
||
| 551 | # Step 5: evaluate the equation with points values |
||
| 552 | ############################################################################################################ |
||
| 553 | |||
| 554 | print("getting date time set for all points") |
||
| 555 | utc_date_time_set = set() |
||
| 556 | if point_values_dict is not None and len(point_values_dict) > 0: |
||
| 557 | for point_id, point_values in point_values_dict.items(): |
||
| 558 | if point_values is not None and len(point_values) > 0: |
||
| 559 | utc_date_time_set = utc_date_time_set.union(point_values.keys()) |
||
| 560 | |||
| 561 | # Security: Resource limit check to prevent DoS attacks |
||
| 562 | if len(utc_date_time_set) > MAX_DATETIME_POINTS: |
||
| 563 | if cursor_historical_db: |
||
| 564 | cursor_historical_db.close() |
||
| 565 | if cnx_historical_db: |
||
| 566 | cnx_historical_db.close() |
||
| 567 | return f"Error: Too many datetime points to process ({len(utc_date_time_set)}) for '{virtual_point['name']}'. Maximum allowed: {MAX_DATETIME_POINTS}" |
||
| 568 | |||
| 569 | print("evaluating the equation with SymPy") |
||
| 570 | normalized_values = list() |
||
| 571 | |||
| 572 | ############################################################################################################ |
||
| 573 | # Converting Strings to SymPy Expressions |
||
| 574 | # The sympify function(that's sympify, not to be confused with simplify) can be used to |
||
| 575 | # convert strings into SymPy expressions. |
||
| 576 | # SECURITY FIX: Removed eval() and replaced with safe SymPy parsing |
||
| 577 | ############################################################################################################ |
||
| 578 | try: |
||
| 579 | # Security: Use safe parsing instead of eval() |
||
| 580 | if re.search(',', expression): |
||
| 581 | # Piecewise function: parse safely without eval() |
||
| 582 | piecewise_parts = parse_piecewise_safe(expression, substitutions) |
||
| 583 | expr = Piecewise(*piecewise_parts) |
||
| 584 | print("the expression will be evaluated as piecewise function: " + str(expr)) |
||
| 585 | else: |
||
| 586 | # Algebraic expression: use sympify (safe) |
||
| 587 | expr = sympify(expression) |
||
| 588 | print("the expression will be evaluated as algebraic expression: " + str(expr)) |
||
| 589 | |||
| 590 | for utc_date_time in utc_date_time_set: |
||
| 591 | meta_data = dict() |
||
| 592 | meta_data['utc_date_time'] = utc_date_time |
||
| 593 | |||
| 594 | #################################################################################################### |
||
| 595 | # create a dictionary of Symbol: point pairs |
||
| 596 | #################################################################################################### |
||
| 597 | |||
| 598 | subs = dict() |
||
| 599 | |||
| 600 | #################################################################################################### |
||
| 601 | # Evaluating the expression at current_datetime_utc |
||
| 602 | #################################################################################################### |
||
| 603 | |||
| 604 | if point_list is not None and len(point_list) > 0: |
||
| 605 | for point in point_list: |
||
| 606 | actual_value = point_values_dict[point['point_id']].get(utc_date_time, None) |
||
| 607 | if actual_value is None: |
||
| 608 | break |
||
| 609 | subs[point['variable_name']] = actual_value |
||
| 610 | |||
| 611 | if len(subs) != len(point_list): |
||
| 612 | continue |
||
| 613 | |||
| 614 | #################################################################################################### |
||
| 615 | # To numerically evaluate an expression with a Symbol at a point, |
||
| 616 | # we might use subs followed by evalf, |
||
| 617 | # but it is more efficient and numerically stable to pass the substitution to evalf |
||
| 618 | # using the subs flag, which takes a dictionary of Symbol: point pairs. |
||
| 619 | #################################################################################################### |
||
| 620 | # Note: expr is already a Piecewise object for piecewise functions, or a SymPy expression for algebraic |
||
| 621 | if re.search(',', expression): |
||
| 622 | # expr is already a Piecewise object |
||
| 623 | meta_data['actual_value'] = Decimal(str(expr.subs(subs))) |
||
| 624 | normalized_values.append(meta_data) |
||
| 625 | else: |
||
| 626 | # expr is a SymPy expression |
||
| 627 | meta_data['actual_value'] = Decimal(str(expr.evalf(subs=subs))) |
||
| 628 | normalized_values.append(meta_data) |
||
| 629 | except Exception as e: |
||
| 630 | if cursor_historical_db: |
||
| 631 | cursor_historical_db.close() |
||
| 632 | if cnx_historical_db: |
||
| 633 | cnx_historical_db.close() |
||
| 634 | # Return generic error message to avoid information disclosure |
||
| 635 | # Detailed error information should be logged separately if logger is available |
||
| 636 | return f"Error evaluating expression for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 637 | |||
| 638 | print("saving virtual points values to historical database") |
||
| 639 | |||
| 640 | if len(normalized_values) > 0: |
||
| 641 | latest_meta_data = normalized_values[0] |
||
| 642 | |||
| 643 | # Security: Use parameterized queries to prevent SQL injection |
||
| 644 | # Validate table_name is from whitelist (already validated via object_type) |
||
| 645 | if table_name not in ['tbl_analog_value', 'tbl_energy_value']: |
||
| 646 | if cursor_historical_db: |
||
| 647 | cursor_historical_db.close() |
||
| 648 | if cnx_historical_db: |
||
| 649 | cnx_historical_db.close() |
||
| 650 | return f"Error: Invalid table name '{table_name}' for '{virtual_point['name']}'" |
||
| 651 | |||
| 652 | insert_query = ("INSERT INTO " + table_name + |
||
| 653 | " (point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)") |
||
| 654 | |||
| 655 | while len(normalized_values) > 0: |
||
| 656 | insert_100 = normalized_values[:100] |
||
| 657 | normalized_values = normalized_values[100:] |
||
| 658 | |||
| 659 | try: |
||
| 660 | # Security: Use executemany with parameterized query |
||
| 661 | values = [] |
||
| 662 | for meta_data in insert_100: |
||
| 663 | values.append(( |
||
| 664 | virtual_point['id'], |
||
| 665 | meta_data['utc_date_time'], |
||
| 666 | meta_data['actual_value'] |
||
| 667 | )) |
||
| 668 | if meta_data['utc_date_time'] > latest_meta_data['utc_date_time']: |
||
| 669 | latest_meta_data = meta_data |
||
| 670 | |||
| 671 | cursor_historical_db.executemany(insert_query, values) |
||
| 672 | cnx_historical_db.commit() |
||
| 673 | except Exception as e: |
||
| 674 | if cursor_historical_db: |
||
| 675 | cursor_historical_db.close() |
||
| 676 | if cnx_historical_db: |
||
| 677 | cnx_historical_db.close() |
||
| 678 | # Return generic error message to avoid information disclosure |
||
| 679 | return f"Error saving calculated values to database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 680 | |||
| 681 | try: |
||
| 682 | # Security: Use parameterized query for delete operation |
||
| 683 | delete_query = "DELETE FROM " + table_name + "_latest WHERE point_id = %s" |
||
| 684 | cursor_historical_db.execute(delete_query, (virtual_point['id'],)) |
||
| 685 | cnx_historical_db.commit() |
||
| 686 | |||
| 687 | # Security: Use parameterized query for insert operation |
||
| 688 | insert_latest_query = ("INSERT INTO " + table_name + "_latest " + |
||
| 689 | "(point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)") |
||
| 690 | cursor_historical_db.execute(insert_latest_query, ( |
||
| 691 | virtual_point['id'], |
||
| 692 | latest_meta_data['utc_date_time'], |
||
| 693 | latest_meta_data['actual_value'] |
||
| 694 | )) |
||
| 695 | cnx_historical_db.commit() |
||
| 696 | except Exception as e: |
||
| 697 | if cursor_historical_db: |
||
| 698 | cursor_historical_db.close() |
||
| 699 | if cnx_historical_db: |
||
| 700 | cnx_historical_db.close() |
||
| 701 | # Return generic error message to avoid information disclosure |
||
| 702 | return f"Error updating latest value in database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 703 | |||
| 704 | if cursor_historical_db: |
||
| 705 | cursor_historical_db.close() |
||
| 706 | if cnx_historical_db: |
||
| 707 | cnx_historical_db.close() |
||
| 708 | |||
| 709 | return None |
||
| 710 |