Passed
Push — master ( 2e4a80...c9c6f2 )
by
unknown
09:47
created

virtualpoint.calculate()   F

Complexity

Conditions 14

Size

Total Lines 90
Code Lines 52

Duplication

Lines 72
Ratio 80 %

Importance

Changes 0
Metric Value
eloc 52
dl 72
loc 90
rs 3.6
c 0
b 0
f 0
cc 14
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like virtualpoint.calculate() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
MyEMS Normalization Service - Virtual Point Processing Module
3
4
This module handles the calculation of virtual point values using mathematical expressions.
5
Virtual points are computed points that derive their values from combinations of other points
6
(analog points, digital points, and other virtual points) using algebraic equations and piecewise functions.
7
8
The virtual point processing performs the following functions:
9
1. Retrieves all virtual points and their mathematical expressions from the system database
10
2. Uses multiprocessing to process virtual points in parallel for efficiency
11
3. Parses mathematical expressions and identifies dependent points
12
4. Retrieves latest values from dependent points in the historical database
13
5. Evaluates mathematical expressions using SymPy library
14
6. Stores calculated virtual point values in the historical database
15
16
Key features:
17
- Supports complex mathematical expressions with multiple variables
18
- Handles different point types (analog, digital, virtual) in expressions
19
- Uses SymPy for robust mathematical expression evaluation including piecewise functions
20
- Maintains data integrity through comprehensive error handling
21
- Processes virtual points continuously to provide real-time calculated values
22
"""
23
24
import json
25
import random
26
import re
27
import time
28
from datetime import datetime
29
from decimal import Decimal
30
from multiprocessing import Pool
31
import mysql.connector
32
from sympy import sympify, Piecewise, symbols, parse_expr
33
import config
34
35
# Security configuration: Maximum number of datetime points to process in one batch
36
MAX_DATETIME_POINTS = 100000
37
38
# Maximum expression length to prevent DoS attacks
39
MAX_EXPRESSION_LENGTH = 10000
40
41
# Maximum number of substitutions to prevent DoS attacks
42
MAX_SUBSTITUTIONS = 100
43
44
45
########################################################################################################################
46
# Security Validation Functions
47
########################################################################################################################
48
49
def validate_variable_name(name):
50
    """
51
    Validate variable name to ensure it follows safe identifier rules.
52
    
53
    Args:
54
        name: Variable name to validate
55
        
56
    Raises:
57
        ValueError: If variable name is invalid
58
    """
59
    if not isinstance(name, str):
60
        raise ValueError(f"Variable name must be a string, got {type(name)}")
61
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
62
        raise ValueError(f"Invalid variable name: {name}. Must start with letter or underscore and contain only alphanumeric characters and underscores.")
63
64
65
def validate_point_id(point_id):
66
    """
67
    Validate point_id to ensure it is a positive integer.
68
    
69
    Args:
70
        point_id: Point ID to validate
71
        
72
    Raises:
73
        ValueError: If point_id is invalid
74
    """
75
    if not isinstance(point_id, int):
76
        # Try to convert if it's a numeric string
77
        try:
78
            point_id = int(point_id)
79
        except (ValueError, TypeError):
80
            raise ValueError(f"Invalid point_id: {point_id}. Must be an integer.")
81
    if point_id <= 0:
82
        raise ValueError(f"Invalid point_id: {point_id}. Must be a positive integer.")
83
84
85
def validate_expression_safe(expression):
86
    """
87
    Validate expression to ensure it doesn't contain dangerous patterns.
88
    
89
    Args:
90
        expression: Expression string to validate
91
        
92
    Raises:
93
        ValueError: If expression contains dangerous patterns
