1
|
|
|
import json |
2
|
|
|
import random |
3
|
|
|
import re |
4
|
|
|
import time |
5
|
|
|
from datetime import datetime |
6
|
|
|
from decimal import Decimal |
7
|
|
|
from multiprocessing import Pool |
8
|
|
|
import mysql.connector |
9
|
|
|
from sympy import sympify, Piecewise, symbols |
10
|
|
|
import config |
11
|
|
|
|
12
|
|
|
|
13
|
|
|
######################################################################################################################## |
14
|
|
|
# PROCEDURES: |
15
|
|
|
# Step 1: Query all virtual points |
16
|
|
|
# Step 2: Create multiprocessing pool to call worker in parallel |
17
|
|
|
######################################################################################################################## |
18
|
|
|
|
19
|
|
View Code Duplication |
def _query_virtual_points(logger):
    """Load the metadata of every virtual point from the system database.

    The connection is always closed before this function returns, so the
    caller never sleeps while holding an open connection.

    Args:
        logger: logger used to report connection and query errors.

    Returns:
        A list of dicts with the keys 'id', 'name', 'data_source_id',
        'object_type', 'high_limit', 'low_limit' and 'address'. The list is
        empty when the database could not be reached, the query failed, or
        no point has is_virtual = 1.
    """
    cnx_system_db = None
    cursor_system_db = None
    try:
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 0 of virtual point calculate " + str(e))
            return list()

        print("Connected to MyEMS System Database")

        try:
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
                                     " FROM tbl_points "
                                     " WHERE is_virtual = 1 ")
            rows_virtual_points = cursor_system_db.fetchall()

            if rows_virtual_points is None or len(rows_virtual_points) == 0:
                return list()

            return [{"id": row[0],
                     "name": row[1],
                     "data_source_id": row[2],
                     "object_type": row[3],
                     "high_limit": row[4],
                     "low_limit": row[5],
                     "address": row[6]}
                    for row in rows_virtual_points]
        except Exception as e:
            logger.error("Error in step 1 of virtual point calculate " + str(e))
            return list()
    finally:
        # release the connection even if closing the cursor fails
        try:
            if cursor_system_db:
                cursor_system_db.close()
        finally:
            if cnx_system_db:
                cnx_system_db.close()


def calculate(logger):
    """Run the virtual point calculation service forever.

    Each cycle queries all virtual points (step 1), hands them in random
    order to a multiprocessing pool that runs ``worker`` on each of them
    (step 2), logs every message returned by the workers, then sleeps 60
    seconds. When the points cannot be loaded, or there are none, the cycle
    only sleeps 60 seconds and retries.

    Args:
        logger: logger used to report errors.

    Returns:
        Never returns normally; the loop only ends if an exception escapes.
    """
    while True:
        # the outermost while loop to reconnect server if there is a connection error
        virtual_point_list = _query_virtual_points(logger)
        if len(virtual_point_list) == 0:
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue

        # shuffle the virtual point list for randomly calculating
        random.shuffle(virtual_point_list)

        print("Got all virtual points in MyEMS System Database")
        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        p = Pool(processes=config.pool_size)
        try:
            error_list = p.map(worker, virtual_point_list)
        finally:
            # always shut the pool down so a failing map does not leak worker processes
            p.close()
            p.join()

        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep ")
        time.sleep(60)
        print("wake from sleep, and continue to work")
