Issues (1577)

myems-normalization/virtualpoint.py (2 issues)

1
import json
2
import random
3
import re
4
import time
5
from datetime import datetime
6
from decimal import Decimal
7
from multiprocessing import Pool
8
import mysql.connector
9
from sympy import sympify, Piecewise, symbols
10
import config
11
12
13
########################################################################################################################
14
# PROCEDURES:
15
# Step 1: Query all virtual points
16
# Step 2: Create multiprocessing pool to call worker in parallel
17
########################################################################################################################
18
19 View Code Duplication
def calculate(logger):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
20
    while True:
21
        # the outermost while loop to reconnect server if there is a connection error
22
        cnx_system_db = None
23
        cursor_system_db = None
24
        try:
25
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
26
            cursor_system_db = cnx_system_db.cursor()
27
        except Exception as e:
28
            logger.error("Error in step 0 of virtual point calculate " + str(e))
29
            if cursor_system_db:
30
                cursor_system_db.close()
31
            if cnx_system_db:
32
                cnx_system_db.close()
33
            # sleep and continue the outer loop to reconnect the database
34
            time.sleep(60)
35
            continue
36
37
        print("Connected to MyEMS System Database")
38
39
        virtual_point_list = list()
40
        try:
41
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
42
                                     " FROM tbl_points "
43
                                     " WHERE is_virtual = 1 ")
44
            rows_virtual_points = cursor_system_db.fetchall()
45
46
            if rows_virtual_points is None or len(rows_virtual_points) == 0:
47
                # sleep several minutes and continue the outer loop to reconnect the database
48
                time.sleep(60)
49
                continue
50
51
            for row in rows_virtual_points:
52
                meta_result = {"id": row[0],
53
                               "name": row[1],
54
                               "data_source_id": row[2],
55
                               "object_type": row[3],
56
                               "high_limit": row[4],
57
                               "low_limit": row[5],
58
                               "address": row[6]}
59
                virtual_point_list.append(meta_result)
60
61
        except Exception as e:
62
            logger.error("Error in step 1 of virtual point calculate " + str(e))
63
            # sleep and continue the outer loop to reconnect the database
64
            time.sleep(60)
65
            continue
66
        finally:
67
            if cursor_system_db:
68
                cursor_system_db.close()
69
            if cnx_system_db:
70
                cnx_system_db.close()
71
72
        # shuffle the virtual point list for randomly calculating
73
        random.shuffle(virtual_point_list)
74
75
        print("Got all virtual points in MyEMS System Database")
76
        ################################################################################################################
77
        # Step 2: Create multiprocessing pool to call worker in parallel
78
        ################################################################################################################
79
        p = Pool(processes=config.pool_size)
80
        error_list = p.map(worker, virtual_point_list)
81
        p.close()
82
        p.join()
83
84
        for error in error_list:
85
            if error is not None and len(error) > 0:
86
                logger.error(error)
87
88
        print("go to sleep ")
89
        time.sleep(60)
90
        print("wake from sleep, and continue to work")
91
92
93
########################################################################################################################
94
# Step 1: get start datetime and end datetime
95
# Step 2: parse the expression and get all points in substitutions
96
# Step 3: query points type from system database
97
# Step 4: query points value from historical database
98
# Step 5: evaluate the equation with points values
99
########################################################################################################################
100
101
def worker(virtual_point):
102
    cnx_historical_db = None
103
    cursor_historical_db = None
104
105
    try:
106
        cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
107
        cursor_historical_db = cnx_historical_db.cursor()
108
    except Exception as e:
109
        if cursor_historical_db:
110
            cursor_historical_db.close()
111
        if cnx_historical_db:
112
            cnx_historical_db.close()
113
        return "Error in step 1.1 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
114
115
    print("Start to process virtual point: " + "'" + virtual_point['name'] + "'")
116
117
    ####################################################################################################################
118
    # step 1: get start datetime and end datetime
119
    ####################################################################################################################
120
    if virtual_point['object_type'] == 'ANALOG_VALUE':
121
        table_name = "tbl_analog_value"
122
    elif virtual_point['object_type'] == 'ENERGY_VALUE':
123
        table_name = "tbl_energy_value"
124
    else:
125
        if cursor_historical_db:
126
            cursor_historical_db.close()
127
        if cnx_historical_db:
128
            cnx_historical_db.close()
129
        return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"
130
131
    try:
132
        query = (" SELECT MAX(utc_date_time) "
133
                 " FROM " + table_name +
134
                 " WHERE point_id = %s ")
135
        cursor_historical_db.execute(query, (virtual_point['id'],))
136
        row = cursor_historical_db.fetchone()
137
    except Exception as e:
138
        if cursor_historical_db:
139
            cursor_historical_db.close()
140
        if cnx_historical_db:
141
            cnx_historical_db.close()
142
        return "Error in step 1.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
143
144
    start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None)
145
146
    if row is not None and len(row) > 0 and isinstance(row[0], datetime):