94
    """
95
    if not isinstance(expression, str):
96
        raise ValueError(f"Expression must be a string, got {type(expression)}")
97
    
98
    if len(expression) > MAX_EXPRESSION_LENGTH:
99
        raise ValueError(f"Expression too long: {len(expression)} characters. Maximum allowed: {MAX_EXPRESSION_LENGTH}")
100
    
101
    # Check for dangerous patterns that could lead to code execution
102
    dangerous_patterns = [
103
        (r'__\w+__', 'Double underscore pattern (magic methods)'),
104
        (r'import\s+', 'Import statement'),
105
        (r'exec\s*\(', 'exec() call'),
106
        (r'eval\s*\(', 'eval() call'),
107
        (r'open\s*\(', 'File open operation'),
108
        (r'file\s*\(', 'File operation'),
109
        (r'__import__', 'Direct __import__ call'),
110
        (r'compile\s*\(', 'compile() call'),
111
        (r'globals\s*\(', 'globals() call'),
112
        (r'locals\s*\(', 'locals() call'),
113
    ]
114
    
115
    for pattern, description in dangerous_patterns:
116
        if re.search(pattern, expression, re.IGNORECASE):
117
            raise ValueError(f"Dangerous pattern detected in expression: {description}")
118
119
120
def parse_piecewise_safe(expression, substitutions):
121
    """
122
    Safely parse a piecewise function expression without using eval().
123
    
124
    Piecewise format: "(value1, condition1), (value2, condition2), (default_value, True)"
125
    
126
    Args:
127
        expression: Piecewise function expression string
128
        substitutions: Dictionary mapping variable names to point IDs
129
        
130
    Returns:
131
        List of tuples (value_expr, condition_expr) for Piecewise construction
132
        
133
    Raises:
134
        ValueError: If expression cannot be safely parsed
135
    """
136
    # Validate expression first
137
    validate_expression_safe(expression)
138
    
139
    # Parse the piecewise expression manually
140
    # Format: "(value, condition), (value, condition), ..."
141
    try:
142
        # Use regex to find all tuples: (value, condition)
143
        # Pattern matches: ( ... , ... ) where we need to find the comma that separates value from condition
144
        # This is complex because conditions may contain parentheses and commas
145
        # Strategy: Find matching parentheses pairs and split on commas outside of them
146
        
147
        piecewise_parts = []
148
        expr = expression.strip()
149
        
150
        # Remove outer parentheses if present
151
        if expr.startswith('(') and expr.endswith(')'):
152
            # Check if it's a single tuple or multiple tuples
153
            # Count opening and closing parentheses
154
            depth = 0
155
            for i, char in enumerate(expr):
156
                if char == '(':
157
                    depth += 1
158
                elif char == ')':
159
                    depth -= 1
160
                    if depth == 0 and i < len(expr) - 1:
161
                        # Found end of first tuple, there are multiple tuples
162
                        break
163
            else:
164
                # Single tuple, remove outer parentheses
165
                expr = expr[1:-1]
166
        
167
        # Split by "), (" pattern to separate tuples
168
        # This pattern appears between tuples
169
        parts = re.split(r'\)\s*,\s*\(', expr)
170
        
171
        # Process each tuple part
172
        for part in parts:
173
            # Remove leading/trailing parentheses and whitespace
174
            part = part.strip().lstrip('(').rstrip(')')
175
            
176
            # Find the comma that separates value from condition
177
            # We need the rightmost comma that's not inside nested parentheses
178
            depth = 0
179
            last_comma_pos = -1
180
            for i, char in enumerate(part):
181
                if char == '(':
182
                    depth += 1
183
                elif char == ')':
184
                    depth -= 1
185
                elif char == ',' and depth == 0:
186
                    last_comma_pos = i
187
            
188
            if last_comma_pos == -1:
189
                raise ValueError(f"Invalid piecewise tuple format: {part}. Missing comma separator.")
190
            
191
            value_str = part[:last_comma_pos].strip()
192
            condition_str = part[last_comma_pos+1:].strip()
193
            
194
            if not value_str or not condition_str:
195
                raise ValueError(f"Invalid piecewise tuple format: {part}. Empty value or condition.")
196
            
197
            # Parse value and condition using sympify (safe)
198
            value_expr = sympify(value_str)
199
            condition_expr = sympify(condition_str)
200
            
201
            piecewise_parts.append((value_expr, condition_expr))
202
        
203
        if not piecewise_parts:
204
            raise ValueError("No valid piecewise parts found in expression")
205
        
206
        return piecewise_parts
207
    except Exception as e:
208
        raise ValueError(f"Failed to parse piecewise expression: {str(e)}")
209
210
211
########################################################################################################################
212
# Virtual Point Calculation Procedures:
213
# Step 1: Query all virtual points and their mathematical expressions from system database
214
# Step 2: Create multiprocessing pool to call worker processes in parallel
215
########################################################################################################################
216
217 View Code Duplication
def calculate(logger):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
218
    """