91
|
|
|
|
92
|
|
|
|
93
|
|
|
######################################################################################################################## |
94
|
|
|
# Step 1: get start datetime and end datetime |
95
|
|
|
# Step 2: parse the expression and get all points in substitutions |
96
|
|
|
# Step 3: query points type from system database |
97
|
|
|
# Step 4: query points value from historical database |
98
|
|
|
# Step 5: evaluate the equation with points values |
99
|
|
|
######################################################################################################################## |
100
|
|
|
|
101
|
|
|
def worker(virtual_point):
    """Calculate the pending values of one virtual point and store them.

    The point's 'address' is a JSON document holding an 'expression' and a
    'substitutions' mapping of variable name to source point id, e.g.
        algebraic expression: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}'
        piecewise function: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}'
    An expression that contains a comma is evaluated as a piecewise function.

    Procedure:
        Step 1: get start datetime (latest stored value, else
                config.start_datetime_utc) and end datetime (now, UTC)
        Step 2: parse the expression and get all points in substitutions
        Step 3: query points type from system database
        Step 4: query points value from historical database
        Step 5: evaluate the expression for every timestamp at which all
                source points have a value, then save the results and the
                latest value

    Args:
        virtual_point: dict with at least the keys 'id', 'name',
            'object_type' and 'address'.

    Returns:
        None when the point was processed (including when there was nothing
        to save), otherwise a string describing why it was not calculated.
    """
    # only analog and energy values can be calculated; the table names come from
    # this fixed whitelist, so concatenating them into SQL text is safe
    value_tables = {'ANALOG_VALUE': "tbl_analog_value",
                    'ENERGY_VALUE': "tbl_energy_value"}

    cnx_historical_db = None
    cursor_historical_db = None

    # a single try/finally guarantees the historical connection is released on every return path
    try:
        try:
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
            cursor_historical_db = cnx_historical_db.cursor()
        except Exception as e:
            return "Error in step 1.1 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

        print("Start to process virtual point: " + "'" + virtual_point['name'] + "'")

        ################################################################################################################
        # step 1: get start datetime and end datetime
        ################################################################################################################
        table_name = value_tables.get(virtual_point['object_type'])
        if table_name is None:
            return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"

        try:
            query = (" SELECT MAX(utc_date_time) "
                     " FROM " + table_name +
                     " WHERE point_id = %s ")
            cursor_historical_db.execute(query, (virtual_point['id'],))
            row = cursor_historical_db.fetchone()
        except Exception as e:
            return "Error in step 1.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None)

        # resume after the latest value already stored for this virtual point, if there is one
        if row is not None and len(row) > 0 and isinstance(row[0], datetime):
            start_datetime_utc = row[0].replace(tzinfo=None)

        end_datetime_utc = datetime.utcnow().replace(tzinfo=None)

        if end_datetime_utc <= start_datetime_utc:
            return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'"

        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])

        ################################################################################################################
        # Step 2: parse the expression and get all points in substitutions
        ################################################################################################################
        point_list = list()
        try:
            address = json.loads(virtual_point['address'])
            if 'expression' not in address.keys() \
                    or 'substitutions' not in address.keys() \
                    or len(address['expression']) == 0 \
                    or len(address['substitutions']) == 0:
                return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'"
            expression = address['expression']
            substitutions = address['substitutions']
            for variable_name, point_id in substitutions.items():
                point_list.append({"variable_name": variable_name, "point_id": point_id})
        except Exception as e:
            return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

        ################################################################################################################
        # Step 3: query points type from system database
        ################################################################################################################
        print("getting points type ")
        cnx_system_db = None
        cursor_system_db = None
        all_point_dict = dict()
        try:
            try:
                cnx_system_db = mysql.connector.connect(**config.myems_system_db)
                cursor_system_db = cnx_system_db.cursor()
            except Exception as e:
                print("Error in step 3 of virtual point worker " + str(e))
                return "Error in step 3 of virtual point worker " + str(e)

            print("Connected to MyEMS System Database")

            try:
                cursor_system_db.execute(" SELECT id, object_type "
                                         " FROM tbl_points ")
                rows_points = cursor_system_db.fetchall()

                if rows_points is None or len(rows_points) == 0:
                    return "Error in step 3.1 of virtual point worker for '" + virtual_point['name'] + "'"

                for row in rows_points:
                    all_point_dict[row[0]] = row[1]
            except Exception as e:
                return "Error in step 3.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
        finally:
            try:
                if cursor_system_db:
                    cursor_system_db.close()
            finally:
                if cnx_system_db:
                    cnx_system_db.close()

        ################################################################################################################
        # Step 4: query points value from historical database
        ################################################################################################################
        print("getting point values ")
        point_values_dict = dict()
        try:
            for point in point_list:
                point_object_type = all_point_dict.get(point['point_id'])
                if point_object_type is None:
                    return "variable point type should not be None " + " for '" + virtual_point['name'] + "'"

                source_table = value_tables.get(point_object_type)
                if source_table is None:
                    # point type should not be DIGITAL_VALUE
                    return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'"

                query = (" SELECT utc_date_time, actual_value "
                         " FROM " + source_table +
                         " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s "
                         " ORDER BY utc_date_time ")
                cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,))
                rows = cursor_historical_db.fetchall()
                # every source point gets a mapping, empty when it has no samples in the window,
                # so step 5 treats "no samples" like any other missing sample instead of failing
                point_values_dict[point['point_id']] = {row[0]: row[1] for row in (rows or ())}
        except Exception as e:
            return "Error in step 4.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

        ################################################################################################################
        # Step 5: evaluate the equation with points values
        ################################################################################################################
        print("getting date time set for all points")
        utc_date_time_set = set()
        for point_values in point_values_dict.values():
            utc_date_time_set.update(point_values.keys())

        print("evaluating the equation with SymPy")
        normalized_values = list()

        try:
            is_piecewise = re.search(',', expression) is not None
            if is_piecewise:
                # Bind every substitution variable to a SymPy symbol in an explicit namespace.
                # Writing to locals() is not reliable: since Python 3.13 it returns a snapshot.
                namespace = {item: symbols(item) for item in substitutions.keys()}
                # NOTE(security): eval executes arbitrary Python taken from tbl_points.address;
                # only trusted administrators must be able to edit virtual point expressions.
                expr = eval(expression, globals(), namespace)
                print("the expression will be evaluated as piecewise function: " + str(expr))
                # the piecewise function does not depend on the timestamp, build it once
                formula = Piecewise(*expr)
            else:
                ########################################################################################################
                # The sympify function (that's sympify, not to be confused with simplify) can be used to
                # convert strings into SymPy expressions.
                ########################################################################################################
                expr = sympify(expression)
                print("the expression will be evaluated as algebraic expression: " + str(expr))

            for utc_date_time in utc_date_time_set:
                # create a dictionary of variable name: point value pairs at this timestamp
                subs = dict()
                for point in point_list:
                    actual_value = point_values_dict[point['point_id']].get(utc_date_time, None)
                    if actual_value is None:
                        break
                    subs[point['variable_name']] = actual_value

                # skip timestamps at which at least one source point has no value
                if len(subs) != len(point_list):
                    continue

                ########################################################################################################
                # To numerically evaluate an expression with a Symbol at a point,
                # we might use subs followed by evalf,
                # but it is more efficient and numerically stable to pass the substitution to evalf
                # using the subs flag, which takes a dictionary of Symbol: point pairs.
                ########################################################################################################
                if is_piecewise:
                    actual_value = Decimal(str(formula.subs(subs)))
                else:
                    actual_value = Decimal(str(expr.evalf(subs=subs)))
                normalized_values.append({'utc_date_time': utc_date_time, 'actual_value': actual_value})
        except Exception as e:
            return "Error in step 5.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

        print("saving virtual points values to historical database")

        if len(normalized_values) > 0:
            latest_meta_data = max(normalized_values, key=lambda meta_data: meta_data['utc_date_time'])

            add_values = (" INSERT INTO " + table_name +
                          " (point_id, utc_date_time, actual_value) "
                          " VALUES (%s, %s, %s) ")
            # insert in batches of 100 rows, committing after each batch
            for offset in range(0, len(normalized_values), 100):
                insert_100 = normalized_values[offset:offset + 100]
                try:
                    cursor_historical_db.executemany(
                        add_values,
                        [(virtual_point['id'],
                          meta_data['utc_date_time'].isoformat()[0:19],
                          meta_data['actual_value'])
                         for meta_data in insert_100])
                    cnx_historical_db.commit()
                except Exception as e:
                    return "Error in step 5.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

            try:
                # update tbl_analog_value_latest or tbl_energy_value_latest;
                # delete and insert are committed together so a failed insert cannot drop the latest value
                cursor_historical_db.execute(" DELETE FROM " + table_name + "_latest WHERE point_id = %s ",
                                             (virtual_point['id'],))
                cursor_historical_db.execute(" INSERT INTO " + table_name +
                                             "_latest (point_id, utc_date_time, actual_value) "
                                             " VALUES (%s, %s, %s) ",
                                             (virtual_point['id'],
                                              latest_meta_data['utc_date_time'].isoformat()[0:19],
                                              latest_meta_data['actual_value']))
                cnx_historical_db.commit()
            except Exception as e:
                return "Error in step 5.3 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"

        return None
    finally:
        # release the connection even if closing the cursor fails
        try:
            if cursor_historical_db:
                cursor_historical_db.close()
        finally:
            if cnx_historical_db:
                cnx_historical_db.close()
405
|
|
|
|