Total Complexity | 65 |
Total Lines | 302 |
Duplicated Lines | 23.84 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like virtualpoint often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import time |
||
2 | from datetime import datetime, timedelta |
||
3 | import mysql.connector |
||
4 | from sympy import sympify |
||
5 | from multiprocessing import Pool |
||
6 | import random |
||
7 | import json |
||
8 | import config |
||
9 | |||
10 | |||
11 | ######################################################################################################################## |
||
12 | # PROCEDURES: |
||
13 | # Step 1: Query all virtual points |
||
14 | # Step 2: Create multiprocessing pool to call worker in parallel |
||
15 | ######################################################################################################################## |
||
16 | |||
17 | View Code Duplication | def calculate(logger): |
|
|
|||
18 | |||
19 | while True: |
||
20 | # outer loop to reconnect server if there is a connection error |
||
21 | cnx_system_db = None |
||
22 | cursor_system_db = None |
||
23 | try: |
||
24 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
||
25 | cursor_system_db = cnx_system_db.cursor(dictionary=True) |
||
26 | except Exception as e: |
||
27 | logger.error("Error in step 0 of virtual point calculate " + str(e)) |
||
28 | if cursor_system_db: |
||
29 | cursor_system_db.close() |
||
30 | if cnx_system_db: |
||
31 | cnx_system_db.close() |
||
32 | # sleep and continue the outer loop to reconnect the database |
||
33 | time.sleep(60) |
||
34 | continue |
||
35 | |||
36 | print("Connected to MyEMS System Database") |
||
37 | |||
38 | virtual_point_list = list() |
||
39 | try: |
||
40 | cursor_system_db.execute(" SELECT id, name, data_source_id, high_limit, low_limit, address " |
||
41 | " FROM tbl_points " |
||
42 | " WHERE is_virtual = TRUE AND object_type = 'ANALOG_VALUE' ") |
||
43 | rows_virtual_points = cursor_system_db.fetchall() |
||
44 | |||
45 | if rows_virtual_points is None or len(rows_virtual_points) == 0: |
||
46 | # sleep several minutes and continue the outer loop to reconnect the database |
||
47 | time.sleep(60) |
||
48 | continue |
||
49 | |||
50 | for row in rows_virtual_points: |
||
51 | meta_result = {"id": row['id'], |
||
52 | "name": row['name'], |
||
53 | "data_source_id": row['data_source_id'], |
||
54 | "high_limit": row['high_limit'], |
||
55 | "low_limit": row['low_limit'], |
||
56 | "address": row['address']} |
||
57 | virtual_point_list.append(meta_result) |
||
58 | |||
59 | except Exception as e: |
||
60 | logger.error("Error in step 1 of virtual point calculate " + str(e)) |
||
61 | # sleep and continue the outer loop to reconnect the database |
||
62 | time.sleep(60) |
||
63 | continue |
||
64 | finally: |
||
65 | if cursor_system_db: |
||
66 | cursor_system_db.close() |
||
67 | if cnx_system_db: |
||
68 | cnx_system_db.close() |
||
69 | |||
70 | # shuffle the virtual point list for randomly calculating |
||
71 | random.shuffle(virtual_point_list) |
||
72 | |||
73 | print("Got all virtual points in MyEMS System Database") |
||
74 | ################################################################################################################ |
||
75 | # Step 2: Create multiprocessing pool to call worker in parallel |
||
76 | ################################################################################################################ |
||
77 | p = Pool(processes=config.pool_size) |
||
78 | error_list = p.map(worker, virtual_point_list) |
||
79 | p.close() |
||
80 | p.join() |
||
81 | |||
82 | for error in error_list: |
||
83 | if error is not None and len(error) > 0: |
||
84 | logger.error(error) |
||
85 | |||
86 | print("go to sleep ...") |
||
87 | time.sleep(60) |
||
88 | print("wake from sleep, and continue to work...") |
||
89 | |||
90 | |||
91 | ######################################################################################################################## |
||
92 | # Step 1: get start datetime and end datetime |
||
93 | # Step 2: parse the expression and get all points in substitutions |
||
94 | # Step 3: query points value from historical database |
||
95 | # Step 4: evaluate the equation with points values |
||
96 | ######################################################################################################################## |
||
97 | |||
98 | def worker(virtual_point): |
||
99 | cnx_historical_db = None |
||
100 | cursor_historical_db = None |
||
101 | |||
102 | try: |
||
103 | cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) |
||
104 | cursor_historical_db = cnx_historical_db.cursor() |
||
105 | except Exception as e: |
||
106 | if cursor_historical_db: |
||
107 | cursor_historical_db.close() |
||
108 | if cnx_historical_db: |
||
109 | cnx_historical_db.close() |
||
110 | return "Error in step 1.1 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
111 | |||
112 | print("Start to process virtual point: " + "'" + virtual_point['name']+"'") |
||
113 | |||
114 | #################################################################################################################### |
||
115 | # step 1: get start datetime and end datetime |
||
116 | #################################################################################################################### |
||
117 | |||
118 | try: |
||
119 | query = (" SELECT MAX(utc_date_time) " |
||
120 | " FROM tbl_analog_value " |
||
121 | " WHERE point_id = %s ") |
||
122 | cursor_historical_db.execute(query, (virtual_point['id'],)) |
||
123 | row = cursor_historical_db.fetchone() |
||
124 | except Exception as e: |
||
125 | if cursor_historical_db: |
||
126 | cursor_historical_db.close() |
||
127 | if cnx_historical_db: |
||
128 | cnx_historical_db.close() |
||
129 | return "Error in step 1.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
130 | |||
131 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
||
132 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
||
133 | |||
134 | if row is not None and len(row) > 0 and isinstance(row[0], datetime): |
||
135 | # replace second and microsecond with 0 |
||
136 | # note: do not replace minute in case of calculating in half hourly |
||
137 | start_datetime_utc = row[0].replace(second=0, microsecond=0, tzinfo=None) |
||
138 | # start from the next time slot |
||
139 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
||
140 | |||
141 | end_datetime_utc = datetime.utcnow().replace() |
||
142 | end_datetime_utc = end_datetime_utc.replace(second=0, microsecond=0, tzinfo=None) |
||
143 | |||
144 | if end_datetime_utc <= start_datetime_utc: |
||
145 | if cursor_historical_db: |
||
146 | cursor_historical_db.close() |
||
147 | if cnx_historical_db: |
||
148 | cnx_historical_db.close() |
||
149 | return "it's too early to calculate" + " for '" + virtual_point['name'] + "'" |
||
150 | |||
151 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
||
152 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
||
153 | |||
154 | ############################################################################################################ |
||
155 | # Step 2: parse the expression and get all points in substitutions |
||
156 | ############################################################################################################ |
||
157 | point_list = list() |
||
158 | try: |
||
159 | ######################################################################################################## |
||
160 | # parse the expression and get all points in substitutions |
||
161 | ######################################################################################################## |
||
162 | address = json.loads(virtual_point['address']) |
||
163 | # example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}' |
||
164 | if 'expression' not in address.keys() \ |
||
165 | or 'substitutions' not in address.keys() \ |
||
166 | or len(address['expression']) == 0 \ |
||
167 | or len(address['substitutions']) == 0: |
||
168 | return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'" |
||
169 | expression = address['expression'] |
||
170 | substitutions = address['substitutions'] |
||
171 | for variable_name, point_id in substitutions.items(): |
||
172 | point_list.append({"variable_name": variable_name, "point_id": point_id}) |
||
173 | except Exception as e: |
||
174 | if cursor_historical_db: |
||
175 | cursor_historical_db.close() |
||
176 | if cnx_historical_db: |
||
177 | cnx_historical_db.close() |
||
178 | return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
179 | |||
180 | ############################################################################################################ |
||
181 | # Step 3: query points value from historical database |
||
182 | ############################################################################################################ |
||
183 | |||
184 | print("getting point values ...") |
||
185 | point_values_dict = dict() |
||
186 | if point_list is not None and len(point_list) > 0: |
||
187 | try: |
||
188 | for point in point_list: |
||
189 | point_id = str(point['point_id']) |
||
190 | query = (" SELECT utc_date_time, actual_value " |
||
191 | " FROM tbl_analog_value " |
||
192 | " WHERE point_id = %s AND utc_date_time >= %s AND utc_date_time < %s " |
||
193 | " ORDER BY utc_date_time ") |
||
194 | cursor_historical_db.execute(query, (point_id, start_datetime_utc, end_datetime_utc, )) |
||
195 | rows = cursor_historical_db.fetchall() |
||
196 | if rows is None or len(rows) == 0: |
||
197 | point_values_dict[point_id] = None |
||
198 | else: |
||
199 | point_values_dict[point_id] = dict() |
||
200 | for row in rows: |
||
201 | point_values_dict[point_id][row[0]] = row[1] |
||
202 | except Exception as e: |
||
203 | if cursor_historical_db: |
||
204 | cursor_historical_db.close() |
||
205 | if cnx_historical_db: |
||
206 | cnx_historical_db.close() |
||
207 | return "Error in step 3.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
208 | |||
209 | ############################################################################################################ |
||
210 | # Step 4: evaluate the equation with points values |
||
211 | ############################################################################################################ |
||
212 | |||
213 | print("getting date time set for all points...") |
||
214 | utc_date_time_set = set() |
||
215 | if point_values_dict is not None and len(point_values_dict) > 0: |
||
216 | for point_id, point_values in point_values_dict.items(): |
||
217 | if point_values is not None and len(point_values) > 0: |
||
218 | utc_date_time_set = utc_date_time_set.union(point_values.keys()) |
||
219 | |||
220 | print("evaluating the equation with SymPy...") |
||
221 | normalized_values = list() |
||
222 | |||
223 | ############################################################################################################ |
||
224 | # Converting Strings to SymPy Expressions |
||
225 | # The sympify function(that’s sympify, not to be confused with simplify) can be used to |
||
226 | # convert strings into SymPy expressions. |
||
227 | ############################################################################################################ |
||
228 | try: |
||
229 | expr = sympify(expression) |
||
230 | print("the expression to be evaluated: " + str(expr)) |
||
231 | for utc_date_time in utc_date_time_set: |
||
232 | meta_data = dict() |
||
233 | meta_data['utc_date_time'] = utc_date_time |
||
234 | |||
235 | #################################################################################################### |
||
236 | # create a dictionary of Symbol: point pairs |
||
237 | #################################################################################################### |
||
238 | |||
239 | subs = dict() |
||
240 | |||
241 | #################################################################################################### |
||
242 | # Evaluating the expression at current_datetime_utc |
||
243 | #################################################################################################### |
||
244 | |||
245 | if point_list is not None and len(point_list) > 0: |
||
246 | for point in point_list: |
||
247 | point_id = str(point['point_id']) |
||
248 | actual_value = point_values_dict[point_id].get(utc_date_time, None) |
||
249 | if actual_value is None: |
||
250 | break |
||
251 | subs[point['variable_name']] = actual_value |
||
252 | |||
253 | if len(subs) != len(point_list): |
||
254 | continue |
||
255 | |||
256 | #################################################################################################### |
||
257 | # To numerically evaluate an expression with a Symbol at a point, |
||
258 | # we might use subs followed by evalf, |
||
259 | # but it is more efficient and numerically stable to pass the substitution to evalf |
||
260 | # using the subs flag, which takes a dictionary of Symbol: point pairs. |
||
261 | #################################################################################################### |
||
262 | |||
263 | meta_data['actual_value'] = expr.evalf(subs=subs) |
||
264 | normalized_values.append(meta_data) |
||
265 | |||
266 | except Exception as e: |
||
267 | if cursor_historical_db: |
||
268 | cursor_historical_db.close() |
||
269 | if cnx_historical_db: |
||
270 | cnx_historical_db.close() |
||
271 | return "Error in step 4.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
272 | |||
273 | print("saving virtual points values to historical database...") |
||
274 | |||
275 | if len(normalized_values) > 0: |
||
276 | try: |
||
277 | add_values = (" INSERT INTO tbl_analog_value " |
||
278 | " (point_id, utc_date_time, actual_value) " |
||
279 | " VALUES ") |
||
280 | |||
281 | for meta_data in normalized_values: |
||
282 | add_values += " (" + str(virtual_point['id']) + "," |
||
283 | add_values += "'" + meta_data['utc_date_time'].isoformat()[0:19] + "'," |
||
284 | add_values += str(meta_data['actual_value']) + "), " |
||
285 | print("add_values:" + add_values) |
||
286 | # trim ", " at the end of string and then execute |
||
287 | cursor_historical_db.execute(add_values[:-2]) |
||
288 | cnx_historical_db.commit() |
||
289 | except Exception as e: |
||
290 | if cursor_historical_db: |
||
291 | cursor_historical_db.close() |
||
292 | if cnx_historical_db: |
||
293 | cnx_historical_db.close() |
||
294 | return "Error in step 4.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'" |
||
295 | |||
296 | if cursor_historical_db: |
||
297 | cursor_historical_db.close() |
||
298 | if cnx_historical_db: |
||
299 | cnx_historical_db.close() |
||
300 | |||
301 | return None |
||
302 |