| Conditions | 97 |
| Total Lines | 391 |
| Code Lines | 256 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
Complex classes like virtualpoint.worker() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """ |
||
| 319 | def worker(virtual_point): |
||
| 320 | """ |
||
| 321 | Worker function to process a single virtual point's calculation. |
||
| 322 | |||
| 323 | This function processes one virtual point at a time, evaluating its mathematical |
||
| 324 | expression using data from dependent points and storing the calculated results. |
||
| 325 | |||
| 326 | Args: |
||
| 327 | virtual_point: Dictionary containing virtual point configuration (id, name, object_type, address, etc.) |
||
| 328 | |||
| 329 | Returns: |
||
| 330 | None on success, error string on failure |
||
| 331 | """ |
||
| 332 | cnx_historical_db = None |
||
| 333 | cursor_historical_db = None |
||
| 334 | |||
| 335 | # Connect to historical database to check existing processed data |
||
| 336 | try: |
||
| 337 | cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) |
||
| 338 | cursor_historical_db = cnx_historical_db.cursor() |
||
| 339 | except Exception as e: |
||
| 340 | if cursor_historical_db: |
||
| 341 | cursor_historical_db.close() |
||
| 342 | if cnx_historical_db: |
||
| 343 | cnx_historical_db.close() |
||
| 344 | # Return generic error message to avoid information disclosure |
||
| 345 | return f"Error connecting to historical database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 346 | |||
| 347 | print("Start to process virtual point: " + "'" + virtual_point['name'] + "'") |
||
| 348 | |||
| 349 | #################################################################################################################### |
||
| 350 | # Step 1: Get start datetime and end datetime for processing |
||
| 351 | #################################################################################################################### |
||
| 352 | # Determine the appropriate table based on virtual point object type |
||
| 353 | if virtual_point['object_type'] == 'ANALOG_VALUE': |
||
| 354 | table_name = "tbl_analog_value" |
||
| 355 | elif virtual_point['object_type'] == 'ENERGY_VALUE': |
||
| 356 | table_name = "tbl_energy_value" |
||
| 357 | else: |
||
| 358 | if cursor_historical_db: |
||
| 359 | cursor_historical_db.close() |
||
| 360 | if cnx_historical_db: |
||
| 361 | cnx_historical_db.close() |
||
| 362 | return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'" |
||
| 363 | |||
| 364 | try: |
||
| 365 | query = (" SELECT MAX(utc_date_time) " |
||
| 366 | " FROM " + table_name + |
||
| 367 | " WHERE point_id = %s ") |
||
| 368 | cursor_historical_db.execute(query, (virtual_point['id'],)) |
||
| 369 | row = cursor_historical_db.fetchone() |
||
| 370 | except Exception as e: |
||
| 371 | if cursor_historical_db: |
||
| 372 | cursor_historical_db.close() |
||
| 373 | if cnx_historical_db: |
||
| 374 | cnx_historical_db.close() |
||
| 375 | # Return generic error message to avoid information disclosure |
||
| 376 | return f"Error querying historical database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 377 | |||
| 378 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None) |
||
| 379 | |||
| 380 | if row is not None and len(row) > 0 and isinstance(row[0], datetime): |
||
| 381 | start_datetime_utc = row[0].replace(tzinfo=None) |
||
| 382 | |||
| 383 | end_datetime_utc = datetime.utcnow().replace(tzinfo=None) |
||
| 384 | |||
| 385 | if end_datetime_utc <= start_datetime_utc: |
||
| 386 | if cursor_historical_db: |
||
| 387 | cursor_historical_db.close() |
||
| 388 | if cnx_historical_db: |
||
| 389 | cnx_historical_db.close() |
||
| 390 | return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'" |
||
| 391 | |||
| 392 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
||
| 393 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
||
| 394 | |||
| 395 | ############################################################################################################ |
||
| 396 | # Step 2: parse the expression and get all points in substitutions |
||
| 397 | ############################################################################################################ |
||
| 398 | point_list = list() |
||
| 399 | expression = None |
||
| 400 | substitutions = None |
||
| 401 | try: |
||
| 402 | ######################################################################################################## |
||
| 403 | # parse the expression and get all points in substitutions |
||
| 404 | ######################################################################################################## |
||
| 405 | address = json.loads(virtual_point['address']) |
||
| 406 | # algebraic expression example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}' |
||
| 407 | # piecewise function example: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}' |
||
| 408 | if 'expression' not in address.keys() \ |
||
| 409 | or 'substitutions' not in address.keys() \ |
||
| 410 | or len(address['expression']) == 0 \ |
||
| 411 | or len(address['substitutions']) == 0: |
||
| 412 | if cursor_historical_db: |
||
| 413 | cursor_historical_db.close() |
||
| 414 | if cnx_historical_db: |
||
| 415 | cnx_historical_db.close() |
||
| 416 | return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'" |
||
| 417 | |||
| 418 | expression = address['expression'] |
||
| 419 | substitutions = address['substitutions'] |
||
| 420 | |||
| 421 | # Security: Validate expression and substitutions |
||
| 422 | try: |
||
| 423 | validate_expression_safe(expression) |
||
| 424 | except ValueError as e: |
||
| 425 | if cursor_historical_db: |
||
| 426 | cursor_historical_db.close() |
||
| 427 | if cnx_historical_db: |
||
| 428 | cnx_historical_db.close() |
||
| 429 | return f"Error in step 2.1.1: Invalid expression for '{virtual_point['name']}': {str(e)}" |
||
| 430 | |||
| 431 | # Security: Validate substitutions count |
||
| 432 | if len(substitutions) > MAX_SUBSTITUTIONS: |
||
| 433 | if cursor_historical_db: |
||
| 434 | cursor_historical_db.close() |
||
| 435 | if cnx_historical_db: |
||
| 436 | cnx_historical_db.close() |
||
| 437 | return f"Error in step 2.1.2: Too many substitutions ({len(substitutions)}) for '{virtual_point['name']}'" |
||
| 438 | |||
| 439 | # Security: Validate variable names and point IDs |
||
| 440 | for variable_name, point_id in substitutions.items(): |
||
| 441 | try: |
||
| 442 | validate_variable_name(variable_name) |
||
| 443 | validate_point_id(point_id) |
||
| 444 | except ValueError as e: |
||
| 445 | if cursor_historical_db: |
||
| 446 | cursor_historical_db.close() |
||
| 447 | if cnx_historical_db: |
||
| 448 | cnx_historical_db.close() |
||
| 449 | return f"Error in step 2.1.3: Invalid variable or point_id for '{virtual_point['name']}': {str(e)}" |
||
| 450 | point_list.append({"variable_name": variable_name, "point_id": point_id}) |
||
| 451 | except json.JSONDecodeError as e: |
||
| 452 | if cursor_historical_db: |
||
| 453 | cursor_historical_db.close() |
||
| 454 | if cnx_historical_db: |
||
| 455 | cnx_historical_db.close() |
||
| 456 | return "Error in step 2.2: Invalid JSON in address for '" + virtual_point['name'] + "'" |
||
| 457 | except Exception as e: |
||
| 458 | if cursor_historical_db: |
||
| 459 | cursor_historical_db.close() |
||
| 460 | if cnx_historical_db: |
||
| 461 | cnx_historical_db.close() |
||
| 462 | return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
| 463 | |||
| 464 | ############################################################################################################ |
||
| 465 | # Step 3: query points type from system database |
||
| 466 | ############################################################################################################ |
||
| 467 | print("getting points type ") |
||
| 468 | cnx_system_db = None |
||
| 469 | cursor_system_db = None |
||
| 470 | try: |
||
| 471 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
||
| 472 | cursor_system_db = cnx_system_db.cursor() |
||
| 473 | except Exception as e: |
||
| 474 | if cursor_system_db: |
||
| 475 | cursor_system_db.close() |
||
| 476 | if cnx_system_db: |
||
| 477 | cnx_system_db.close() |
||
| 478 | print("Error in step 3 of virtual point worker " + str(e)) |
||
| 479 | # Return generic error message to avoid information disclosure |
||
| 480 | return f"Error connecting to system database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 481 | |||
| 482 | print("Connected to MyEMS System Database") |
||
| 483 | |||
| 484 | all_point_dict = dict() |
||
| 485 | try: |
||
| 486 | cursor_system_db.execute(" SELECT id, object_type " |
||
| 487 | " FROM tbl_points ") |
||
| 488 | rows_points = cursor_system_db.fetchall() |
||
| 489 | |||
| 490 | if rows_points is None or len(rows_points) == 0: |
||
| 491 | return f"Error: No points found in system database for virtual point '{virtual_point['name']}'" |
||
| 492 | |||
| 493 | for row in rows_points: |
||
| 494 | all_point_dict[row[0]] = row[1] |
||
| 495 | except Exception as e: |
||
| 496 | # Return generic error message to avoid information disclosure |
||
| 497 | return f"Error querying points from system database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 498 | finally: |
||
| 499 | if cursor_system_db: |
||
| 500 | cursor_system_db.close() |
||
| 501 | if cnx_system_db: |
||
| 502 | cnx_system_db.close() |
||
| 503 | ############################################################################################################ |
||
| 504 | # Step 4: query points value from historical database |
||
| 505 | ############################################################################################################ |
||
| 506 | |||
| 507 | print("getting point values ") |
||
| 508 | point_values_dict = dict() |
||
| 509 | if point_list is not None and len(point_list) > 0: |
||
| 510 | try: |
||
| 511 | for point in point_list: |
||
| 512 | point_object_type = all_point_dict.get(point['point_id']) |
||
| 513 | if point_object_type is None: |
||
| 514 | return "variable point type should not be None " + " for '" + virtual_point['name'] + "'" |
||
| 515 | if point_object_type == 'ANALOG_VALUE': |
||
| 516 | query = (" SELECT utc_date_time, actual_value " |
||
| 517 | " FROM tbl_analog_value " |
||
| 518 | " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s " |
||
| 519 | " ORDER BY utc_date_time ") |
||
| 520 | cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,)) |
||
| 521 | rows = cursor_historical_db.fetchall() |
||
| 522 | if rows is not None and len(rows) > 0: |
||
| 523 | point_values_dict[point['point_id']] = dict() |
||
| 524 | for row in rows: |
||
| 525 | point_values_dict[point['point_id']][row[0]] = row[1] |
||
| 526 | elif point_object_type == 'ENERGY_VALUE': |
||
| 527 | query = (" SELECT utc_date_time, actual_value " |
||
| 528 | " FROM tbl_energy_value " |
||
| 529 | " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s " |
||
| 530 | " ORDER BY utc_date_time ") |
||
| 531 | cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,)) |
||
| 532 | rows = cursor_historical_db.fetchall() |
||
| 533 | if rows is not None and len(rows) > 0: |
||
| 534 | point_values_dict[point['point_id']] = dict() |
||
| 535 | for row in rows: |
||
| 536 | point_values_dict[point['point_id']][row[0]] = row[1] |
||
| 537 | else: |
||
| 538 | point_values_dict[point['point_id']] = None |
||
| 539 | else: |
||
| 540 | # point type should not be DIGITAL_VALUE |
||
| 541 | return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'" |
||
| 542 | except Exception as e: |
||
| 543 | if cursor_historical_db: |
||
| 544 | cursor_historical_db.close() |
||
| 545 | if cnx_historical_db: |
||
| 546 | cnx_historical_db.close() |
||
| 547 | # Return generic error message to avoid information disclosure |
||
| 548 | return f"Error querying point values from historical database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 549 | |||
| 550 | ############################################################################################################ |
||
| 551 | # Step 5: evaluate the equation with points values |
||
| 552 | ############################################################################################################ |
||
| 553 | |||
| 554 | print("getting date time set for all points") |
||
| 555 | utc_date_time_set = set() |
||
| 556 | if point_values_dict is not None and len(point_values_dict) > 0: |
||
| 557 | for point_id, point_values in point_values_dict.items(): |
||
| 558 | if point_values is not None and len(point_values) > 0: |
||
| 559 | utc_date_time_set = utc_date_time_set.union(point_values.keys()) |
||
| 560 | |||
| 561 | # Security: Resource limit check to prevent DoS attacks |
||
| 562 | if len(utc_date_time_set) > MAX_DATETIME_POINTS: |
||
| 563 | if cursor_historical_db: |
||
| 564 | cursor_historical_db.close() |
||
| 565 | if cnx_historical_db: |
||
| 566 | cnx_historical_db.close() |
||
| 567 | return f"Error: Too many datetime points to process ({len(utc_date_time_set)}) for '{virtual_point['name']}'. Maximum allowed: {MAX_DATETIME_POINTS}" |
||
| 568 | |||
| 569 | print("evaluating the equation with SymPy") |
||
| 570 | normalized_values = list() |
||
| 571 | |||
| 572 | ############################################################################################################ |
||
| 573 | # Converting Strings to SymPy Expressions |
||
| 574 | # The sympify function(that's sympify, not to be confused with simplify) can be used to |
||
| 575 | # convert strings into SymPy expressions. |
||
| 576 | # SECURITY FIX: Removed eval() and replaced with safe SymPy parsing |
||
| 577 | ############################################################################################################ |
||
| 578 | try: |
||
| 579 | # Security: Use safe parsing instead of eval() |
||
| 580 | if re.search(',', expression): |
||
| 581 | # Piecewise function: parse safely without eval() |
||
| 582 | piecewise_parts = parse_piecewise_safe(expression, substitutions) |
||
| 583 | expr = Piecewise(*piecewise_parts) |
||
| 584 | print("the expression will be evaluated as piecewise function: " + str(expr)) |
||
| 585 | else: |
||
| 586 | # Algebraic expression: use sympify (safe) |
||
| 587 | expr = sympify(expression) |
||
| 588 | print("the expression will be evaluated as algebraic expression: " + str(expr)) |
||
| 589 | |||
| 590 | for utc_date_time in utc_date_time_set: |
||
| 591 | meta_data = dict() |
||
| 592 | meta_data['utc_date_time'] = utc_date_time |
||
| 593 | |||
| 594 | #################################################################################################### |
||
| 595 | # create a dictionary of Symbol: point pairs |
||
| 596 | #################################################################################################### |
||
| 597 | |||
| 598 | subs = dict() |
||
| 599 | |||
| 600 | #################################################################################################### |
||
| 601 | # Evaluating the expression at current_datetime_utc |
||
| 602 | #################################################################################################### |
||
| 603 | |||
| 604 | if point_list is not None and len(point_list) > 0: |
||
| 605 | for point in point_list: |
||
| 606 | actual_value = point_values_dict[point['point_id']].get(utc_date_time, None) |
||
| 607 | if actual_value is None: |
||
| 608 | break |
||
| 609 | subs[point['variable_name']] = actual_value |
||
| 610 | |||
| 611 | if len(subs) != len(point_list): |
||
| 612 | continue |
||
| 613 | |||
| 614 | #################################################################################################### |
||
| 615 | # To numerically evaluate an expression with a Symbol at a point, |
||
| 616 | # we might use subs followed by evalf, |
||
| 617 | # but it is more efficient and numerically stable to pass the substitution to evalf |
||
| 618 | # using the subs flag, which takes a dictionary of Symbol: point pairs. |
||
| 619 | #################################################################################################### |
||
| 620 | # Note: expr is already a Piecewise object for piecewise functions, or a SymPy expression for algebraic |
||
| 621 | if re.search(',', expression): |
||
| 622 | # expr is already a Piecewise object |
||
| 623 | meta_data['actual_value'] = Decimal(str(expr.subs(subs))) |
||
| 624 | normalized_values.append(meta_data) |
||
| 625 | else: |
||
| 626 | # expr is a SymPy expression |
||
| 627 | meta_data['actual_value'] = Decimal(str(expr.evalf(subs=subs))) |
||
| 628 | normalized_values.append(meta_data) |
||
| 629 | except Exception as e: |
||
| 630 | if cursor_historical_db: |
||
| 631 | cursor_historical_db.close() |
||
| 632 | if cnx_historical_db: |
||
| 633 | cnx_historical_db.close() |
||
| 634 | # Return generic error message to avoid information disclosure |
||
| 635 | # Detailed error information should be logged separately if logger is available |
||
| 636 | return f"Error evaluating expression for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 637 | |||
| 638 | print("saving virtual points values to historical database") |
||
| 639 | |||
| 640 | if len(normalized_values) > 0: |
||
| 641 | latest_meta_data = normalized_values[0] |
||
| 642 | |||
| 643 | # Security: Use parameterized queries to prevent SQL injection |
||
| 644 | # Validate table_name is from whitelist (already validated via object_type) |
||
| 645 | if table_name not in ['tbl_analog_value', 'tbl_energy_value']: |
||
| 646 | if cursor_historical_db: |
||
| 647 | cursor_historical_db.close() |
||
| 648 | if cnx_historical_db: |
||
| 649 | cnx_historical_db.close() |
||
| 650 | return f"Error: Invalid table name '{table_name}' for '{virtual_point['name']}'" |
||
| 651 | |||
| 652 | insert_query = ("INSERT INTO " + table_name + |
||
| 653 | " (point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)") |
||
| 654 | |||
| 655 | while len(normalized_values) > 0: |
||
| 656 | insert_100 = normalized_values[:100] |
||
| 657 | normalized_values = normalized_values[100:] |
||
| 658 | |||
| 659 | try: |
||
| 660 | # Security: Use executemany with parameterized query |
||
| 661 | values = [] |
||
| 662 | for meta_data in insert_100: |
||
| 663 | values.append(( |
||
| 664 | virtual_point['id'], |
||
| 665 | meta_data['utc_date_time'], |
||
| 666 | meta_data['actual_value'] |
||
| 667 | )) |
||
| 668 | if meta_data['utc_date_time'] > latest_meta_data['utc_date_time']: |
||
| 669 | latest_meta_data = meta_data |
||
| 670 | |||
| 671 | cursor_historical_db.executemany(insert_query, values) |
||
| 672 | cnx_historical_db.commit() |
||
| 673 | except Exception as e: |
||
| 674 | if cursor_historical_db: |
||
| 675 | cursor_historical_db.close() |
||
| 676 | if cnx_historical_db: |
||
| 677 | cnx_historical_db.close() |
||
| 678 | # Return generic error message to avoid information disclosure |
||
| 679 | return f"Error saving calculated values to database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 680 | |||
| 681 | try: |
||
| 682 | # Security: Use parameterized query for delete operation |
||
| 683 | delete_query = "DELETE FROM " + table_name + "_latest WHERE point_id = %s" |
||
| 684 | cursor_historical_db.execute(delete_query, (virtual_point['id'],)) |
||
| 685 | cnx_historical_db.commit() |
||
| 686 | |||
| 687 | # Security: Use parameterized query for insert operation |
||
| 688 | insert_latest_query = ("INSERT INTO " + table_name + "_latest " + |
||
| 689 | "(point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)") |
||
| 690 | cursor_historical_db.execute(insert_latest_query, ( |
||
| 691 | virtual_point['id'], |
||
| 692 | latest_meta_data['utc_date_time'], |
||
| 693 | latest_meta_data['actual_value'] |
||
| 694 | )) |
||
| 695 | cnx_historical_db.commit() |
||
| 696 | except Exception as e: |
||
| 697 | if cursor_historical_db: |
||
| 698 | cursor_historical_db.close() |
||
| 699 | if cnx_historical_db: |
||
| 700 | cnx_historical_db.close() |
||
| 701 | # Return generic error message to avoid information disclosure |
||
| 702 | return f"Error updating latest value in database for virtual point '{virtual_point['name']}': {type(e).__name__}" |
||
| 703 | |||
| 704 | if cursor_historical_db: |
||
| 705 | cursor_historical_db.close() |
||
| 706 | if cnx_historical_db: |
||
| 707 | cnx_historical_db.close() |
||
| 708 | |||
| 709 | return None |
||
| 710 |