147
        start_datetime_utc = row[0].replace(tzinfo=None)
148
149
    end_datetime_utc = datetime.utcnow().replace(tzinfo=None)
150
151
    if end_datetime_utc <= start_datetime_utc:
152
        if cursor_historical_db:
153
            cursor_historical_db.close()
154
        if cnx_historical_db:
155
            cnx_historical_db.close()
156
        return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'"
157
158
    print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
159
          + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
160
161
    ############################################################################################################
162
    # Step 2: parse the expression and get all points in substitutions
163
    ############################################################################################################
164
    point_list = list()
165
    try:
166
        ########################################################################################################
167
        # parse the expression and get all points in substitutions
168
        ########################################################################################################
169
        address = json.loads(virtual_point['address'])
170
        # algebraic expression example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}'
171
        # piecewise function example: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}'
172
        if 'expression' not in address.keys() \
173
                or 'substitutions' not in address.keys() \
174
                or len(address['expression']) == 0 \
175
                or len(address['substitutions']) == 0:
176
            return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'"
177
        expression = address['expression']
178
        substitutions = address['substitutions']
179
        for variable_name, point_id in substitutions.items():
180
            point_list.append({"variable_name": variable_name, "point_id": point_id})
181
    except Exception as e:
182
        if cursor_historical_db:
183
            cursor_historical_db.close()
184
        if cnx_historical_db:
185
            cnx_historical_db.close()
186
        return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
187
188
    ############################################################################################################
189
    # Step 3: query points type from system database
190
    ############################################################################################################
191
    print("getting points type ")
192
    cnx_system_db = None
193
    cursor_system_db = None
194
    try:
195
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
196
        cursor_system_db = cnx_system_db.cursor()
197
    except Exception as e:
198
        if cursor_system_db:
199
            cursor_system_db.close()
200
        if cnx_system_db:
201
            cnx_system_db.close()
202
        print("Error in step 3 of virtual point worker " + str(e))
203
        return "Error in step 3 of virtual point worker " + str(e)
204
205
    print("Connected to MyEMS System Database")
206
207
    all_point_dict = dict()
208
    try:
209
        cursor_system_db.execute(" SELECT id, object_type "
210
                                 " FROM tbl_points ")
211
        rows_points = cursor_system_db.fetchall()
212
213
        if rows_points is None or len(rows_points) == 0:
214
            return "Error in step 3.1 of virtual point worker for '" + virtual_point['name'] + "'"
215
216
        for row in rows_points:
217
            all_point_dict[row[0]] = row[1]
218
    except Exception as e:
219
        return "Error in step 3.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
220
    finally:
221
        if cursor_system_db:
222
            cursor_system_db.close()
223
        if cnx_system_db:
224
            cnx_system_db.close()
225
    ############################################################################################################
226
    # Step 4: query points value from historical database
227
    ############################################################################################################
228
229
    print("getting point values ")
230
    point_values_dict = dict()
231
    if point_list is not None and len(point_list) > 0:
232
        try:
233
            for point in point_list:
234
                point_object_type = all_point_dict.get(point['point_id'])
235
                if point_object_type is None:
236
                    return "variable point type should not be None " + " for '" + virtual_point['name'] + "'"
237
                if point_object_type == 'ANALOG_VALUE':
238
                    query = (" SELECT utc_date_time, actual_value "
239
                             " FROM tbl_analog_value "
240
                             " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
241
                             " ORDER BY utc_date_time ")
242
                    cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,))
243
                    rows = cursor_historical_db.fetchall()
244
                    if rows is not None and len(rows) > 0:
245
                        point_values_dict[point['point_id']] = dict()
246
                        for row in rows:
247
                            point_values_dict[point['point_id']][row[0]] = row[1]
248
                elif point_object_type == 'ENERGY_VALUE':
249
                    query = (" SELECT utc_date_time, actual_value "
250
                             " FROM tbl_energy_value "
251
                             " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
252
                             " ORDER BY utc_date_time ")
253
                    cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,))
254
                    rows = cursor_historical_db.fetchall()
255
                    if rows is not None and len(rows) > 0:
256
                        point_values_dict[point['point_id']] = dict()
257
                        for row in rows:
258
                            point_values_dict[point['point_id']][row[0]] = row[1]
259
                    else:
260
                        point_values_dict[point['point_id']] = None
261
                else:
262
                    # point type should not be DIGITAL_VALUE
263
                    return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"
264
        except Exception as e:
265
            if cursor_historical_db:
266
                cursor_historical_db.close()
267
            if cnx_historical_db:
268
                cnx_historical_db.close()
269
            return "Error in step 4.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
270
271
    ############################################################################################################
272
    # Step 5: evaluate the equation with points values
273
    ############################################################################################################
274
275
    print("getting date time set for all points")
276
    utc_date_time_set = set()
277
    if point_values_dict is not None and len(point_values_dict) > 0:
278
        for point_id, point_values in point_values_dict.items():
279
            if point_values is not None and len(point_values) > 0:
280
                utc_date_time_set = utc_date_time_set.union(point_values.keys())
281
282
    print("evaluating the equation with SymPy")
283
    normalized_values = list()
284
285
    ############################################################################################################
286
    # Converting Strings to SymPy Expressions
287
    # The sympify function(that’s sympify, not to be confused with simplify) can be used to
288
    # convert strings into SymPy expressions.
289
    ############################################################################################################
290
    try:
291
        if re.search(',', expression):
292
            for item in substitutions.keys():
293
                locals()[item] = symbols(item)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable locals does not seem to be defined.
Loading history...
294
            expr = eval(expression)
295
            print("the expression will be evaluated as piecewise function: " + str(expr))
296
        else:
297
            expr = sympify(expression)
298
            print("the expression will be evaluated as algebraic expression: " + str(expr))
299
300
        for utc_date_time in utc_date_time_set:
301
            meta_data = dict()
302
            meta_data['utc_date_time'] = utc_date_time
303
304
            ####################################################################################################
305
            # create a dictionary of Symbol: point pairs
306
            ####################################################################################################
307
308
            subs = dict()
309
310
            ####################################################################################################
311
            # Evaluating the expression at current_datetime_utc
312
            ####################################################################################################
313
314
            if point_list is not None and len(point_list) > 0:
315
                for point in point_list:
316
                    actual_value = point_values_dict[point['point_id']].get(utc_date_time, None)
317
                    if actual_value is None:
318
                        break
319
                    subs[point['variable_name']] = actual_value
320
321
            if len(subs) != len(point_list):
322
                continue
323
324
            ####################################################################################################
325
            # To numerically evaluate an expression with a Symbol at a point,
326
            # we might use subs followed by evalf,
327
            # but it is more efficient and numerically stable to pass the substitution to evalf
328
            # using the subs flag, which takes a dictionary of Symbol: point pairs.
329
            ####################################################################################################
330
            if re.search(',', expression):
331
                formula = Piecewise(*expr)
332
                meta_data['actual_value'] = Decimal(str(formula.subs(subs)))
333
                normalized_values.append(meta_data)
334
            else:
335
                meta_data['actual_value'] = Decimal(str(expr.evalf(subs=subs)))
336
                normalized_values.append(meta_data)
337
    except Exception as e:
338
        if cursor_historical_db:
339
            cursor_historical_db.close()
340
        if cnx_historical_db:
341
            cnx_historical_db.close()
342
        return "Error in step 5.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
343
344
    print("saving virtual points values to historical database")
345
346
    if len(normalized_values) > 0:
347
        latest_meta_data = normalized_values[0]
348
349
        while len(normalized_values) > 0:
350
            insert_100 = normalized_values[:100]
351
            normalized_values = normalized_values[100:]
352
353
            try:
354
                add_values = (" INSERT INTO " + table_name +
355
                              " (point_id, utc_date_time, actual_value) "
356
                              " VALUES  ")
357
358
                for meta_data in insert_100:
359
                    add_values += " (" + str(virtual_point['id']) + ","
360
                    add_values += "'" + meta_data['utc_date_time'].isoformat()[0:19] + "',"
361
                    add_values += str(meta_data['actual_value']) + "), "
362
363
                    if meta_data['utc_date_time'] > latest_meta_data['utc_date_time']:
364
                        latest_meta_data = meta_data
365
366
                # print("add_values:" + add_values)
367
                # trim ", " at the end of string and then execute
368
                cursor_historical_db.execute(add_values[:-2])
369
                cnx_historical_db.commit()
370
            except Exception as e:
371
                if cursor_historical_db:
372
                    cursor_historical_db.close()
373
                if cnx_historical_db:
374
                    cnx_historical_db.close()
375
                return "Error in step 5.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
376
377
        try:
378
            # update tbl_analog_value_latest or tbl_energy_value_latest
379
            delete_value = " DELETE FROM " + table_name + "_latest WHERE point_id = {} ".format(virtual_point['id'])
380
            # print("delete_value:" + delete_value)
381
            cursor_historical_db.execute(delete_value)
382
            cnx_historical_db.commit()
383
384
            latest_value = (" INSERT INTO " + table_name + "_latest (point_id, utc_date_time, actual_value) "
385
                            " VALUES ({}, '{}', {}) "
386
                            .format(virtual_point['id'],
387
                                    latest_meta_data['utc_date_time'].isoformat()[0:19],
388
                                    latest_meta_data['actual_value']))
389
            # print("latest_value:" + latest_value)
390
            cursor_historical_db.execute(latest_value)
391
            cnx_historical_db.commit()
392
        except Exception as e:
393
            if cursor_historical_db:
394
                cursor_historical_db.close()
395
            if cnx_historical_db:
396
                cnx_historical_db.close()
397
            return "Error in step 5.3 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
398
399
    if cursor_historical_db:
400
        cursor_historical_db.close()
401
    if cnx_historical_db:
402
        cnx_historical_db.close()
403
404
    return None
405