219
    Main function for virtual point calculation using mathematical expressions.
220
221
    This function runs continuously, retrieving all virtual points from the system database
222
    and processing them in parallel to calculate virtual point values using their
223
    configured mathematical expressions.
224
225
    Args:
226
        logger: Logger instance for recording calculation activities and errors
227
    """
228
    while True:
229
        # The outermost while loop to reconnect to server if there is a connection error
230
        cnx_system_db = None
231
        cursor_system_db = None
232
233
        # Connect to system database to retrieve virtual point configuration
234
        try:
235
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
236
            cursor_system_db = cnx_system_db.cursor()
237
        except Exception as e:
238
            logger.error("Error in step 0 of virtual point calculate " + str(e))
239
            if cursor_system_db:
240
                cursor_system_db.close()
241
            if cnx_system_db:
242
                cnx_system_db.close()
243
            # Sleep and continue the outer loop to reconnect the database
244
            time.sleep(60)
245
            continue
246
247
        print("Connected to MyEMS System Database")
248
249
        # Retrieve all virtual points with their configuration data
250
        virtual_point_list = list()
251
        try:
252
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
253
                                     " FROM tbl_points "
254
                                     " WHERE is_virtual = 1 ")
255
            rows_virtual_points = cursor_system_db.fetchall()
256
257
            # Check if virtual points were found
258
            if rows_virtual_points is None or len(rows_virtual_points) == 0:
259
                # Sleep several minutes and continue the outer loop to reconnect the database
260
                time.sleep(60)
261
                continue
262
263
            # Build virtual point list with configuration data
264
            for row in rows_virtual_points:
265
                meta_result = {"id": row[0],
266
                               "name": row[1],
267
                               "data_source_id": row[2],
268
                               "object_type": row[3],
269
                               "high_limit": row[4],
270
                               "low_limit": row[5],
271
                               "address": row[6]}
272
                virtual_point_list.append(meta_result)
273
274
        except Exception as e:
275
            logger.error("Error in step 1 of virtual point calculate " + str(e))
276
            # sleep and continue the outer loop to reconnect the database
277
            time.sleep(60)
278
            continue
279
        finally:
280
            if cursor_system_db:
281
                cursor_system_db.close()
282
            if cnx_system_db:
283
                cnx_system_db.close()
284
285
        # Shuffle the virtual point list for randomly calculating point values
286
        # This helps distribute processing load evenly across time
287
        random.shuffle(virtual_point_list)
288
289
        print("Got all virtual points in MyEMS System Database")
290
        ################################################################################################################
291
        # Step 2: Create multiprocessing pool to call worker processes in parallel
292
        ################################################################################################################
293
        # Create process pool with configured size for parallel processing
294
        p = Pool(processes=config.pool_size)
295
        error_list = p.map(worker, virtual_point_list)
296
        p.close()
297
        p.join()
298
299
        # Log any errors from worker processes
300
        for error in error_list:
301
            if error is not None and len(error) > 0:
302
                logger.error(error)
303
304
        print("go to sleep ")
305
        time.sleep(60)  # Sleep for 1 minute before next processing cycle
306
        print("wake from sleep, and continue to work")
307
308
309
########################################################################################################################
310
# Worker Process Procedures for Individual Virtual Point Processing:
311
# Step 1: Get start datetime and end datetime for processing
312
# Step 2: Parse the expression and get all points in substitutions
313
# Step 3: Query points type from system database
314
# Step 4: Query points value from historical database
315
# Step 5: Evaluate the equation with points values and store results
316
# Returns the error string for logging or returns None on success
317
########################################################################################################################
318
319
def worker(virtual_point):
320
    """
