1 | import json |
||
2 | import random |
||
3 | import re |
||
4 | import time |
||
5 | from datetime import datetime |
||
6 | from decimal import Decimal |
||
7 | from multiprocessing import Pool |
||
8 | import mysql.connector |
||
9 | from sympy import sympify, Piecewise, symbols |
||
10 | import config |
||
11 | |||
12 | |||
13 | ######################################################################################################################## |
||
14 | # PROCEDURES: |
||
15 | # Step 1: Query all virtual points |
||
16 | # Step 2: Create multiprocessing pool to call worker in parallel |
||
17 | ######################################################################################################################## |
||
18 | |||
19 | View Code Duplication | def calculate(logger): |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
20 | while True: |
||
21 | # the outermost while loop to reconnect server if there is a connection error |
||
22 | cnx_system_db = None |
||
23 | cursor_system_db = None |
||
24 | try: |
||
25 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
||
26 | cursor_system_db = cnx_system_db.cursor() |
||
27 | except Exception as e: |
||
28 | logger.error("Error in step 0 of virtual point calculate " + str(e)) |
||
29 | if cursor_system_db: |
||
30 | cursor_system_db.close() |
||
31 | if cnx_system_db: |
||
32 | cnx_system_db.close() |
||
33 | # sleep and continue the outer loop to reconnect the database |
||
34 | time.sleep(60) |
||
35 | continue |
||
36 | |||
37 | print("Connected to MyEMS System Database") |
||
38 | |||
39 | virtual_point_list = list() |
||
40 | try: |
||
41 | cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address " |
||
42 | " FROM tbl_points " |
||
43 | " WHERE is_virtual = 1 ") |
||
44 | rows_virtual_points = cursor_system_db.fetchall() |
||
45 | |||
46 | if rows_virtual_points is None or len(rows_virtual_points) == 0: |
||
47 | # sleep several minutes and continue the outer loop to reconnect the database |
||
48 | time.sleep(60) |
||
49 | continue |
||
50 | |||
51 | for row in rows_virtual_points: |
||
52 | meta_result = {"id": row[0], |
||
53 | "name": row[1], |
||
54 | "data_source_id": row[2], |
||
55 | "object_type": row[3], |
||
56 | "high_limit": row[4], |
||
57 | "low_limit": row[5], |
||
58 | "address": row[6]} |
||
59 | virtual_point_list.append(meta_result) |
||
60 | |||
61 | except Exception as e: |
||
62 | logger.error("Error in step 1 of virtual point calculate " + str(e)) |
||
63 | # sleep and continue the outer loop to reconnect the database |
||
64 | time.sleep(60) |
||
65 | continue |
||
66 | finally: |
||
67 | if cursor_system_db: |
||
68 | cursor_system_db.close() |
||
69 | if cnx_system_db: |
||
70 | cnx_system_db.close() |
||
71 | |||
72 | # shuffle the virtual point list for randomly calculating |
||
73 | random.shuffle(virtual_point_list) |
||
74 | |||
75 | print("Got all virtual points in MyEMS System Database") |
||
76 | ################################################################################################################ |
||
77 | # Step 2: Create multiprocessing pool to call worker in parallel |
||
78 | ################################################################################################################ |
||
79 | p = Pool(processes=config.pool_size) |
||
80 | error_list = p.map(worker, virtual_point_list) |
||
81 | p.close() |
||
82 | p.join() |
||
83 | |||
84 | for error in error_list: |
||
85 | if error is not None and len(error) > 0: |
||
86 | logger.error(error) |
||
87 | |||
88 | print("go to sleep ") |
||
89 | time.sleep(60) |
||
90 | print("wake from sleep, and continue to work") |
||
91 | |||
92 | |||
93 | ######################################################################################################################## |
||
94 | # Step 1: get start datetime and end datetime |
||
95 | # Step 2: parse the expression and get all points in substitutions |
||
96 | # Step 3: query points type from system database |
||
97 | # Step 4: query points value from historical database |
||
98 | # Step 5: evaluate the equation with points values |
||
99 | ######################################################################################################################## |
||
100 | |||
101 | def worker(virtual_point): |
||
102 | cnx_historical_db = None |
||
103 | cursor_historical_db = None |
||
104 | |||
105 | try: |
||
106 | cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) |
||
107 | cursor_historical_db = cnx_historical_db.cursor() |
||
108 | except Exception as e: |
||
109 | if cursor_historical_db: |
||
110 | cursor_historical_db.close() |
||
111 | if cnx_historical_db: |
||
112 | cnx_historical_db.close() |
||
113 | return "Error in step 1.1 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
114 | |||
115 | print("Start to process virtual point: " + "'" + virtual_point['name'] + "'") |
||
116 | |||
117 | #################################################################################################################### |
||
118 | # step 1: get start datetime and end datetime |
||
119 | #################################################################################################################### |
||
120 | if virtual_point['object_type'] == 'ANALOG_VALUE': |
||
121 | table_name = "tbl_analog_value" |
||
122 | elif virtual_point['object_type'] == 'ENERGY_VALUE': |
||
123 | table_name = "tbl_energy_value" |
||
124 | else: |
||
125 | if cursor_historical_db: |
||
126 | cursor_historical_db.close() |
||
127 | if cnx_historical_db: |
||
128 | cnx_historical_db.close() |
||
129 | return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'" |
||
130 | |||
131 | try: |
||
132 | query = (" SELECT MAX(utc_date_time) " |
||
133 | " FROM " + table_name + |
||
134 | " WHERE point_id = %s ") |
||
135 | cursor_historical_db.execute(query, (virtual_point['id'],)) |
||
136 | row = cursor_historical_db.fetchone() |
||
137 | except Exception as e: |
||
138 | if cursor_historical_db: |
||
139 | cursor_historical_db.close() |
||
140 | if cnx_historical_db: |
||
141 | cnx_historical_db.close() |
||
142 | return "Error in step 1.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
143 | |||
144 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S').replace(tzinfo=None) |
||
145 | |||
146 | if row is not None and len(row) > 0 and isinstance(row[0], datetime): |
||
147 | start_datetime_utc = row[0].replace(tzinfo=None) |
||
148 | |||
149 | end_datetime_utc = datetime.utcnow().replace(tzinfo=None) |
||
150 | |||
151 | if end_datetime_utc <= start_datetime_utc: |
||
152 | if cursor_historical_db: |
||
153 | cursor_historical_db.close() |
||
154 | if cnx_historical_db: |
||
155 | cnx_historical_db.close() |
||
156 | return "it isn't time to calculate" + " for '" + virtual_point['name'] + "'" |
||
157 | |||
158 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
||
159 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
||
160 | |||
161 | ############################################################################################################ |
||
162 | # Step 2: parse the expression and get all points in substitutions |
||
163 | ############################################################################################################ |
||
164 | point_list = list() |
||
165 | try: |
||
166 | ######################################################################################################## |
||
167 | # parse the expression and get all points in substitutions |
||
168 | ######################################################################################################## |
||
169 | address = json.loads(virtual_point['address']) |
||
170 | # algebraic expression example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}' |
||
171 | # piecewise function example: '{"expression":"(1,x<200 ), (2,x>=500), (0,True)", "substitutions":{"x":101}}' |
||
172 | if 'expression' not in address.keys() \ |
||
173 | or 'substitutions' not in address.keys() \ |
||
174 | or len(address['expression']) == 0 \ |
||
175 | or len(address['substitutions']) == 0: |
||
176 | return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'" |
||
177 | expression = address['expression'] |
||
178 | substitutions = address['substitutions'] |
||
179 | for variable_name, point_id in substitutions.items(): |
||
180 | point_list.append({"variable_name": variable_name, "point_id": point_id}) |
||
181 | except Exception as e: |
||
182 | if cursor_historical_db: |
||
183 | cursor_historical_db.close() |
||
184 | if cnx_historical_db: |
||
185 | cnx_historical_db.close() |
||
186 | return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
187 | |||
188 | ############################################################################################################ |
||
189 | # Step 3: query points type from system database |
||
190 | ############################################################################################################ |
||
191 | print("getting points type ") |
||
192 | cnx_system_db = None |
||
193 | cursor_system_db = None |
||
194 | try: |
||
195 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
||
196 | cursor_system_db = cnx_system_db.cursor() |
||
197 | except Exception as e: |
||
198 | if cursor_system_db: |
||
199 | cursor_system_db.close() |
||
200 | if cnx_system_db: |
||
201 | cnx_system_db.close() |
||
202 | print("Error in step 3 of virtual point worker " + str(e)) |
||
203 | return "Error in step 3 of virtual point worker " + str(e) |
||
204 | |||
205 | print("Connected to MyEMS System Database") |
||
206 | |||
207 | all_point_dict = dict() |
||
208 | try: |
||
209 | cursor_system_db.execute(" SELECT id, object_type " |
||
210 | " FROM tbl_points ") |
||
211 | rows_points = cursor_system_db.fetchall() |
||
212 | |||
213 | if rows_points is None or len(rows_points) == 0: |
||
214 | return "Error in step 3.1 of virtual point worker for '" + virtual_point['name'] + "'" |
||
215 | |||
216 | for row in rows_points: |
||
217 | all_point_dict[row[0]] = row[1] |
||
218 | except Exception as e: |
||
219 | return "Error in step 3.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
220 | finally: |
||
221 | if cursor_system_db: |
||
222 | cursor_system_db.close() |
||
223 | if cnx_system_db: |
||
224 | cnx_system_db.close() |
||
225 | ############################################################################################################ |
||
226 | # Step 4: query points value from historical database |
||
227 | ############################################################################################################ |
||
228 | |||
229 | print("getting point values ") |
||
230 | point_values_dict = dict() |
||
231 | if point_list is not None and len(point_list) > 0: |
||
232 | try: |
||
233 | for point in point_list: |
||
234 | point_object_type = all_point_dict.get(point['point_id']) |
||
235 | if point_object_type is None: |
||
236 | return "variable point type should not be None " + " for '" + virtual_point['name'] + "'" |
||
237 | if point_object_type == 'ANALOG_VALUE': |
||
238 | query = (" SELECT utc_date_time, actual_value " |
||
239 | " FROM tbl_analog_value " |
||
240 | " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s " |
||
241 | " ORDER BY utc_date_time ") |
||
242 | cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,)) |
||
243 | rows = cursor_historical_db.fetchall() |
||
244 | if rows is not None and len(rows) > 0: |
||
245 | point_values_dict[point['point_id']] = dict() |
||
246 | for row in rows: |
||
247 | point_values_dict[point['point_id']][row[0]] = row[1] |
||
248 | elif point_object_type == 'ENERGY_VALUE': |
||
249 | query = (" SELECT utc_date_time, actual_value " |
||
250 | " FROM tbl_energy_value " |
||
251 | " WHERE point_id = %s AND utc_date_time > %s AND utc_date_time < %s " |
||
252 | " ORDER BY utc_date_time ") |
||
253 | cursor_historical_db.execute(query, (point['point_id'], start_datetime_utc, end_datetime_utc,)) |
||
254 | rows = cursor_historical_db.fetchall() |
||
255 | if rows is not None and len(rows) > 0: |
||
256 | point_values_dict[point['point_id']] = dict() |
||
257 | for row in rows: |
||
258 | point_values_dict[point['point_id']][row[0]] = row[1] |
||
259 | else: |
||
260 | point_values_dict[point['point_id']] = None |
||
261 | else: |
||
262 | # point type should not be DIGITAL_VALUE |
||
263 | return "variable point type should not be DIGITAL_VALUE " + " for '" + virtual_point['name'] + "'" |
||
264 | except Exception as e: |
||
265 | if cursor_historical_db: |
||
266 | cursor_historical_db.close() |
||
267 | if cnx_historical_db: |
||
268 | cnx_historical_db.close() |
||
269 | return "Error in step 4.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
270 | |||
271 | ############################################################################################################ |
||
272 | # Step 5: evaluate the equation with points values |
||
273 | ############################################################################################################ |
||
274 | |||
275 | print("getting date time set for all points") |
||
276 | utc_date_time_set = set() |
||
277 | if point_values_dict is not None and len(point_values_dict) > 0: |
||
278 | for point_id, point_values in point_values_dict.items(): |
||
279 | if point_values is not None and len(point_values) > 0: |
||
280 | utc_date_time_set = utc_date_time_set.union(point_values.keys()) |
||
281 | |||
282 | print("evaluating the equation with SymPy") |
||
283 | normalized_values = list() |
||
284 | |||
285 | ############################################################################################################ |
||
286 | # Converting Strings to SymPy Expressions |
||
287 | # The sympify function(that’s sympify, not to be confused with simplify) can be used to |
||
288 | # convert strings into SymPy expressions. |
||
289 | ############################################################################################################ |
||
290 | try: |
||
291 | if re.search(',', expression): |
||
292 | for item in substitutions.keys(): |
||
293 | locals()[item] = symbols(item) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
294 | expr = eval(expression) |
||
295 | print("the expression will be evaluated as piecewise function: " + str(expr)) |
||
296 | else: |
||
297 | expr = sympify(expression) |
||
298 | print("the expression will be evaluated as algebraic expression: " + str(expr)) |
||
299 | |||
300 | for utc_date_time in utc_date_time_set: |
||
301 | meta_data = dict() |
||
302 | meta_data['utc_date_time'] = utc_date_time |
||
303 | |||
304 | #################################################################################################### |
||
305 | # create a dictionary of Symbol: point pairs |
||
306 | #################################################################################################### |
||
307 | |||
308 | subs = dict() |
||
309 | |||
310 | #################################################################################################### |
||
311 | # Evaluating the expression at current_datetime_utc |
||
312 | #################################################################################################### |
||
313 | |||
314 | if point_list is not None and len(point_list) > 0: |
||
315 | for point in point_list: |
||
316 | actual_value = point_values_dict[point['point_id']].get(utc_date_time, None) |
||
317 | if actual_value is None: |
||
318 | break |
||
319 | subs[point['variable_name']] = actual_value |
||
320 | |||
321 | if len(subs) != len(point_list): |
||
322 | continue |
||
323 | |||
324 | #################################################################################################### |
||
325 | # To numerically evaluate an expression with a Symbol at a point, |
||
326 | # we might use subs followed by evalf, |
||
327 | # but it is more efficient and numerically stable to pass the substitution to evalf |
||
328 | # using the subs flag, which takes a dictionary of Symbol: point pairs. |
||
329 | #################################################################################################### |
||
330 | if re.search(',', expression): |
||
331 | formula = Piecewise(*expr) |
||
332 | meta_data['actual_value'] = Decimal(str(formula.subs(subs))) |
||
333 | normalized_values.append(meta_data) |
||
334 | else: |
||
335 | meta_data['actual_value'] = Decimal(str(expr.evalf(subs=subs))) |
||
336 | normalized_values.append(meta_data) |
||
337 | except Exception as e: |
||
338 | if cursor_historical_db: |
||
339 | cursor_historical_db.close() |
||
340 | if cnx_historical_db: |
||
341 | cnx_historical_db.close() |
||
342 | return "Error in step 5.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
343 | |||
344 | print("saving virtual points values to historical database") |
||
345 | |||
346 | if len(normalized_values) > 0: |
||
347 | latest_meta_data = normalized_values[0] |
||
348 | |||
349 | while len(normalized_values) > 0: |
||
350 | insert_100 = normalized_values[:100] |
||
351 | normalized_values = normalized_values[100:] |
||
352 | |||
353 | try: |
||
354 | add_values = (" INSERT INTO " + table_name + |
||
355 | " (point_id, utc_date_time, actual_value) " |
||
356 | " VALUES ") |
||
357 | |||
358 | for meta_data in insert_100: |
||
359 | add_values += " (" + str(virtual_point['id']) + "," |
||
360 | add_values += "'" + meta_data['utc_date_time'].isoformat()[0:19] + "'," |
||
361 | add_values += str(meta_data['actual_value']) + "), " |
||
362 | |||
363 | if meta_data['utc_date_time'] > latest_meta_data['utc_date_time']: |
||
364 | latest_meta_data = meta_data |
||
365 | |||
366 | # print("add_values:" + add_values) |
||
367 | # trim ", " at the end of string and then execute |
||
368 | cursor_historical_db.execute(add_values[:-2]) |
||
369 | cnx_historical_db.commit() |
||
370 | except Exception as e: |
||
371 | if cursor_historical_db: |
||
372 | cursor_historical_db.close() |
||
373 | if cnx_historical_db: |
||
374 | cnx_historical_db.close() |
||
375 | return "Error in step 5.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
376 | |||
377 | try: |
||
378 | # update tbl_analog_value_latest or tbl_energy_value_latest |
||
379 | delete_value = " DELETE FROM " + table_name + "_latest WHERE point_id = {} ".format(virtual_point['id']) |
||
380 | # print("delete_value:" + delete_value) |
||
381 | cursor_historical_db.execute(delete_value) |
||
382 | cnx_historical_db.commit() |
||
383 | |||
384 | latest_value = (" INSERT INTO " + table_name + "_latest (point_id, utc_date_time, actual_value) " |
||
385 | " VALUES ({}, '{}', {}) " |
||
386 | .format(virtual_point['id'], |
||
387 | latest_meta_data['utc_date_time'].isoformat()[0:19], |
||
388 | latest_meta_data['actual_value'])) |
||
389 | # print("latest_value:" + latest_value) |
||
390 | cursor_historical_db.execute(latest_value) |
||
391 | cnx_historical_db.commit() |
||
392 | except Exception as e: |
||
393 | if cursor_historical_db: |
||
394 | cursor_historical_db.close() |
||
395 | if cnx_historical_db: |
||
396 | cnx_historical_db.close() |
||
397 | return "Error in step 5.3 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
398 | |||
399 | if cursor_historical_db: |
||
400 | cursor_historical_db.close() |
||
401 | if cnx_historical_db: |
||
402 | cnx_historical_db.close() |
||
403 | |||
404 | return None |
||
405 |