virtualpoint   F

Complexity

Total Complexity 143

Size/Duplication

Total Lines 695
Duplicated Lines 10.36 %

Importance

Changes 0
Metric Value
wmc 143
eloc 396
dl 72
loc 695
rs 2
c 0
b 0
f 0

6 Functions

Rating   Name   Duplication   Size   Complexity  
B validate_expression_safe() 0 33 5
F calculate() 72 90 14
F parse_piecewise_safe() 0 89 20
A validate_point_id() 0 18 4
F worker() 0 376 97
A validate_variable_name() 0 14 3


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
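Within this module, the most frequent repetition is the four-line block that closes the cursor and the connection before every early return in worker(). This is separate from the cross-file duplication reported for calculate(). The following is a minimal sketch of one way to remove that repetition, not the project's own code: open_cursor and count_rows are hypothetical names, and the standard-library sqlite3 module stands in for mysql.connector so that the example runs on its own.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def open_cursor(connect, **connect_kwargs):
    """Yield (connection, cursor) and always close both on exit.

    `connect` is any DB-API style connect function (hypothetical parameter name).
    """
    cnx = connect(**connect_kwargs)
    cursor = None
    try:
        cursor = cnx.cursor()
        yield cnx, cursor
    finally:
        # Runs on normal exit, on early return, and when an exception is raised
        if cursor is not None:
            cursor.close()
        cnx.close()


def count_rows(connect, **connect_kwargs):
    """Demo function: early returns need no cleanup code of their own."""
    with open_cursor(connect, **connect_kwargs) as (cnx, cursor):
        cursor.execute("CREATE TABLE tbl_demo (point_id INTEGER)")
        cursor.execute("INSERT INTO tbl_demo VALUES (1)")
        cursor.execute("SELECT COUNT(*) FROM tbl_demo")
        row = cursor.fetchone()
        if row is None:
            return "no rows"  # the cursor and connection are still closed
        return row[0]


print(count_rows(sqlite3.connect, database=":memory:"))
```

With MySQL the call would presumably look like open_cursor(mysql.connector.connect, **config.myems_historical_db), since the module already calls connect(), cursor() and close() in exactly this way. An exception raised by connect() itself still propagates to the caller, so worker() would keep one try/except around the with block to turn it into an error string.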

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like virtualpoint often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
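virtualpoint is a module of functions rather than a class, so the closest equivalent here is extracting small helper functions from worker(), which accounts for 97 of the 143 complexity points. As a hedged illustration, the mapping from object type to historical table is decided in more than one place in worker(). The sketch below pulls it into one helper; VALUE_TABLES, value_table_for and build_range_query are hypothetical names, while the table names, object types and SQL text are taken from the listing.

```python
# Hypothetical helper names; table names and object types come from worker().
VALUE_TABLES = {
    "ANALOG_VALUE": "tbl_analog_value",
    "ENERGY_VALUE": "tbl_energy_value",
}


def value_table_for(object_type):
    """Return the historical table for a point type, or raise ValueError."""
    try:
        return VALUE_TABLES[object_type]
    except KeyError:
        raise ValueError(f"unsupported object type: {object_type!r}") from None


def build_range_query(object_type):
    """Build the parameterized query that reads point values in a time range."""
    # The table name comes from the fixed mapping above, never from user input,
    # so it can be concatenated; the values remain %s placeholders.
    return (" SELECT utc_date_time, actual_value "
            " FROM " + value_table_for(object_type) +
            " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
            " ORDER BY utc_date_time ")


print(build_range_query("ENERGY_VALUE"))
```

A helper like this would let the ANALOG_VALUE and ENERGY_VALUE branches in step 4 share one query path, and the DIGITAL_VALUE rejection would become a single except ValueError.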

"""
MyEMS Normalization Service - Virtual Point Processing Module

This module handles the calculation of virtual point values using mathematical expressions.
Virtual points are computed points that derive their values from combinations of other points
(analog value and energy value points, including other virtual points of those types)
using algebraic equations and piecewise functions.

The virtual point processing performs the following functions:
1. Retrieves all virtual points and their mathematical expressions from the system database
2. Uses multiprocessing to process virtual points in parallel for efficiency
3. Parses mathematical expressions and identifies dependent points
4. Retrieves dependent point values newer than the virtual point's latest stored value from the historical database
5. Evaluates mathematical expressions using the SymPy library
6. Stores calculated virtual point values in the historical database

Key features:
- Supports complex mathematical expressions with multiple variables
- Accepts ANALOG_VALUE and ENERGY_VALUE points in expressions and rejects DIGITAL_VALUE points
- Uses SymPy for mathematical expression evaluation including piecewise functions
- Returns an error string from each worker instead of raising, so one failing point does not stop the others
- Recalculates virtual points in a continuous loop with a 60-second pause between cycles
"""

import json
import random
import re
import time
from datetime import datetime
from decimal import Decimal
from multiprocessing import Pool
import mysql.connector
from sympy import sympify, Piecewise
import config

# Maximum number of distinct timestamps processed for one virtual point in one cycle
MAX_DATETIME_POINTS = 100000

# Maximum expression length to prevent DoS attacks
MAX_EXPRESSION_LENGTH = 10000

# Maximum number of substitutions to prevent DoS attacks
MAX_SUBSTITUTIONS = 100


########################################################################################################################
# Validation Functions
########################################################################################################################

def validate_variable_name(name):
    """
    Validate variable name to ensure it follows safe identifier rules.

    Args:
        name: Variable name to validate

    Raises:
        ValueError: If variable name is invalid
    """
    if not isinstance(name, str):
        raise ValueError(f"Variable name must be a string, got {type(name)}")
    # fullmatch is used because '$' in re.match would also accept a trailing newline
    if not re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', name):
        raise ValueError(f"Invalid variable name: {name}. Must start with letter or underscore and contain only "
                         f"alphanumeric characters and underscores.")


def validate_point_id(point_id):
    """
    Validate point_id to ensure it is a positive integer.

    Args:
        point_id: Point ID to validate (an int or a numeric string)

    Returns:
        The point ID converted to int

    Raises:
        ValueError: If point_id is invalid
    """
    if not isinstance(point_id, int):
        # Try to convert if it's a numeric string
        try:
            point_id = int(point_id)
        except (ValueError, TypeError):
            raise ValueError(f"Invalid point_id: {point_id}. Must be an integer.")
    if point_id <= 0:
        raise ValueError(f"Invalid point_id: {point_id}. Must be a positive integer.")
    return point_id


def validate_expression_safe(expression):
    """
    Validate expression to ensure it doesn't contain dangerous patterns.

    Args:
        expression: Expression string to validate

    Raises:
        ValueError: If expression contains dangerous patterns
    """
    if not isinstance(expression, str):
        raise ValueError(f"Expression must be a string, got {type(expression)}")

    if len(expression) > MAX_EXPRESSION_LENGTH:
        raise ValueError(f"Expression too long: {len(expression)} characters. Maximum allowed: {MAX_EXPRESSION_LENGTH}")

    # Check for dangerous patterns that could lead to code execution
    dangerous_patterns = [
        (r'__\w+__', 'Double underscore pattern (magic methods)'),
        (r'import\s+', 'Import statement'),
        (r'exec\s*\(', 'exec() call'),
        (r'eval\s*\(', 'eval() call'),
        (r'open\s*\(', 'File open operation'),
        (r'file\s*\(', 'File operation'),
        (r'__import__', 'Direct __import__ call'),
        (r'compile\s*\(', 'compile() call'),
        (r'globals\s*\(', 'globals() call'),
        (r'locals\s*\(', 'locals() call'),
    ]

    for pattern, description in dangerous_patterns:
        if re.search(pattern, expression, re.IGNORECASE):
            raise ValueError(f"Dangerous pattern detected in expression: {description}")


def parse_piecewise_safe(expression, substitutions):
    """
    Parse a piecewise function expression without calling eval() directly.

    Piecewise format: "(value1, condition1), (value2, condition2), (default_value, True)"

    Args:
        expression: Piecewise function expression string
        substitutions: Dictionary mapping variable names to point IDs (currently unused)

    Returns:
        List of tuples (value_expr, condition_expr) for Piecewise construction

    Raises:
        ValueError: If expression cannot be safely parsed
    """
    # Validate expression first
    validate_expression_safe(expression)

    # Parse the piecewise expression manually
    # Format: "(value, condition), (value, condition), ..."
    try:
        # Values and conditions may themselves contain parentheses and commas.
        # Strategy: split the tuples on the "), (" separator, then split each tuple
        # on its rightmost comma that is not inside nested parentheses.

        piecewise_parts = []
        expr = expression.strip()

        # Remove outer parentheses if present
        if expr.startswith('(') and expr.endswith(')'):
            # Check if it's a single tuple or multiple tuples
            # Count opening and closing parentheses
            depth = 0
            for i, char in enumerate(expr):
                if char == '(':
                    depth += 1
                elif char == ')':
                    depth -= 1
                    if depth == 0 and i < len(expr) - 1:
                        # Found end of first tuple, there are multiple tuples
                        break
            else:
                # Single tuple, remove outer parentheses
                expr = expr[1:-1]

        # Split by "), (" pattern to separate tuples
        # This pattern appears between tuples
        parts = re.split(r'\)\s*,\s*\(', expr)

        # The split consumes the parentheses between tuples, so only the first part still has its
        # opening parenthesis and only the last part still has its closing parenthesis.
        # Remove exactly one of each; stripping every parenthesis would break nested expressions.
        if len(parts) > 1:
            if parts[0].startswith('('):
                parts[0] = parts[0][1:]
            if parts[-1].endswith(')'):
                parts[-1] = parts[-1][:-1]

        # Process each tuple part
        for part in parts:
            part = part.strip()

            # Find the comma that separates value from condition
            # We need the rightmost comma that's not inside nested parentheses
            depth = 0
            last_comma_pos = -1
            for i, char in enumerate(part):
                if char == '(':
                    depth += 1
                elif char == ')':
                    depth -= 1
                elif char == ',' and depth == 0:
                    last_comma_pos = i

            if last_comma_pos == -1:
                raise ValueError(f"Invalid piecewise tuple format: {part}. Missing comma separator.")

            value_str = part[:last_comma_pos].strip()
            condition_str = part[last_comma_pos+1:].strip()

            if not value_str or not condition_str:
                raise ValueError(f"Invalid piecewise tuple format: {part}. Empty value or condition.")

            # Parse value and condition using sympify.
            # Note: sympify() uses eval() internally, so this step relies on
            # validate_expression_safe() having rejected dangerous input above.
            value_expr = sympify(value_str)
            condition_expr = sympify(condition_str)

            piecewise_parts.append((value_expr, condition_expr))

        if not piecewise_parts:
            raise ValueError("No valid piecewise parts found in expression")

        return piecewise_parts
    except Exception as e:
        raise ValueError(f"Failed to parse piecewise expression: {str(e)}") from e


########################################################################################################################
# Virtual Point Calculation Procedures:
# Step 1: Query all virtual points and their mathematical expressions from system database
# Step 2: Create multiprocessing pool to call worker processes in parallel
########################################################################################################################

def calculate(logger):
    """
    Main function for virtual point calculation using mathematical expressions.

    This function runs continuously, retrieving all virtual points from the system database
    and processing them in parallel to calculate virtual point values using their
    configured mathematical expressions.

    Args:
        logger: Logger instance for recording calculation activities and errors
    """
    while True:
        # The outermost while loop to reconnect to server if there is a connection error
        cnx_system_db = None
        cursor_system_db = None

        # Connect to system database to retrieve virtual point configuration
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 0 of virtual point calculate " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # Sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue

        print("Connected to MyEMS System Database")

        # Retrieve all virtual points with their configuration data
        virtual_point_list = list()
        try:
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
                                     " FROM tbl_points "
                                     " WHERE is_virtual = 1 ")
            rows_virtual_points = cursor_system_db.fetchall()

            # Check if virtual points were found
            if rows_virtual_points is None or len(rows_virtual_points) == 0:
                # Sleep for one minute and continue the outer loop to reconnect the database
                time.sleep(60)
                continue

            # Build virtual point list with configuration data
            for row in rows_virtual_points:
                meta_result = {"id": row[0],
                               "name": row[1],
                               "data_source_id": row[2],
                               "object_type": row[3],
                               "high_limit": row[4],
                               "low_limit": row[5],
                               "address": row[6]}
                virtual_point_list.append(meta_result)

        except Exception as e:
            logger.error("Error in step 1 of virtual point calculate " + str(e))
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        finally:
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        # Shuffle the virtual point list for randomly calculating point values
        # This helps distribute processing load evenly across time
        random.shuffle(virtual_point_list)

        print("Got all virtual points in MyEMS System Database")
        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker processes in parallel
        ################################################################################################################
        # Create process pool with configured size for parallel processing
        p = Pool(processes=config.pool_size)
        error_list = p.map(worker, virtual_point_list)
        p.close()
        p.join()

        # Log any errors from worker processes
        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep ")
        time.sleep(60)  # Sleep for 1 minute before next processing cycle
        print("wake from sleep, and continue to work")


########################################################################################################################
# Worker Process Procedures for Individual Virtual Point Processing:
# Step 1: Get start datetime and end datetime for processing
# Step 2: Parse the expression and get all points in substitutions
# Step 3: Query points type from system database
# Step 4: Query points value from historical database
# Step 5: Evaluate the equation with points values and store results
# Returns the error string for logging or returns None on success
########################################################################################################################

def worker(virtual_point):
    """
    Worker function to process a single virtual point's calculation.

    This function processes one virtual point at a time, evaluating its mathematical
    expression using data from dependent points and storing the calculated results.

    Args:
        virtual_point: Dictionary containing virtual point configuration (id, name, object_type, address, etc.)

    Returns:
        None on success, error string on failure
    """
    cnx_historical_db = None
    cursor_historical_db = None

    # Connect to historical database to check existing processed data
    try:
        cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
        cursor_historical_db = cnx_historical_db.cursor()
    except Exception as e:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        # Return generic error message to avoid information disclosure
        return f"Error connecting to historical database for virtual point '{virtual_point['name']}': " \
               f"{type(e).__name__}"

    print("Start to process virtual point: " + "'" + virtual_point['name'] + "'")

    ####################################################################################################################
    # Step 1: Get start datetime and end datetime for processing
    ####################################################################################################################
    # Determine the appropriate table based on virtual point object type
    if virtual_point['object_type'] == 'ANALOG_VALUE':
        table_name = "tbl_analog_value"
    elif virtual_point['object_type'] == 'ENERGY_VALUE':
        table_name = "tbl_energy_value"
    else:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return "virtual point type should not be DIGITAL_VALUE for '" + virtual_point['name'] + "'"

    try:
        query = (" SELECT MAX(utc_date_time) "
                 " FROM " + table_name +
                 " WHERE point_id = %s ")
        cursor_historical_db.execute(query, (virtual_point['id'],))
        row = cursor_historical_db.fetchone()
    except Exception as e:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        # Return generic error message to avoid information disclosure
        return f"Error querying historical database for virtual point '{virtual_point['name']}': {type(e).__name__}"

    # Start from the configured datetime unless this virtual point already has stored values
    start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None)

    if row is not None and len(row) > 0 and isinstance(row[0], datetime):
        start_datetime_utc = row[0].replace(tzinfo=None)

    end_datetime_utc = datetime.utcnow().replace(tzinfo=None)

    if end_datetime_utc <= start_datetime_utc:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'"

    print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
          + " end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])

    ############################################################################################################
    # Step 2: parse the expression and get all points in substitutions
    ############################################################################################################
    point_list = list()
    expression = None
    substitutions = None
    try:
        address = json.loads(virtual_point['address'])
        # algebraic expression example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}'
        # piecewise function example: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}'
        if 'expression' not in address.keys() \
                or 'substitutions' not in address.keys() \
                or len(address['expression']) == 0 \
                or len(address['substitutions']) == 0:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'"

        expression = address['expression']
        substitutions = address['substitutions']

        try:
            validate_expression_safe(expression)
        except ValueError as e:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return f"Error in step 2.1.1: Invalid expression for '{virtual_point['name']}': {str(e)}"

        if len(substitutions) > MAX_SUBSTITUTIONS:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return f"Error in step 2.1.2: Too many substitutions ({len(substitutions)}) for '{virtual_point['name']}'"

        # validate variable names and point IDs
        for variable_name, point_id in substitutions.items():
            try:
                validate_variable_name(variable_name)
                validate_point_id(point_id)
            except ValueError as e:
                if cursor_historical_db:
                    cursor_historical_db.close()
                if cnx_historical_db:
                    cnx_historical_db.close()
                return f"Error in step 2.1.3: Invalid variable or point_id for '{virtual_point['name']}': {str(e)}"
            # The JSON may carry the ID as a numeric string; store it as int so it matches tbl_points.id
            point_list.append({"variable_name": variable_name, "point_id": int(point_id)})
    except json.JSONDecodeError as e:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return "Error in step 2.2: Invalid JSON in address for '" + virtual_point['name'] + "'"
    except Exception as e:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

    ############################################################################################################
    # Step 3: query points type from system database
    ############################################################################################################
    print("getting points type ")
    cnx_system_db = None
    cursor_system_db = None
    try:
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
        cursor_system_db = cnx_system_db.cursor()
    except Exception as e:
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        # The historical database connection is still open at this point and must be closed too
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        print("Error in step 3 of virtual point worker " + str(e))
        return f"Error connecting to system database for virtual point '{virtual_point['name']}': {type(e).__name__}"

    print("Connected to MyEMS System Database")

    all_point_dict = dict()
    try:
        cursor_system_db.execute(" SELECT id, object_type "
                                 " FROM tbl_points ")
        rows_points = cursor_system_db.fetchall()

        if rows_points is None or len(rows_points) == 0:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return f"Error: No points found in system database for virtual point '{virtual_point['name']}'"

        for row in rows_points:
            all_point_dict[row[0]] = row[1]
    except Exception as e:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return f"Error querying points from system database for virtual point '{virtual_point['name']}': " \
               f"{type(e).__name__}"
    finally:
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
    ############################################################################################################
    # Step 4: query points value from historical database
    ############################################################################################################

    print("getting point values ")
    point_values_dict = dict()
    if point_list is not None and len(point_list) > 0:
        try:
            for point in point_list:
                point_object_type = all_point_dict.get(point['point_id'])
                if point_object_type is None:
                    if cursor_historical_db:
                        cursor_historical_db.close()
                    if cnx_historical_db:
                        cnx_historical_db.close()
                    return "variable point type should not be None " + " for '" + virtual_point['name'] + "'"
                # Both supported types use the same query; only the table differs
                if point_object_type == 'ANALOG_VALUE':
                    value_table = "tbl_analog_value"
                elif point_object_type == 'ENERGY_VALUE':
                    value_table = "tbl_energy_value"
                else:
                    # point type should not be DIGITAL_VALUE
                    if cursor_historical_db:
                        cursor_historical_db.close()
                    if cnx_historical_db:
                        cnx_historical_db.close()
                    return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"

                query = (" SELECT utc_date_time, actual_value "
                         " FROM " + value_table +
                         " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
                         " ORDER BY utc_date_time ")
                cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,))
                rows = cursor_historical_db.fetchall()
                if rows is not None and len(rows) > 0:
                    point_values_dict[point['point_id']] = dict()
                    for row in rows:
                        point_values_dict[point['point_id']][row[0]] = row[1]
                else:
                    # Record the point even when it has no values so that step 5 can look it up
                    point_values_dict[point['point_id']] = None
        except Exception as e:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return f"Error querying point values from historical database for virtual point " \
                   f"'{virtual_point['name']}': {type(e).__name__}"

    ############################################################################################################
    # Step 5: evaluate the equation with points values
    ############################################################################################################

    print("getting date time set for all points")
    utc_date_time_set = set()
    if point_values_dict is not None and len(point_values_dict) > 0:
        for point_id, point_values in point_values_dict.items():
            if point_values is not None and len(point_values) > 0:
                utc_date_time_set = utc_date_time_set.union(point_values.keys())

    if len(utc_date_time_set) > MAX_DATETIME_POINTS:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return f"Error: Too many datetime points to process ({len(utc_date_time_set)}) for '{virtual_point['name']}'." \
               f" Maximum allowed: {MAX_DATETIME_POINTS}"

    print("evaluating the equation with SymPy")
    normalized_values = list()

    ############################################################################################################
    # Converting Strings to SymPy Expressions
    # The sympify function (that's sympify, not to be confused with simplify) can be used to
    # convert strings into SymPy expressions.
    ############################################################################################################
    try:
        if re.search(',', expression):
            piecewise_parts = parse_piecewise_safe(expression, substitutions)
            expr = Piecewise(*piecewise_parts)
            print("the expression will be evaluated as piecewise function: " + str(expr))
        else:
            expr = sympify(expression)
            print("the expression will be evaluated as algebraic expression: " + str(expr))

        for utc_date_time in utc_date_time_set:
            meta_data = dict()
            meta_data['utc_date_time'] = utc_date_time

            ####################################################################################################
            # create a dictionary of Symbol: point pairs for this utc_date_time
            ####################################################################################################

            subs = dict()

            if point_list is not None and len(point_list) > 0:
                for point in point_list:
                    # A point with no values in the time range is stored as None or is missing entirely
                    point_values = point_values_dict.get(point['point_id'])
                    if point_values is None:
                        break
                    actual_value = point_values.get(utc_date_time, None)
                    if actual_value is None:
                        break
                    subs[point['variable_name']] = actual_value

            # Skip this timestamp unless every variable in the expression has a value
            if len(subs) != len(point_list):
                continue

            ####################################################################################################
            # To numerically evaluate an expression with a Symbol at a point,
            # we might use subs followed by evalf,
            # but it is more efficient and numerically stable to pass the substitution to evalf
            # using the subs flag, which takes a dictionary of Symbol: point pairs.
            ####################################################################################################
            # Note: expr is already a Piecewise object for piecewise functions, or a SymPy expression for algebraic
            if re.search(',', expression):
                # expr is already a Piecewise object
                meta_data['actual_value'] = Decimal(str(expr.subs(subs)))
                normalized_values.append(meta_data)
            else:
                # expr is a SymPy expression
                meta_data['actual_value'] = Decimal(str(expr.evalf(subs=subs)))
                normalized_values.append(meta_data)
    except Exception as e:
        if cursor_historical_db:
            cursor_historical_db.close()
        if cnx_historical_db:
            cnx_historical_db.close()
        return f"Error evaluating expression for virtual point '{virtual_point['name']}': {type(e).__name__}"

    print("saving virtual points values to historical database")

    if len(normalized_values) > 0:
        latest_meta_data = normalized_values[0]
        if table_name not in ['tbl_analog_value', 'tbl_energy_value']:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return f"Error: Invalid table name '{table_name}' for '{virtual_point['name']}'"

        insert_query = ("INSERT INTO " + table_name +
                        " (point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)")

        # Insert in batches of 100 rows and track the most recent value for the _latest table
        while len(normalized_values) > 0:
            insert_100 = normalized_values[:100]
            normalized_values = normalized_values[100:]

            try:
                values = []
                for meta_data in insert_100:
                    values.append((
                        virtual_point['id'],
                        meta_data['utc_date_time'],
                        meta_data['actual_value']
                    ))
                    if meta_data['utc_date_time'] > latest_meta_data['utc_date_time']:
                        latest_meta_data = meta_data

                cursor_historical_db.executemany(insert_query, values)
                cnx_historical_db.commit()
            except Exception as e:
                if cursor_historical_db:
                    cursor_historical_db.close()
                if cnx_historical_db:
                    cnx_historical_db.close()
                return f"Error saving calculated values to database for virtual point '{virtual_point['name']}': " \
                       f"{type(e).__name__}"

        try:
            delete_query = "DELETE FROM " + table_name + "_latest WHERE point_id = %s"
            cursor_historical_db.execute(delete_query, (virtual_point['id'],))
            cnx_historical_db.commit()

            insert_latest_query = ("INSERT INTO " + table_name + "_latest " +
                                   " (point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)")
            cursor_historical_db.execute(insert_latest_query, (
                virtual_point['id'],
                latest_meta_data['utc_date_time'],
                latest_meta_data['actual_value']
            ))
            cnx_historical_db.commit()
        except Exception as e:
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            return f"Error updating latest value in database for virtual point '{virtual_point['name']}': " \
                   f"{type(e).__name__}"

    if cursor_historical_db:
        cursor_historical_db.close()
    if cnx_historical_db:
        cnx_historical_db.close()

    return None
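parse_piecewise_safe() in the listing separates tuples with a regular expression and then removes parentheses from each part, which is fragile when a value or condition itself contains parentheses. The block below is a sketch of an alternative that tracks nesting depth for both splits. It is not part of MyEMS: split_top_level and split_piecewise are hypothetical names, and the sketch returns plain strings, leaving the sympify() step to the caller.

```python
def split_top_level(text):
    """Split text on commas that are not nested inside parentheses."""
    parts, depth, start = [], 0, 0
    for i, char in enumerate(text):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth < 0:
                raise ValueError("unbalanced parentheses")
        elif char == "," and depth == 0:
            parts.append(text[start:i].strip())
            start = i + 1
    if depth != 0:
        raise ValueError("unbalanced parentheses")
    parts.append(text[start:].strip())
    return parts


def split_piecewise(expression):
    """Return [(value_str, condition_str), ...] for '(v1, c1), (v2, c2), ...'."""
    pairs = []
    for chunk in split_top_level(expression):
        if not (chunk.startswith("(") and chunk.endswith(")")):
            raise ValueError(f"tuple must be wrapped in parentheses: {chunk!r}")
        # Drop only the outer pair; inner parentheses stay intact.
        fields = split_top_level(chunk[1:-1])
        if len(fields) != 2 or not all(fields):
            raise ValueError(f"expected (value, condition): {chunk!r}")
        pairs.append((fields[0], fields[1]))
    return pairs


# The piecewise example from the comments in worker()
print(split_piecewise("(1,x<200 ), (2,x>=500), (0,True)"))
# A nested case that stripping every parenthesis would break
print(split_piecewise("((x+1)*2, (x<3) & (x>0)), (0, True)"))
```

Unlike the listing, this sketch requires every tuple to be wrapped in parentheses and to contain exactly one top-level comma, so a value such as Max(a, b) has to keep its arguments inside the function call.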