321
    Worker function to process a single virtual point's calculation.
322
323
    This function processes one virtual point at a time, evaluating its mathematical
324
    expression using data from dependent points and storing the calculated results.
325
326
    Args:
327
        virtual_point: Dictionary containing virtual point configuration (id, name, object_type, address, etc.)
328
329
    Returns:
330
        None on success, error string on failure
331
    """
332
    cnx_historical_db = None
333
    cursor_historical_db = None
334
335
    # Connect to historical database to check existing processed data
336
    try:
337
        cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
338
        cursor_historical_db = cnx_historical_db.cursor()
339
    except Exception as e:
340
        if cursor_historical_db:
341
            cursor_historical_db.close()
342
        if cnx_historical_db:
343
            cnx_historical_db.close()
344
        # Return generic error message to avoid information disclosure
345
        return f"Error connecting to historical database for virtual point '{virtual_point['name']}': {type(e).__name__}"
346
347
    print("Start to process virtual point: " + "'" + virtual_point['name'] + "'")
348
349
    ####################################################################################################################
350
    # Step 1: Get start datetime and end datetime for processing
351
    ####################################################################################################################
352
    # Determine the appropriate table based on virtual point object type
353
    if virtual_point['object_type'] == 'ANALOG_VALUE':
354
        table_name = "tbl_analog_value"
355
    elif virtual_point['object_type'] == 'ENERGY_VALUE':
356
        table_name = "tbl_energy_value"
357
    else:
358
        if cursor_historical_db:
359
            cursor_historical_db.close()
360
        if cnx_historical_db:
361
            cnx_historical_db.close()
362
        return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"
363
364
    try:
365
        query = (" SELECT MAX(utc_date_time) "
366
                 " FROM " + table_name +
367
                 " WHERE point_id = %s ")
368
        cursor_historical_db.execute(query, (virtual_point['id'],))
369
        row = cursor_historical_db.fetchone()
370
    except Exception as e:
371
        if cursor_historical_db:
372
            cursor_historical_db.close()
373
        if cnx_historical_db:
374
            cnx_historical_db.close()
375
        # Return generic error message to avoid information disclosure
376
        return f"Error querying historical database for virtual point '{virtual_point['name']}': {type(e).__name__}"
377
378
    start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None)
379
380
    if row is not None and len(row) > 0 and isinstance(row[0], datetime):
381
        start_datetime_utc = row[0].replace(tzinfo=None)
382
383
    end_datetime_utc = datetime.utcnow().replace(tzinfo=None)
384
385
    if end_datetime_utc <= start_datetime_utc:
386
        if cursor_historical_db:
387
            cursor_historical_db.close()
388
        if cnx_historical_db:
389
            cnx_historical_db.close()
390
        return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'"
391
392
    print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
393
          + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
394
395
    ############################################################################################################
396
    # Step 2: parse the expression and get all points in substitutions
397
    ############################################################################################################
398
    point_list = list()
399
    expression = None
400
    substitutions = None
401
    try:
402
        ########################################################################################################
403
        # parse the expression and get all points in substitutions
404
        ########################################################################################################
405
        address = json.loads(virtual_point['address'])
406
        # algebraic expression example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}'
407
        # piecewise function example: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}'
408
        if 'expression' not in address.keys() \
409
                or 'substitutions' not in address.keys() \
410
                or len(address['expression']) == 0 \
411
                or len(address['substitutions']) == 0:
412
            if cursor_historical_db:
413
                cursor_historical_db.close()
414
            if cnx_historical_db:
415
                cnx_historical_db.close()
416
            return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'"
417
        
418
        expression = address['expression']
419
        substitutions = address['substitutions']
420
        
421
        # Security: Validate expression and substitutions
422
        try:
423
            validate_expression_safe(expression)
424
        except ValueError as e:
425
            if cursor_historical_db:
426
                cursor_historical_db.close()
427
            if cnx_historical_db:
428
                cnx_historical_db.close()
429
            return f"Error in step 2.1.1: Invalid expression for '{virtual_point['name']}': {str(e)}"
430
        
431
        # Security: Validate substitutions count
432
        if len(substitutions) > MAX_SUBSTITUTIONS:
433
            if cursor_historical_db:
434
                cursor_historical_db.close()
435
            if cnx_historical_db:
436
                cnx_historical_db.close()
437
            return f"Error in step 2.1.2: Too many substitutions ({len(substitutions)}) for '{virtual_point['name']}'"
438
        
439
        # Security: Validate variable names and point IDs
440
        for variable_name, point_id in substitutions.items():
441
            try:
442
                validate_variable_name(variable_name)
443
                validate_point_id(point_id)
444
            except ValueError as e:
445
                if cursor_historical_db:
446
                    cursor_historical_db.close()
447
                if cnx_historical_db:
448
                    cnx_historical_db.close()
449
                return f"Error in step 2.1.3: Invalid variable or point_id for '{virtual_point['name']}': {str(e)}"
450
            point_list.append({"variable_name": variable_name, "point_id": point_id})
451
    except json.JSONDecodeError as e:
452
        if cursor_historical_db:
453
            cursor_historical_db.close()
454
        if cnx_historical_db:
455
            cnx_historical_db.close()
456
        return "Error in step 2.2: Invalid JSON in address for '" + virtual_point['name'] + "'"
457
    except Exception as e:
458
        if cursor_historical_db:
459
            cursor_historical_db.close()
460
        if cnx_historical_db:
461
            cnx_historical_db.close()
462
        return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
463
464
    ############################################################################################################
465
    # Step 3: query points type from system database
466
    ############################################################################################################
467
    print("getting points type ")
468
    cnx_system_db = None
469
    cursor_system_db = None
470
    try:
471
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
472
        cursor_system_db = cnx_system_db.cursor()
473
    except Exception as e:
474
        if cursor_system_db:
475
            cursor_system_db.close()
476
        if cnx_system_db:
477
            cnx_system_db.close()
478
        print("Error in step 3 of virtual point worker " + str(e))
479
        # Return generic error message to avoid information disclosure
480
        return f"Error connecting to system database for virtual point '{virtual_point['name']}': {type(e).__name__}"
481
482
    print("Connected to MyEMS System Database")
483
484
    all_point_dict = dict()
485
    try:
486
        cursor_system_db.execute(" SELECT id, object_type "
487
                                 " FROM tbl_points ")
488
        rows_points = cursor_system_db.fetchall()
489
490
        if rows_points is None or len(rows_points) == 0:
491
            return f"Error: No points found in system database for virtual point '{virtual_point['name']}'"
492
493
        for row in rows_points:
494
            all_point_dict[row[0]] = row[1]
495
    except Exception as e:
496
        # Return generic error message to avoid information disclosure
497
        return f"Error querying points from system database for virtual point '{virtual_point['name']}': {type(e).__name__}"
498
    finally:
499
        if cursor_system_db:
500
            cursor_system_db.close()
501
        if cnx_system_db:
502
            cnx_system_db.close()
503
    ############################################################################################################
504
    # Step 4: query points value from historical database
505
    ############################################################################################################
506
507
    print("getting point values ")
508
    point_values_dict = dict()
509
    if point_list is not None and len(point_list) > 0:
510
        try:
511
            for point in point_list:
512
                point_object_type = all_point_dict.get(point['point_id'])
513
                if point_object_type is None:
514
                    return "variable point type should not be None " + " for '" + virtual_point['name'] + "'"
515
                if point_object_type == 'ANALOG_VALUE':
516
                    query = (" SELECT utc_date_time, actual_value "
517
                             " FROM tbl_analog_value "
518
                             " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
519
                             " ORDER BY utc_date_time ")
520
                    cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,))
521
                    rows = cursor_historical_db.fetchall()
522
                    if rows is not None and len(rows) > 0:
523
                        point_values_dict[point['point_id']] = dict()
524
                        for row in rows:
525
                            point_values_dict[point['point_id']][row[0]] = row[1]
526
                elif point_object_type == 'ENERGY_VALUE':
527
                    query = (" SELECT utc_date_time, actual_value "
528
                             " FROM tbl_energy_value "
529
                             " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
530
                             " ORDER BY utc_date_time ")
531
                    cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,))
532
                    rows = cursor_historical_db.fetchall()
533
                    if rows is not None and len(rows) > 0:
534
                        point_values_dict[point['point_id']] = dict()
535
                        for row in rows:
536
                            point_values_dict[point['point_id']][row[0]] = row[1]
537
                    else:
538
                        point_values_dict[point['point_id']] = None
539
                else:
540
                    # point type should not be DIGITAL_VALUE
541
                    return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"
542
        except Exception as e:
543
            if cursor_historical_db:
544
                cursor_historical_db.close()
545
            if cnx_historical_db:
546
                cnx_historical_db.close()
547
            # Return generic error message to avoid information disclosure
548
            return f"Error querying point values from historical database for virtual point '{virtual_point['name']}': {type(e).__name__}"
549
550
    ############################################################################################################
551
    # Step 5: evaluate the equation with points values
552
    ############################################################################################################
553
554
    print("getting date time set for all points")
555
    utc_date_time_set = set()
556
    if point_values_dict is not None and len(point_values_dict) > 0:
557
        for point_id, point_values in point_values_dict.items():
558
            if point_values is not None and len(point_values) > 0:
559
                utc_date_time_set = utc_date_time_set.union(point_values.keys())
560
561
    # Security: Resource limit check to prevent DoS attacks
562
    if len(utc_date_time_set) > MAX_DATETIME_POINTS:
563
        if cursor_historical_db:
564
            cursor_historical_db.close()
565
        if cnx_historical_db:
566
            cnx_historical_db.close()
567
        return f"Error: Too many datetime points to process ({len(utc_date_time_set)}) for '{virtual_point['name']}'. Maximum allowed: {MAX_DATETIME_POINTS}"
568
569
    print("evaluating the equation with SymPy")
570
    normalized_values = list()
571
572
    ############################################################################################################
573
    # Converting Strings to SymPy Expressions
574
    # The sympify function(that's sympify, not to be confused with simplify) can be used to
575
    # convert strings into SymPy expressions.
576
    # SECURITY FIX: Removed eval() and replaced with safe SymPy parsing
577
    ############################################################################################################
578
    try:
579
        # Security: Use safe parsing instead of eval()
580
        if re.search(',', expression):
581
            # Piecewise function: parse safely without eval()
582
            piecewise_parts = parse_piecewise_safe(expression, substitutions)
583
            expr = Piecewise(*piecewise_parts)
584
            print("the expression will be evaluated as piecewise function: " + str(expr))
585
        else:
586
            # Algebraic expression: use sympify (safe)
587
            expr = sympify(expression)
588
            print("the expression will be evaluated as algebraic expression: " + str(expr))
589
590
        for utc_date_time in utc_date_time_set:
591
            meta_data = dict()
592
            meta_data['utc_date_time'] = utc_date_time
593
594
            ####################################################################################################
595
            # create a dictionary of Symbol: point pairs
596
            ####################################################################################################
597
598
            subs = dict()
599
600
            ####################################################################################################
601
            # Evaluating the expression at current_datetime_utc
602
            ####################################################################################################
603
604
            if point_list is not None and len(point_list) > 0:
605
                for point in point_list:
606
                    actual_value = point_values_dict[point['point_id']].get(utc_date_time, None)
607
                    if actual_value is None:
608
                        break
609
                    subs[point['variable_name']] = actual_value
610
611
            if len(subs) != len(point_list):
612
                continue
613
614
            ####################################################################################################
615
            # To numerically evaluate an expression with a Symbol at a point,
616
            # we might use subs followed by evalf,
617
            # but it is more efficient and numerically stable to pass the substitution to evalf
618
            # using the subs flag, which takes a dictionary of Symbol: point pairs.
619
            ####################################################################################################
620
            # Note: expr is already a Piecewise object for piecewise functions, or a SymPy expression for algebraic
621
            if re.search(',', expression):
622
                # expr is already a Piecewise object
623
                meta_data['actual_value'] = Decimal(str(expr.subs(subs)))
624
                normalized_values.append(meta_data)
625
            else:
626
                # expr is a SymPy expression
627
                meta_data['actual_value'] = Decimal(str(expr.evalf(subs=subs)))
628
                normalized_values.append(meta_data)
629
    except Exception as e:
630
        if cursor_historical_db:
631
            cursor_historical_db.close()
632
        if cnx_historical_db:
633
            cnx_historical_db.close()
634
        # Return generic error message to avoid information disclosure
635
        # Detailed error information should be logged separately if logger is available
636
        return f"Error evaluating expression for virtual point '{virtual_point['name']}': {type(e).__name__}"
637
638
    print("saving virtual points values to historical database")
639
640
    if len(normalized_values) > 0:
641
        latest_meta_data = normalized_values[0]
642
643
        # Security: Use parameterized queries to prevent SQL injection
644
        # Validate table_name is from whitelist (already validated via object_type)
645
        if table_name not in ['tbl_analog_value', 'tbl_energy_value']:
646
            if cursor_historical_db:
647
                cursor_historical_db.close()
648
            if cnx_historical_db:
649
                cnx_historical_db.close()
650
            return f"Error: Invalid table name '{table_name}' for '{virtual_point['name']}'"
651
652
        insert_query = ("INSERT INTO " + table_name +
653
                       " (point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)")
654
655
        while len(normalized_values) > 0:
656
            insert_100 = normalized_values[:100]
657
            normalized_values = normalized_values[100:]
658
659
            try:
660
                # Security: Use executemany with parameterized query
661
                values = []
662
                for meta_data in insert_100:
663
                    values.append((
664
                        virtual_point['id'],
665
                        meta_data['utc_date_time'],
666
                        meta_data['actual_value']
667
                    ))
668
                    if meta_data['utc_date_time'] > latest_meta_data['utc_date_time']:
669
                        latest_meta_data = meta_data
670
671
                cursor_historical_db.executemany(insert_query, values)
672
                cnx_historical_db.commit()
673
            except Exception as e:
674
                if cursor_historical_db:
675
                    cursor_historical_db.close()
676
                if cnx_historical_db:
677
                    cnx_historical_db.close()
678
                # Return generic error message to avoid information disclosure
679
                return f"Error saving calculated values to database for virtual point '{virtual_point['name']}': {type(e).__name__}"
680
681
        try:
682
            # Security: Use parameterized query for delete operation
683
            delete_query = "DELETE FROM " + table_name + "_latest WHERE point_id = %s"
684
            cursor_historical_db.execute(delete_query, (virtual_point['id'],))
685
            cnx_historical_db.commit()
686
687
            # Security: Use parameterized query for insert operation
688
            insert_latest_query = ("INSERT INTO " + table_name + "_latest " +
689
                                  "(point_id, utc_date_time, actual_value) VALUES (%s, %s, %s)")
690
            cursor_historical_db.execute(insert_latest_query, (
691
                virtual_point['id'],
692
                latest_meta_data['utc_date_time'],
693
                latest_meta_data['actual_value']
694
            ))
695
            cnx_historical_db.commit()
696
        except Exception as e:
697
            if cursor_historical_db:
698
                cursor_historical_db.close()
699
            if cnx_historical_db:
700
                cnx_historical_db.close()
701
            # Return generic error message to avoid information disclosure
702
            return f"Error updating latest value in database for virtual point '{virtual_point['name']}': {type(e).__name__}"
703
704
    if cursor_historical_db:
705
        cursor_historical_db.close()
706
    if cnx_historical_db:
707
        cnx_historical_db.close()
708
709
    return None
710