@@ 106-527 (lines=422) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(equipment): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all input meters associated with the equipment |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all input meters associated with the equipment " + str(equipment['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of equipment_energy_input_item.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
129 | " FROM tbl_meters m, tbl_equipments_meters em " |
|
130 | " WHERE m.id = em.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND m.energy_item_id is NOT NULL " |
|
133 | " AND em.is_output = false " |
|
134 | " AND em.equipment_id = %s ", |
|
135 | (equipment['id'],)) |
|
136 | rows_meters = cursor_system_db.fetchall() |
|
137 | ||
138 | if rows_meters is not None and len(rows_meters) > 0: |
|
139 | for row in rows_meters: |
|
140 | meter_list.append({"id": row[0], |
|
141 | "name": row[1], |
|
142 | "energy_item_id": row[2]}) |
|
143 | ||
144 | except Exception as e: |
|
145 | error_string = "Error in step 1.2 of equipment_energy_input_item.worker " + str(e) |
|
146 | if cursor_system_db: |
|
147 | cursor_system_db.close() |
|
148 | if cnx_system_db: |
|
149 | cnx_system_db.close() |
|
150 | print(error_string) |
|
151 | return error_string |
|
152 | ||
153 | #################################################################################################################### |
|
154 | # Step 2: get all input virtual meters associated with the equipment |
|
155 | #################################################################################################################### |
|
156 | print("Step 2: get all input virtual meters associated with the equipment") |
|
157 | virtual_meter_list = list() |
|
158 | ||
159 | try: |
|
160 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
161 | " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em " |
|
162 | " WHERE m.id = em.virtual_meter_id " |
|
163 | " AND m.energy_item_id is NOT NULL " |
|
164 | " AND m.is_counted = true " |
|
165 | " AND em.is_output = false " |
|
166 | " AND em.equipment_id = %s ", |
|
167 | (equipment['id'],)) |
|
168 | rows_virtual_meters = cursor_system_db.fetchall() |
|
169 | ||
170 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
171 | for row in rows_virtual_meters: |
|
172 | virtual_meter_list.append({"id": row[0], |
|
173 | "name": row[1], |
|
174 | "energy_item_id": row[2]}) |
|
175 | ||
176 | except Exception as e: |
|
177 | error_string = "Error in step 2.1 of equipment_energy_input_item.worker " + str(e) |
|
178 | if cursor_system_db: |
|
179 | cursor_system_db.close() |
|
180 | if cnx_system_db: |
|
181 | cnx_system_db.close() |
|
182 | print(error_string) |
|
183 | return error_string |
|
184 | ||
185 | #################################################################################################################### |
|
186 | # Step 3: get all input offline meters associated with the equipment |
|
187 | #################################################################################################################### |
|
188 | print("Step 3: get all input offline meters associated with the equipment") |
|
189 | ||
190 | offline_meter_list = list() |
|
191 | ||
192 | try: |
|
193 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
194 | " FROM tbl_offline_meters m, tbl_equipments_offline_meters em " |
|
195 | " WHERE m.id = em.offline_meter_id " |
|
196 | " AND m.energy_item_id is NOT NULL " |
|
197 | " AND m.is_counted = true " |
|
198 | " AND em.is_output = false " |
|
199 | " AND em.equipment_id = %s ", |
|
200 | (equipment['id'],)) |
|
201 | rows_offline_meters = cursor_system_db.fetchall() |
|
202 | ||
203 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
204 | for row in rows_offline_meters: |
|
205 | offline_meter_list.append({"id": row[0], |
|
206 | "name": row[1], |
|
207 | "energy_item_id": row[2]}) |
|
208 | ||
209 | except Exception as e: |
|
210 | error_string = "Error in step 3.1 of equipment_energy_input_item.worker " + str(e) |
|
211 | print(error_string) |
|
212 | return error_string |
|
213 | finally: |
|
214 | if cursor_system_db: |
|
215 | cursor_system_db.close() |
|
216 | if cnx_system_db: |
|
217 | cnx_system_db.close() |
|
218 | ||
219 | #################################################################################################################### |
|
220 | # stop to the next equipment if this equipment is empty |
|
221 | #################################################################################################################### |
|
222 | if (meter_list is None or len(meter_list) == 0) and \ |
|
223 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
224 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
225 | print("This is an empty equipment ") |
|
226 | return None |
|
227 | ||
228 | #################################################################################################################### |
|
229 | # Step 4: determine start datetime and end datetime to aggregate |
|
230 | #################################################################################################################### |
|
231 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
232 | cnx_energy_db = None |
|
233 | cursor_energy_db = None |
|
234 | try: |
|
235 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
236 | cursor_energy_db = cnx_energy_db.cursor() |
|
237 | except Exception as e: |
|
238 | error_string = "Error in step 4.1 of equipment_energy_input_item.worker " + str(e) |
|
239 | if cursor_energy_db: |
|
240 | cursor_energy_db.close() |
|
241 | if cnx_energy_db: |
|
242 | cnx_energy_db.close() |
|
243 | print(error_string) |
|
244 | return error_string |
|
245 | ||
246 | try: |
|
247 | query = (" SELECT MAX(start_datetime_utc) " |
|
248 | " FROM tbl_equipment_input_item_hourly " |
|
249 | " WHERE equipment_id = %s ") |
|
250 | cursor_energy_db.execute(query, (equipment['id'],)) |
|
251 | row_datetime = cursor_energy_db.fetchone() |
|
252 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
253 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
254 | ||
255 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
256 | # replace second and microsecond with 0 |
|
257 | # note: do not replace minute in case of calculating in half hourly |
|
258 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
259 | # start from the next time slot |
|
260 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
261 | ||
262 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
263 | ||
264 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
265 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
266 | ||
267 | except Exception as e: |
|
268 | error_string = "Error in step 4.2 of equipment_energy_input_item.worker " + str(e) |
|
269 | if cursor_energy_db: |
|
270 | cursor_energy_db.close() |
|
271 | if cnx_energy_db: |
|
272 | cnx_energy_db.close() |
|
273 | print(error_string) |
|
274 | return error_string |
|
275 | ||
276 | #################################################################################################################### |
|
277 | # Step 5: for each meter in list, get energy input data from energy database |
|
278 | #################################################################################################################### |
|
279 | energy_meter_hourly = dict() |
|
280 | try: |
|
281 | if meter_list is not None and len(meter_list) > 0: |
|
282 | for meter in meter_list: |
|
283 | meter_id = str(meter['id']) |
|
284 | ||
285 | query = (" SELECT start_datetime_utc, actual_value " |
|
286 | " FROM tbl_meter_hourly " |
|
287 | " WHERE meter_id = %s " |
|
288 | " AND start_datetime_utc >= %s " |
|
289 | " AND start_datetime_utc < %s " |
|
290 | " ORDER BY start_datetime_utc ") |
|
291 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
292 | rows_energy_values = cursor_energy_db.fetchall() |
|
293 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
294 | energy_meter_hourly[meter_id] = None |
|
295 | else: |
|
296 | energy_meter_hourly[meter_id] = dict() |
|
297 | for row_energy_value in rows_energy_values: |
|
298 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
299 | except Exception as e: |
|
300 | error_string = "Error in step 5.1 of equipment_energy_input_item.worker " + str(e) |
|
301 | if cursor_energy_db: |
|
302 | cursor_energy_db.close() |
|
303 | if cnx_energy_db: |
|
304 | cnx_energy_db.close() |
|
305 | print(error_string) |
|
306 | return error_string |
|
307 | ||
308 | #################################################################################################################### |
|
309 | # Step 6: for each virtual meter in list, get energy input data from energy database |
|
310 | #################################################################################################################### |
|
311 | energy_virtual_meter_hourly = dict() |
|
312 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
313 | try: |
|
314 | for virtual_meter in virtual_meter_list: |
|
315 | virtual_meter_id = str(virtual_meter['id']) |
|
316 | ||
317 | query = (" SELECT start_datetime_utc, actual_value " |
|
318 | " FROM tbl_virtual_meter_hourly " |
|
319 | " WHERE virtual_meter_id = %s " |
|
320 | " AND start_datetime_utc >= %s " |
|
321 | " AND start_datetime_utc < %s " |
|
322 | " ORDER BY start_datetime_utc ") |
|
323 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
324 | rows_energy_values = cursor_energy_db.fetchall() |
|
325 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
326 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
327 | else: |
|
328 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
329 | for row_energy_value in rows_energy_values: |
|
330 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
331 | except Exception as e: |
|
332 | error_string = "Error in step 6.1 of equipment_energy_input_item.worker " + str(e) |
|
333 | if cursor_energy_db: |
|
334 | cursor_energy_db.close() |
|
335 | if cnx_energy_db: |
|
336 | cnx_energy_db.close() |
|
337 | print(error_string) |
|
338 | return error_string |
|
339 | ||
340 | #################################################################################################################### |
|
341 | # Step 7: for each offline meter in list, get energy input data from energy database |
|
342 | #################################################################################################################### |
|
343 | energy_offline_meter_hourly = dict() |
|
344 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
345 | try: |
|
346 | for offline_meter in offline_meter_list: |
|
347 | offline_meter_id = str(offline_meter['id']) |
|
348 | ||
349 | query = (" SELECT start_datetime_utc, actual_value " |
|
350 | " FROM tbl_offline_meter_hourly " |
|
351 | " WHERE offline_meter_id = %s " |
|
352 | " AND start_datetime_utc >= %s " |
|
353 | " AND start_datetime_utc < %s " |
|
354 | " ORDER BY start_datetime_utc ") |
|
355 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
356 | rows_energy_values = cursor_energy_db.fetchall() |
|
357 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
358 | energy_offline_meter_hourly[offline_meter_id] = None |
|
359 | else: |
|
360 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
361 | for row_energy_value in rows_energy_values: |
|
362 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
363 | ||
364 | except Exception as e: |
|
365 | error_string = "Error in step 7.1 of equipment_energy_input_item.worker " + str(e) |
|
366 | if cursor_energy_db: |
|
367 | cursor_energy_db.close() |
|
368 | if cnx_energy_db: |
|
369 | cnx_energy_db.close() |
|
370 | print(error_string) |
|
371 | return error_string |
|
372 | ||
373 | #################################################################################################################### |
|
374 | # Step 8: determine common time slot to aggregate |
|
375 | #################################################################################################################### |
|
376 | ||
377 | common_start_datetime_utc = start_datetime_utc |
|
378 | common_end_datetime_utc = end_datetime_utc |
|
379 | ||
380 | print("Getting common time slot of energy values for all meters") |
|
381 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
382 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
383 | if energy_hourly is None or len(energy_hourly) == 0: |
|
384 | common_start_datetime_utc = None |
|
385 | common_end_datetime_utc = None |
|
386 | break |
|
387 | else: |
|
388 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
389 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
390 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
391 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
392 | ||
393 | print("Getting common time slot of energy values for all virtual meters") |
|
394 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
395 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
396 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
397 | if energy_hourly is None or len(energy_hourly) == 0: |
|
398 | common_start_datetime_utc = None |
|
399 | common_end_datetime_utc = None |
|
400 | break |
|
401 | else: |
|
402 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
403 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
404 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
405 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
406 | ||
407 | print("Getting common time slot of energy values for all offline meters") |
|
408 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
409 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
410 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
411 | if energy_hourly is None or len(energy_hourly) == 0: |
|
412 | common_start_datetime_utc = None |
|
413 | common_end_datetime_utc = None |
|
414 | break |
|
415 | else: |
|
416 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
417 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
418 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
419 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
420 | ||
421 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
422 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
423 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
424 | # There isn't any energy data |
|
425 | print("There isn't any energy data") |
|
426 | # continue the for equipment loop to the next equipment |
|
427 | print("continue the for equipment loop to the next equipment") |
|
428 | if cursor_energy_db: |
|
429 | cursor_energy_db.close() |
|
430 | if cnx_energy_db: |
|
431 | cnx_energy_db.close() |
|
432 | return None |
|
433 | ||
434 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
435 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
436 | ||
437 | #################################################################################################################### |
|
438 | # Step 9: aggregate energy data in the common time slot by energy items and hourly |
|
439 | #################################################################################################################### |
|
440 | ||
441 | print("Step 9: aggregate energy data in the common time slot by energy items and hourly") |
|
442 | aggregated_values = list() |
|
443 | try: |
|
444 | current_datetime_utc = common_start_datetime_utc |
|
445 | while common_start_datetime_utc is not None \ |
|
446 | and common_end_datetime_utc is not None \ |
|
447 | and current_datetime_utc <= common_end_datetime_utc: |
|
448 | aggregated_value = dict() |
|
449 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
450 | aggregated_value['meta_data'] = dict() |
|
451 | ||
452 | if meter_list is not None and len(meter_list) > 0: |
|
453 | for meter in meter_list: |
|
454 | meter_id = str(meter['id']) |
|
455 | energy_item_id = meter['energy_item_id'] |
|
456 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
457 | aggregated_value['meta_data'][energy_item_id] = \ |
|
458 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
459 | ||
460 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
461 | for virtual_meter in virtual_meter_list: |
|
462 | virtual_meter_id = str(virtual_meter['id']) |
|
463 | energy_item_id = virtual_meter['energy_item_id'] |
|
464 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
465 | aggregated_value['meta_data'][energy_item_id] = \ |
|
466 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
467 | ||
468 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
469 | for offline_meter in offline_meter_list: |
|
470 | offline_meter_id = str(offline_meter['id']) |
|
471 | energy_item_id = offline_meter['energy_item_id'] |
|
472 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
473 | aggregated_value['meta_data'][energy_item_id] = \ |
|
474 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
475 | ||
476 | aggregated_values.append(aggregated_value) |
|
477 | ||
478 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
479 | ||
480 | except Exception as e: |
|
481 | error_string = "Error in step 9 of equipment_energy_input_item.worker " + str(e) |
|
482 | if cursor_energy_db: |
|
483 | cursor_energy_db.close() |
|
484 | if cnx_energy_db: |
|
485 | cnx_energy_db.close() |
|
486 | print(error_string) |
|
487 | return error_string |
|
488 | ||
489 | #################################################################################################################### |
|
490 | # Step 10: save energy data to energy database |
|
491 | #################################################################################################################### |
|
492 | print("Step 10: save energy data to energy database") |
|
493 | ||
494 | if len(aggregated_values) > 0: |
|
495 | try: |
|
496 | add_values = (" INSERT INTO tbl_equipment_input_item_hourly " |
|
497 | " (equipment_id, " |
|
498 | " energy_item_id, " |
|
499 | " start_datetime_utc, " |
|
500 | " actual_value) " |
|
501 | " VALUES ") |
|
502 | ||
503 | for aggregated_value in aggregated_values: |
|
504 | for energy_item_id, actual_value in aggregated_value['meta_data'].items(): |
|
505 | add_values += " (" + str(equipment['id']) + "," |
|
506 | add_values += " " + str(energy_item_id) + "," |
|
507 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
508 | add_values += str(actual_value) + "), " |
|
509 | print("add_values:" + add_values) |
|
510 | # trim ", " at the end of string and then execute |
|
511 | cursor_energy_db.execute(add_values[:-2]) |
|
512 | cnx_energy_db.commit() |
|
513 | ||
514 | except Exception as e: |
|
515 | error_string = "Error in step 10.1 of equipment_energy_input_item.worker " + str(e) |
|
516 | print(error_string) |
|
517 | return error_string |
|
518 | finally: |
|
519 | if cursor_energy_db: |
|
520 | cursor_energy_db.close() |
|
521 | if cnx_energy_db: |
|
522 | cnx_energy_db.close() |
|
523 | else: |
|
524 | if cursor_energy_db: |
|
525 | cursor_energy_db.close() |
|
526 | if cnx_energy_db: |
|
527 | cnx_energy_db.close() |
|
528 |
@@ 106-524 (lines=419) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(equipment): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all output meters associated with the equipment |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all output meters associated with the equipment " + str(equipment['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of equipment_energy_output_category.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
129 | " FROM tbl_meters m, tbl_equipments_meters em " |
|
130 | " WHERE m.id = em.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND em.is_output = true " |
|
133 | " AND em.equipment_id = %s ", |
|
134 | (equipment['id'],)) |
|
135 | rows_meters = cursor_system_db.fetchall() |
|
136 | ||
137 | if rows_meters is not None and len(rows_meters) > 0: |
|
138 | for row in rows_meters: |
|
139 | meter_list.append({"id": row[0], |
|
140 | "name": row[1], |
|
141 | "energy_category_id": row[2]}) |
|
142 | ||
143 | except Exception as e: |
|
144 | error_string = "Error in step 1.2 of equipment_energy_output_category.worker " + str(e) |
|
145 | if cursor_system_db: |
|
146 | cursor_system_db.close() |
|
147 | if cnx_system_db: |
|
148 | cnx_system_db.close() |
|
149 | print(error_string) |
|
150 | return error_string |
|
151 | ||
152 | #################################################################################################################### |
|
153 | # Step 2: get all output virtual meters associated with the equipment |
|
154 | #################################################################################################################### |
|
155 | print("Step 2: get all output virtual meters associated with the equipment") |
|
156 | virtual_meter_list = list() |
|
157 | ||
158 | try: |
|
159 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
160 | " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em " |
|
161 | " WHERE m.id = em.virtual_meter_id " |
|
162 | " AND m.is_counted = true " |
|
163 | " AND em.is_output = true " |
|
164 | " AND em.equipment_id = %s ", |
|
165 | (equipment['id'],)) |
|
166 | rows_virtual_meters = cursor_system_db.fetchall() |
|
167 | ||
168 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
169 | for row in rows_virtual_meters: |
|
170 | virtual_meter_list.append({"id": row[0], |
|
171 | "name": row[1], |
|
172 | "energy_category_id": row[2]}) |
|
173 | ||
174 | except Exception as e: |
|
175 | error_string = "Error in step 2.1 of equipment_energy_output_category.worker " + str(e) |
|
176 | if cursor_system_db: |
|
177 | cursor_system_db.close() |
|
178 | if cnx_system_db: |
|
179 | cnx_system_db.close() |
|
180 | print(error_string) |
|
181 | return error_string |
|
182 | ||
183 | #################################################################################################################### |
|
184 | # Step 3: get all output offline meters associated with the equipment |
|
185 | #################################################################################################################### |
|
186 | print("Step 3: get all output offline meters associated with the equipment") |
|
187 | ||
188 | offline_meter_list = list() |
|
189 | ||
190 | try: |
|
191 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
192 | " FROM tbl_offline_meters m, tbl_equipments_offline_meters em " |
|
193 | " WHERE m.id = em.offline_meter_id " |
|
194 | " AND m.is_counted = true " |
|
195 | " AND em.is_output = true " |
|
196 | " AND em.equipment_id = %s ", |
|
197 | (equipment['id'],)) |
|
198 | rows_offline_meters = cursor_system_db.fetchall() |
|
199 | ||
200 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
201 | for row in rows_offline_meters: |
|
202 | offline_meter_list.append({"id": row[0], |
|
203 | "name": row[1], |
|
204 | "energy_category_id": row[2]}) |
|
205 | ||
206 | except Exception as e: |
|
207 | error_string = "Error in step 3.1 of equipment_energy_output_category.worker " + str(e) |
|
208 | print(error_string) |
|
209 | return error_string |
|
210 | finally: |
|
211 | if cursor_system_db: |
|
212 | cursor_system_db.close() |
|
213 | if cnx_system_db: |
|
214 | cnx_system_db.close() |
|
215 | ||
216 | #################################################################################################################### |
|
217 | # stop to the next equipment if this equipment is empty |
|
218 | #################################################################################################################### |
|
219 | if (meter_list is None or len(meter_list) == 0) and \ |
|
220 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
221 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
222 | print("This is an empty equipment ") |
|
223 | return None |
|
224 | ||
225 | #################################################################################################################### |
|
226 | # Step 4: determine start datetime and end datetime to aggregate |
|
227 | #################################################################################################################### |
|
228 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
229 | cnx_energy_db = None |
|
230 | cursor_energy_db = None |
|
231 | try: |
|
232 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
233 | cursor_energy_db = cnx_energy_db.cursor() |
|
234 | except Exception as e: |
|
235 | error_string = "Error in step 4.1 of equipment_energy_output_category.worker " + str(e) |
|
236 | if cursor_energy_db: |
|
237 | cursor_energy_db.close() |
|
238 | if cnx_energy_db: |
|
239 | cnx_energy_db.close() |
|
240 | print(error_string) |
|
241 | return error_string |
|
242 | ||
243 | try: |
|
244 | query = (" SELECT MAX(start_datetime_utc) " |
|
245 | " FROM tbl_equipment_output_category_hourly " |
|
246 | " WHERE equipment_id = %s ") |
|
247 | cursor_energy_db.execute(query, (equipment['id'],)) |
|
248 | row_datetime = cursor_energy_db.fetchone() |
|
249 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
250 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
251 | ||
252 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
253 | # replace second and microsecond with 0 |
|
254 | # note: do not replace minute in case of calculating in half hourly |
|
255 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
256 | # start from the next time slot |
|
257 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
258 | ||
259 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
260 | ||
261 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
262 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
263 | ||
264 | except Exception as e: |
|
265 | error_string = "Error in step 4.2 of equipment_energy_output_category.worker " + str(e) |
|
266 | if cursor_energy_db: |
|
267 | cursor_energy_db.close() |
|
268 | if cnx_energy_db: |
|
269 | cnx_energy_db.close() |
|
270 | print(error_string) |
|
271 | return error_string |
|
272 | ||
273 | #################################################################################################################### |
|
274 | # Step 5: for each meter in list, get energy output data from energy database |
|
275 | #################################################################################################################### |
|
276 | energy_meter_hourly = dict() |
|
277 | try: |
|
278 | if meter_list is not None and len(meter_list) > 0: |
|
279 | for meter in meter_list: |
|
280 | meter_id = str(meter['id']) |
|
281 | ||
282 | query = (" SELECT start_datetime_utc, actual_value " |
|
283 | " FROM tbl_meter_hourly " |
|
284 | " WHERE meter_id = %s " |
|
285 | " AND start_datetime_utc >= %s " |
|
286 | " AND start_datetime_utc < %s " |
|
287 | " ORDER BY start_datetime_utc ") |
|
288 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
289 | rows_energy_values = cursor_energy_db.fetchall() |
|
290 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
291 | energy_meter_hourly[meter_id] = None |
|
292 | else: |
|
293 | energy_meter_hourly[meter_id] = dict() |
|
294 | for row_energy_value in rows_energy_values: |
|
295 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
296 | except Exception as e: |
|
297 | error_string = "Error in step 5.1 of equipment_energy_output_category.worker " + str(e) |
|
298 | if cursor_energy_db: |
|
299 | cursor_energy_db.close() |
|
300 | if cnx_energy_db: |
|
301 | cnx_energy_db.close() |
|
302 | print(error_string) |
|
303 | return error_string |
|
304 | ||
305 | #################################################################################################################### |
|
306 | # Step 6: for each virtual meter in list, get energy output data from energy database |
|
307 | #################################################################################################################### |
|
308 | energy_virtual_meter_hourly = dict() |
|
309 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
310 | try: |
|
311 | for virtual_meter in virtual_meter_list: |
|
312 | virtual_meter_id = str(virtual_meter['id']) |
|
313 | ||
314 | query = (" SELECT start_datetime_utc, actual_value " |
|
315 | " FROM tbl_virtual_meter_hourly " |
|
316 | " WHERE virtual_meter_id = %s " |
|
317 | " AND start_datetime_utc >= %s " |
|
318 | " AND start_datetime_utc < %s " |
|
319 | " ORDER BY start_datetime_utc ") |
|
320 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
321 | rows_energy_values = cursor_energy_db.fetchall() |
|
322 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
323 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
324 | else: |
|
325 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
326 | for row_energy_value in rows_energy_values: |
|
327 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
328 | except Exception as e: |
|
329 | error_string = "Error in step 6.1 of equipment_energy_output_category.worker " + str(e) |
|
330 | if cursor_energy_db: |
|
331 | cursor_energy_db.close() |
|
332 | if cnx_energy_db: |
|
333 | cnx_energy_db.close() |
|
334 | print(error_string) |
|
335 | return error_string |
|
336 | ||
337 | #################################################################################################################### |
|
338 | # Step 7: for each offline meter in list, get energy output data from energy database |
|
339 | #################################################################################################################### |
|
340 | energy_offline_meter_hourly = dict() |
|
341 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
342 | try: |
|
343 | for offline_meter in offline_meter_list: |
|
344 | offline_meter_id = str(offline_meter['id']) |
|
345 | ||
346 | query = (" SELECT start_datetime_utc, actual_value " |
|
347 | " FROM tbl_offline_meter_hourly " |
|
348 | " WHERE offline_meter_id = %s " |
|
349 | " AND start_datetime_utc >= %s " |
|
350 | " AND start_datetime_utc < %s " |
|
351 | " ORDER BY start_datetime_utc ") |
|
352 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
353 | rows_energy_values = cursor_energy_db.fetchall() |
|
354 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
355 | energy_offline_meter_hourly[offline_meter_id] = None |
|
356 | else: |
|
357 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
358 | for row_energy_value in rows_energy_values: |
|
359 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
360 | ||
361 | except Exception as e: |
|
362 | error_string = "Error in step 7.1 of equipment_energy_output_category.worker " + str(e) |
|
363 | if cursor_energy_db: |
|
364 | cursor_energy_db.close() |
|
365 | if cnx_energy_db: |
|
366 | cnx_energy_db.close() |
|
367 | print(error_string) |
|
368 | return error_string |
|
369 | ||
370 | #################################################################################################################### |
|
371 | # Step 8: determine common time slot to aggregate |
|
372 | #################################################################################################################### |
|
373 | ||
374 | common_start_datetime_utc = start_datetime_utc |
|
375 | common_end_datetime_utc = end_datetime_utc |
|
376 | ||
377 | print("Getting common time slot of energy values for all meters") |
|
378 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
379 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
380 | if energy_hourly is None or len(energy_hourly) == 0: |
|
381 | common_start_datetime_utc = None |
|
382 | common_end_datetime_utc = None |
|
383 | break |
|
384 | else: |
|
385 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
386 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
387 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
388 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
389 | ||
390 | print("Getting common time slot of energy values for all virtual meters") |
|
391 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
392 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
393 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
394 | if energy_hourly is None or len(energy_hourly) == 0: |
|
395 | common_start_datetime_utc = None |
|
396 | common_end_datetime_utc = None |
|
397 | break |
|
398 | else: |
|
399 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
400 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
401 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
402 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
403 | ||
404 | print("Getting common time slot of energy values for all offline meters") |
|
405 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
406 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
407 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
408 | if energy_hourly is None or len(energy_hourly) == 0: |
|
409 | common_start_datetime_utc = None |
|
410 | common_end_datetime_utc = None |
|
411 | break |
|
412 | else: |
|
413 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
414 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
415 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
416 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
417 | ||
418 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
419 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
420 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
421 | # There isn't any energy data |
|
422 | print("There isn't any energy data") |
|
423 | # continue the for equipment loop to the next equipment |
|
424 | print("continue the for equipment loop to the next equipment") |
|
425 | if cursor_energy_db: |
|
426 | cursor_energy_db.close() |
|
427 | if cnx_energy_db: |
|
428 | cnx_energy_db.close() |
|
429 | return None |
|
430 | ||
431 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
432 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
433 | ||
434 | #################################################################################################################### |
|
435 | # Step 9: aggregate energy data in the common time slot by energy categories and hourly |
|
436 | #################################################################################################################### |
|
437 | ||
438 | print("Step 9: aggregate energy data in the common time slot by energy categories and hourly") |
|
439 | aggregated_values = list() |
|
440 | try: |
|
441 | current_datetime_utc = common_start_datetime_utc |
|
442 | while common_start_datetime_utc is not None \ |
|
443 | and common_end_datetime_utc is not None \ |
|
444 | and current_datetime_utc <= common_end_datetime_utc: |
|
445 | aggregated_value = dict() |
|
446 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
447 | aggregated_value['meta_data'] = dict() |
|
448 | ||
449 | if meter_list is not None and len(meter_list) > 0: |
|
450 | for meter in meter_list: |
|
451 | meter_id = str(meter['id']) |
|
452 | energy_category_id = meter['energy_category_id'] |
|
453 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
454 | aggregated_value['meta_data'][energy_category_id] = \ |
|
455 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
456 | ||
457 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
458 | for virtual_meter in virtual_meter_list: |
|
459 | virtual_meter_id = str(virtual_meter['id']) |
|
460 | energy_category_id = virtual_meter['energy_category_id'] |
|
461 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
462 | aggregated_value['meta_data'][energy_category_id] = \ |
|
463 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
464 | ||
465 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
466 | for offline_meter in offline_meter_list: |
|
467 | offline_meter_id = str(offline_meter['id']) |
|
468 | energy_category_id = offline_meter['energy_category_id'] |
|
469 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
470 | aggregated_value['meta_data'][energy_category_id] = \ |
|
471 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
472 | ||
473 | aggregated_values.append(aggregated_value) |
|
474 | ||
475 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
476 | ||
477 | except Exception as e: |
|
478 | error_string = "Error in step 9 of equipment_energy_output_category.worker " + str(e) |
|
479 | if cursor_energy_db: |
|
480 | cursor_energy_db.close() |
|
481 | if cnx_energy_db: |
|
482 | cnx_energy_db.close() |
|
483 | print(error_string) |
|
484 | return error_string |
|
485 | ||
486 | #################################################################################################################### |
|
487 | # Step 10: save energy data to energy database |
|
488 | #################################################################################################################### |
|
489 | print("Step 10: save energy data to energy database") |
|
490 | ||
491 | if len(aggregated_values) > 0: |
|
492 | try: |
|
493 | add_values = (" INSERT INTO tbl_equipment_output_category_hourly " |
|
494 | " (equipment_id, " |
|
495 | " energy_category_id, " |
|
496 | " start_datetime_utc, " |
|
497 | " actual_value) " |
|
498 | " VALUES ") |
|
499 | ||
500 | for aggregated_value in aggregated_values: |
|
501 | for energy_category_id, actual_value in aggregated_value['meta_data'].items(): |
|
502 | add_values += " (" + str(equipment['id']) + "," |
|
503 | add_values += " " + str(energy_category_id) + "," |
|
504 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
505 | add_values += str(actual_value) + "), " |
|
506 | print("add_values:" + add_values) |
|
507 | # trim ", " at the end of string and then execute |
|
508 | cursor_energy_db.execute(add_values[:-2]) |
|
509 | cnx_energy_db.commit() |
|
510 | ||
511 | except Exception as e: |
|
512 | error_string = "Error in step 10.1 of equipment_energy_output_category.worker " + str(e) |
|
513 | print(error_string) |
|
514 | return error_string |
|
515 | finally: |
|
516 | if cursor_energy_db: |
|
517 | cursor_energy_db.close() |
|
518 | if cnx_energy_db: |
|
519 | cnx_energy_db.close() |
|
520 | else: |
|
521 | if cursor_energy_db: |
|
522 | cursor_energy_db.close() |
|
523 | if cnx_energy_db: |
|
524 | cnx_energy_db.close() |
|
525 |
@@ 106-524 (lines=419) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(equipment): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all input meters associated with the equipment |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all input meters associated with the equipment " + str(equipment['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of equipment_energy_input_category.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
129 | " FROM tbl_meters m, tbl_equipments_meters em " |
|
130 | " WHERE m.id = em.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND em.is_output = false " |
|
133 | " AND em.equipment_id = %s ", |
|
134 | (equipment['id'],)) |
|
135 | rows_meters = cursor_system_db.fetchall() |
|
136 | ||
137 | if rows_meters is not None and len(rows_meters) > 0: |
|
138 | for row in rows_meters: |
|
139 | meter_list.append({"id": row[0], |
|
140 | "name": row[1], |
|
141 | "energy_category_id": row[2]}) |
|
142 | ||
143 | except Exception as e: |
|
144 | error_string = "Error in step 1.2 of equipment_energy_input_category.worker " + str(e) |
|
145 | if cursor_system_db: |
|
146 | cursor_system_db.close() |
|
147 | if cnx_system_db: |
|
148 | cnx_system_db.close() |
|
149 | print(error_string) |
|
150 | return error_string |
|
151 | ||
152 | #################################################################################################################### |
|
153 | # Step 2: get all input virtual meters associated with the equipment |
|
154 | #################################################################################################################### |
|
155 | print("Step 2: get all input virtual meters associated with the equipment") |
|
156 | virtual_meter_list = list() |
|
157 | ||
158 | try: |
|
159 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
160 | " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em " |
|
161 | " WHERE m.id = em.virtual_meter_id " |
|
162 | " AND m.is_counted = true " |
|
163 | " AND em.is_output = false " |
|
164 | " AND em.equipment_id = %s ", |
|
165 | (equipment['id'],)) |
|
166 | rows_virtual_meters = cursor_system_db.fetchall() |
|
167 | ||
168 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
169 | for row in rows_virtual_meters: |
|
170 | virtual_meter_list.append({"id": row[0], |
|
171 | "name": row[1], |
|
172 | "energy_category_id": row[2]}) |
|
173 | ||
174 | except Exception as e: |
|
175 | error_string = "Error in step 2.1 of equipment_energy_input_category.worker " + str(e) |
|
176 | if cursor_system_db: |
|
177 | cursor_system_db.close() |
|
178 | if cnx_system_db: |
|
179 | cnx_system_db.close() |
|
180 | print(error_string) |
|
181 | return error_string |
|
182 | ||
183 | #################################################################################################################### |
|
184 | # Step 3: get all input offline meters associated with the equipment |
|
185 | #################################################################################################################### |
|
186 | print("Step 3: get all input offline meters associated with the equipment") |
|
187 | ||
188 | offline_meter_list = list() |
|
189 | ||
190 | try: |
|
191 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
192 | " FROM tbl_offline_meters m, tbl_equipments_offline_meters em " |
|
193 | " WHERE m.id = em.offline_meter_id " |
|
194 | " AND m.is_counted = true " |
|
195 | " AND em.is_output = false " |
|
196 | " AND em.equipment_id = %s ", |
|
197 | (equipment['id'],)) |
|
198 | rows_offline_meters = cursor_system_db.fetchall() |
|
199 | ||
200 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
201 | for row in rows_offline_meters: |
|
202 | offline_meter_list.append({"id": row[0], |
|
203 | "name": row[1], |
|
204 | "energy_category_id": row[2]}) |
|
205 | ||
206 | except Exception as e: |
|
207 | error_string = "Error in step 3.1 of equipment_energy_input_category.worker " + str(e) |
|
208 | print(error_string) |
|
209 | return error_string |
|
210 | finally: |
|
211 | if cursor_system_db: |
|
212 | cursor_system_db.close() |
|
213 | if cnx_system_db: |
|
214 | cnx_system_db.close() |
|
215 | ||
216 | #################################################################################################################### |
|
217 | # stop to the next equipment if this equipment is empty |
|
218 | #################################################################################################################### |
|
219 | if (meter_list is None or len(meter_list) == 0) and \ |
|
220 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
221 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
222 | print("This is an empty equipment ") |
|
223 | return None |
|
224 | ||
225 | #################################################################################################################### |
|
226 | # Step 4: determine start datetime and end datetime to aggregate |
|
227 | #################################################################################################################### |
|
228 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
229 | cnx_energy_db = None |
|
230 | cursor_energy_db = None |
|
231 | try: |
|
232 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
233 | cursor_energy_db = cnx_energy_db.cursor() |
|
234 | except Exception as e: |
|
235 | error_string = "Error in step 4.1 of equipment_energy_input_category.worker " + str(e) |
|
236 | if cursor_energy_db: |
|
237 | cursor_energy_db.close() |
|
238 | if cnx_energy_db: |
|
239 | cnx_energy_db.close() |
|
240 | print(error_string) |
|
241 | return error_string |
|
242 | ||
243 | try: |
|
244 | query = (" SELECT MAX(start_datetime_utc) " |
|
245 | " FROM tbl_equipment_input_category_hourly " |
|
246 | " WHERE equipment_id = %s ") |
|
247 | cursor_energy_db.execute(query, (equipment['id'],)) |
|
248 | row_datetime = cursor_energy_db.fetchone() |
|
249 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
250 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
251 | ||
252 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
253 | # replace second and microsecond with 0 |
|
254 | # note: do not replace minute in case of calculating in half hourly |
|
255 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
256 | # start from the next time slot |
|
257 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
258 | ||
259 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
260 | ||
261 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
262 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
263 | ||
264 | except Exception as e: |
|
265 | error_string = "Error in step 4.2 of equipment_energy_input_category.worker " + str(e) |
|
266 | if cursor_energy_db: |
|
267 | cursor_energy_db.close() |
|
268 | if cnx_energy_db: |
|
269 | cnx_energy_db.close() |
|
270 | print(error_string) |
|
271 | return error_string |
|
272 | ||
273 | #################################################################################################################### |
|
274 | # Step 5: for each meter in list, get energy input data from energy database |
|
275 | #################################################################################################################### |
|
276 | energy_meter_hourly = dict() |
|
277 | try: |
|
278 | if meter_list is not None and len(meter_list) > 0: |
|
279 | for meter in meter_list: |
|
280 | meter_id = str(meter['id']) |
|
281 | ||
282 | query = (" SELECT start_datetime_utc, actual_value " |
|
283 | " FROM tbl_meter_hourly " |
|
284 | " WHERE meter_id = %s " |
|
285 | " AND start_datetime_utc >= %s " |
|
286 | " AND start_datetime_utc < %s " |
|
287 | " ORDER BY start_datetime_utc ") |
|
288 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
289 | rows_energy_values = cursor_energy_db.fetchall() |
|
290 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
291 | energy_meter_hourly[meter_id] = None |
|
292 | else: |
|
293 | energy_meter_hourly[meter_id] = dict() |
|
294 | for row_energy_value in rows_energy_values: |
|
295 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
296 | except Exception as e: |
|
297 | error_string = "Error in step 5.1 of equipment_energy_input_category.worker " + str(e) |
|
298 | if cursor_energy_db: |
|
299 | cursor_energy_db.close() |
|
300 | if cnx_energy_db: |
|
301 | cnx_energy_db.close() |
|
302 | print(error_string) |
|
303 | return error_string |
|
304 | ||
305 | #################################################################################################################### |
|
306 | # Step 6: for each virtual meter in list, get energy input data from energy database |
|
307 | #################################################################################################################### |
|
308 | energy_virtual_meter_hourly = dict() |
|
309 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
310 | try: |
|
311 | for virtual_meter in virtual_meter_list: |
|
312 | virtual_meter_id = str(virtual_meter['id']) |
|
313 | ||
314 | query = (" SELECT start_datetime_utc, actual_value " |
|
315 | " FROM tbl_virtual_meter_hourly " |
|
316 | " WHERE virtual_meter_id = %s " |
|
317 | " AND start_datetime_utc >= %s " |
|
318 | " AND start_datetime_utc < %s " |
|
319 | " ORDER BY start_datetime_utc ") |
|
320 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
321 | rows_energy_values = cursor_energy_db.fetchall() |
|
322 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
323 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
324 | else: |
|
325 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
326 | for row_energy_value in rows_energy_values: |
|
327 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
328 | except Exception as e: |
|
329 | error_string = "Error in step 6.1 of equipment_energy_input_category.worker " + str(e) |
|
330 | if cursor_energy_db: |
|
331 | cursor_energy_db.close() |
|
332 | if cnx_energy_db: |
|
333 | cnx_energy_db.close() |
|
334 | print(error_string) |
|
335 | return error_string |
|
336 | ||
337 | #################################################################################################################### |
|
338 | # Step 7: for each offline meter in list, get energy input data from energy database |
|
339 | #################################################################################################################### |
|
340 | energy_offline_meter_hourly = dict() |
|
341 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
342 | try: |
|
343 | for offline_meter in offline_meter_list: |
|
344 | offline_meter_id = str(offline_meter['id']) |
|
345 | ||
346 | query = (" SELECT start_datetime_utc, actual_value " |
|
347 | " FROM tbl_offline_meter_hourly " |
|
348 | " WHERE offline_meter_id = %s " |
|
349 | " AND start_datetime_utc >= %s " |
|
350 | " AND start_datetime_utc < %s " |
|
351 | " ORDER BY start_datetime_utc ") |
|
352 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
353 | rows_energy_values = cursor_energy_db.fetchall() |
|
354 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
355 | energy_offline_meter_hourly[offline_meter_id] = None |
|
356 | else: |
|
357 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
358 | for row_energy_value in rows_energy_values: |
|
359 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
360 | ||
361 | except Exception as e: |
|
362 | error_string = "Error in step 7.1 of equipment_energy_input_category.worker " + str(e) |
|
363 | if cursor_energy_db: |
|
364 | cursor_energy_db.close() |
|
365 | if cnx_energy_db: |
|
366 | cnx_energy_db.close() |
|
367 | print(error_string) |
|
368 | return error_string |
|
369 | ||
370 | #################################################################################################################### |
|
371 | # Step 8: determine common time slot to aggregate |
|
372 | #################################################################################################################### |
|
373 | ||
374 | common_start_datetime_utc = start_datetime_utc |
|
375 | common_end_datetime_utc = end_datetime_utc |
|
376 | ||
377 | print("Getting common time slot of energy values for all meters") |
|
378 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
379 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
380 | if energy_hourly is None or len(energy_hourly) == 0: |
|
381 | common_start_datetime_utc = None |
|
382 | common_end_datetime_utc = None |
|
383 | break |
|
384 | else: |
|
385 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
386 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
387 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
388 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
389 | ||
390 | print("Getting common time slot of energy values for all virtual meters") |
|
391 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
392 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
393 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
394 | if energy_hourly is None or len(energy_hourly) == 0: |
|
395 | common_start_datetime_utc = None |
|
396 | common_end_datetime_utc = None |
|
397 | break |
|
398 | else: |
|
399 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
400 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
401 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
402 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
403 | ||
404 | print("Getting common time slot of energy values for all offline meters") |
|
405 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
406 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
407 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
408 | if energy_hourly is None or len(energy_hourly) == 0: |
|
409 | common_start_datetime_utc = None |
|
410 | common_end_datetime_utc = None |
|
411 | break |
|
412 | else: |
|
413 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
414 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
415 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
416 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
417 | ||
418 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
419 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
420 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
421 | # There isn't any energy data |
|
422 | print("There isn't any energy data") |
|
423 | # continue the for equipment loop to the next equipment |
|
424 | print("continue the for equipment loop to the next equipment") |
|
425 | if cursor_energy_db: |
|
426 | cursor_energy_db.close() |
|
427 | if cnx_energy_db: |
|
428 | cnx_energy_db.close() |
|
429 | return None |
|
430 | ||
431 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
432 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
433 | ||
434 | #################################################################################################################### |
|
435 | # Step 9: aggregate energy data in the common time slot by energy categories and hourly |
|
436 | #################################################################################################################### |
|
437 | ||
438 | print("Step 9: aggregate energy data in the common time slot by energy categories and hourly") |
|
439 | aggregated_values = list() |
|
440 | try: |
|
441 | current_datetime_utc = common_start_datetime_utc |
|
442 | while common_start_datetime_utc is not None \ |
|
443 | and common_end_datetime_utc is not None \ |
|
444 | and current_datetime_utc <= common_end_datetime_utc: |
|
445 | aggregated_value = dict() |
|
446 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
447 | aggregated_value['meta_data'] = dict() |
|
448 | ||
449 | if meter_list is not None and len(meter_list) > 0: |
|
450 | for meter in meter_list: |
|
451 | meter_id = str(meter['id']) |
|
452 | energy_category_id = meter['energy_category_id'] |
|
453 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
454 | aggregated_value['meta_data'][energy_category_id] = \ |
|
455 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
456 | ||
457 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
458 | for virtual_meter in virtual_meter_list: |
|
459 | virtual_meter_id = str(virtual_meter['id']) |
|
460 | energy_category_id = virtual_meter['energy_category_id'] |
|
461 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
462 | aggregated_value['meta_data'][energy_category_id] = \ |
|
463 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
464 | ||
465 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
466 | for offline_meter in offline_meter_list: |
|
467 | offline_meter_id = str(offline_meter['id']) |
|
468 | energy_category_id = offline_meter['energy_category_id'] |
|
469 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
470 | aggregated_value['meta_data'][energy_category_id] = \ |
|
471 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
472 | ||
473 | aggregated_values.append(aggregated_value) |
|
474 | ||
475 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
476 | ||
477 | except Exception as e: |
|
478 | error_string = "Error in step 9 of equipment_energy_input_category.worker " + str(e) |
|
479 | if cursor_energy_db: |
|
480 | cursor_energy_db.close() |
|
481 | if cnx_energy_db: |
|
482 | cnx_energy_db.close() |
|
483 | print(error_string) |
|
484 | return error_string |
|
485 | ||
486 | #################################################################################################################### |
|
487 | # Step 10: save energy data to energy database |
|
488 | #################################################################################################################### |
|
489 | print("Step 10: save energy data to energy database") |
|
490 | ||
491 | if len(aggregated_values) > 0: |
|
492 | try: |
|
493 | add_values = (" INSERT INTO tbl_equipment_input_category_hourly " |
|
494 | " (equipment_id, " |
|
495 | " energy_category_id, " |
|
496 | " start_datetime_utc, " |
|
497 | " actual_value) " |
|
498 | " VALUES ") |
|
499 | ||
500 | for aggregated_value in aggregated_values: |
|
501 | for energy_category_id, actual_value in aggregated_value['meta_data'].items(): |
|
502 | add_values += " (" + str(equipment['id']) + "," |
|
503 | add_values += " " + str(energy_category_id) + "," |
|
504 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
505 | add_values += str(actual_value) + "), " |
|
506 | print("add_values:" + add_values) |
|
507 | # trim ", " at the end of string and then execute |
|
508 | cursor_energy_db.execute(add_values[:-2]) |
|
509 | cnx_energy_db.commit() |
|
510 | ||
511 | except Exception as e: |
|
512 | error_string = "Error in step 10.1 of equipment_energy_input_category.worker " + str(e) |
|
513 | print(error_string) |
|
514 | return error_string |
|
515 | finally: |
|
516 | if cursor_energy_db: |
|
517 | cursor_energy_db.close() |
|
518 | if cnx_energy_db: |
|
519 | cnx_energy_db.close() |
|
520 | else: |
|
521 | if cursor_energy_db: |
|
522 | cursor_energy_db.close() |
|
523 | if cnx_energy_db: |
|
524 | cnx_energy_db.close() |
|
525 |
@@ 106-524 (lines=419) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(tenant): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all input meters associated with the tenant |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all input meters associated with the tenant " + str(tenant['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of tenant_energy_input_item.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
129 | " FROM tbl_meters m, tbl_tenants_meters tm " |
|
130 | " WHERE m.id = tm.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND m.energy_item_id is NOT NULL " |
|
133 | " AND tm.tenant_id = %s ", |
|
134 | (tenant['id'],)) |
|
135 | rows_meters = cursor_system_db.fetchall() |
|
136 | ||
137 | if rows_meters is not None and len(rows_meters) > 0: |
|
138 | for row in rows_meters: |
|
139 | meter_list.append({"id": row[0], |
|
140 | "name": row[1], |
|
141 | "energy_item_id": row[2]}) |
|
142 | ||
143 | except Exception as e: |
|
144 | error_string = "Error in step 1.2 of tenant_energy_input_item.worker " + str(e) |
|
145 | if cursor_system_db: |
|
146 | cursor_system_db.close() |
|
147 | if cnx_system_db: |
|
148 | cnx_system_db.close() |
|
149 | print(error_string) |
|
150 | return error_string |
|
151 | ||
152 | #################################################################################################################### |
|
153 | # Step 2: get all input virtual meters associated with the tenant |
|
154 | #################################################################################################################### |
|
155 | print("Step 2: get all input virtual meters associated with the tenant") |
|
156 | virtual_meter_list = list() |
|
157 | ||
158 | try: |
|
159 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
160 | " FROM tbl_virtual_meters m, tbl_tenants_virtual_meters tm " |
|
161 | " WHERE m.id = tm.virtual_meter_id " |
|
162 | " AND m.energy_item_id is NOT NULL " |
|
163 | " AND m.is_counted = true " |
|
164 | " AND tm.tenant_id = %s ", |
|
165 | (tenant['id'],)) |
|
166 | rows_virtual_meters = cursor_system_db.fetchall() |
|
167 | ||
168 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
169 | for row in rows_virtual_meters: |
|
170 | virtual_meter_list.append({"id": row[0], |
|
171 | "name": row[1], |
|
172 | "energy_item_id": row[2]}) |
|
173 | ||
174 | except Exception as e: |
|
175 | error_string = "Error in step 2.1 of tenant_energy_input_item.worker " + str(e) |
|
176 | if cursor_system_db: |
|
177 | cursor_system_db.close() |
|
178 | if cnx_system_db: |
|
179 | cnx_system_db.close() |
|
180 | print(error_string) |
|
181 | return error_string |
|
182 | ||
183 | #################################################################################################################### |
|
184 | # Step 3: get all input offline meters associated with the tenant |
|
185 | #################################################################################################################### |
|
186 | print("Step 3: get all input offline meters associated with the tenant") |
|
187 | ||
188 | offline_meter_list = list() |
|
189 | ||
190 | try: |
|
191 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
192 | " FROM tbl_offline_meters m, tbl_tenants_offline_meters tm " |
|
193 | " WHERE m.id = tm.offline_meter_id " |
|
194 | " AND m.energy_item_id is NOT NULL " |
|
195 | " AND m.is_counted = true " |
|
196 | " AND tm.tenant_id = %s ", |
|
197 | (tenant['id'],)) |
|
198 | rows_offline_meters = cursor_system_db.fetchall() |
|
199 | ||
200 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
201 | for row in rows_offline_meters: |
|
202 | offline_meter_list.append({"id": row[0], |
|
203 | "name": row[1], |
|
204 | "energy_item_id": row[2]}) |
|
205 | ||
206 | except Exception as e: |
|
207 | error_string = "Error in step 3.1 of tenant_energy_input_item.worker " + str(e) |
|
208 | print(error_string) |
|
209 | return error_string |
|
210 | finally: |
|
211 | if cursor_system_db: |
|
212 | cursor_system_db.close() |
|
213 | if cnx_system_db: |
|
214 | cnx_system_db.close() |
|
215 | ||
216 | #################################################################################################################### |
|
217 | # stop to the next tenant if this tenant is empty |
|
218 | #################################################################################################################### |
|
219 | if (meter_list is None or len(meter_list) == 0) and \ |
|
220 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
221 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
222 | print("This is an empty tenant ") |
|
223 | return None |
|
224 | ||
225 | #################################################################################################################### |
|
226 | # Step 4: determine start datetime and end datetime to aggregate |
|
227 | #################################################################################################################### |
|
228 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
229 | cnx_energy_db = None |
|
230 | cursor_energy_db = None |
|
231 | try: |
|
232 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
233 | cursor_energy_db = cnx_energy_db.cursor() |
|
234 | except Exception as e: |
|
235 | error_string = "Error in step 4.1 of tenant_energy_input_item.worker " + str(e) |
|
236 | if cursor_energy_db: |
|
237 | cursor_energy_db.close() |
|
238 | if cnx_energy_db: |
|
239 | cnx_energy_db.close() |
|
240 | print(error_string) |
|
241 | return error_string |
|
242 | ||
243 | try: |
|
244 | query = (" SELECT MAX(start_datetime_utc) " |
|
245 | " FROM tbl_tenant_input_item_hourly " |
|
246 | " WHERE tenant_id = %s ") |
|
247 | cursor_energy_db.execute(query, (tenant['id'],)) |
|
248 | row_datetime = cursor_energy_db.fetchone() |
|
249 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
250 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
251 | ||
252 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
253 | # replace second and microsecond with 0 |
|
254 | # note: do not replace minute in case of calculating in half hourly |
|
255 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
256 | # start from the next time slot |
|
257 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
258 | ||
259 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
260 | ||
261 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
262 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
263 | ||
264 | except Exception as e: |
|
265 | error_string = "Error in step 4.2 of tenant_energy_input_item.worker " + str(e) |
|
266 | if cursor_energy_db: |
|
267 | cursor_energy_db.close() |
|
268 | if cnx_energy_db: |
|
269 | cnx_energy_db.close() |
|
270 | print(error_string) |
|
271 | return error_string |
|
272 | ||
273 | #################################################################################################################### |
|
274 | # Step 5: for each meter in list, get energy input data from energy database |
|
275 | #################################################################################################################### |
|
276 | energy_meter_hourly = dict() |
|
277 | try: |
|
278 | if meter_list is not None and len(meter_list) > 0: |
|
279 | for meter in meter_list: |
|
280 | meter_id = str(meter['id']) |
|
281 | ||
282 | query = (" SELECT start_datetime_utc, actual_value " |
|
283 | " FROM tbl_meter_hourly " |
|
284 | " WHERE meter_id = %s " |
|
285 | " AND start_datetime_utc >= %s " |
|
286 | " AND start_datetime_utc < %s " |
|
287 | " ORDER BY start_datetime_utc ") |
|
288 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
289 | rows_energy_values = cursor_energy_db.fetchall() |
|
290 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
291 | energy_meter_hourly[meter_id] = None |
|
292 | else: |
|
293 | energy_meter_hourly[meter_id] = dict() |
|
294 | for row_energy_value in rows_energy_values: |
|
295 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
296 | except Exception as e: |
|
297 | error_string = "Error in step 5.1 of tenant_energy_input_item.worker " + str(e) |
|
298 | if cursor_energy_db: |
|
299 | cursor_energy_db.close() |
|
300 | if cnx_energy_db: |
|
301 | cnx_energy_db.close() |
|
302 | print(error_string) |
|
303 | return error_string |
|
304 | ||
305 | #################################################################################################################### |
|
306 | # Step 6: for each virtual meter in list, get energy input data from energy database |
|
307 | #################################################################################################################### |
|
308 | energy_virtual_meter_hourly = dict() |
|
309 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
310 | try: |
|
311 | for virtual_meter in virtual_meter_list: |
|
312 | virtual_meter_id = str(virtual_meter['id']) |
|
313 | ||
314 | query = (" SELECT start_datetime_utc, actual_value " |
|
315 | " FROM tbl_virtual_meter_hourly " |
|
316 | " WHERE virtual_meter_id = %s " |
|
317 | " AND start_datetime_utc >= %s " |
|
318 | " AND start_datetime_utc < %s " |
|
319 | " ORDER BY start_datetime_utc ") |
|
320 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
321 | rows_energy_values = cursor_energy_db.fetchall() |
|
322 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
323 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
324 | else: |
|
325 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
326 | for row_energy_value in rows_energy_values: |
|
327 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
328 | except Exception as e: |
|
329 | error_string = "Error in step 6.1 of tenant_energy_input_item.worker " + str(e) |
|
330 | if cursor_energy_db: |
|
331 | cursor_energy_db.close() |
|
332 | if cnx_energy_db: |
|
333 | cnx_energy_db.close() |
|
334 | print(error_string) |
|
335 | return error_string |
|
336 | ||
337 | #################################################################################################################### |
|
338 | # Step 7: for each offline meter in list, get energy input data from energy database |
|
339 | #################################################################################################################### |
|
340 | energy_offline_meter_hourly = dict() |
|
341 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
342 | try: |
|
343 | for offline_meter in offline_meter_list: |
|
344 | offline_meter_id = str(offline_meter['id']) |
|
345 | ||
346 | query = (" SELECT start_datetime_utc, actual_value " |
|
347 | " FROM tbl_offline_meter_hourly " |
|
348 | " WHERE offline_meter_id = %s " |
|
349 | " AND start_datetime_utc >= %s " |
|
350 | " AND start_datetime_utc < %s " |
|
351 | " ORDER BY start_datetime_utc ") |
|
352 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
353 | rows_energy_values = cursor_energy_db.fetchall() |
|
354 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
355 | energy_offline_meter_hourly[offline_meter_id] = None |
|
356 | else: |
|
357 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
358 | for row_energy_value in rows_energy_values: |
|
359 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
360 | ||
361 | except Exception as e: |
|
362 | error_string = "Error in step 7.1 of tenant_energy_input_item.worker " + str(e) |
|
363 | if cursor_energy_db: |
|
364 | cursor_energy_db.close() |
|
365 | if cnx_energy_db: |
|
366 | cnx_energy_db.close() |
|
367 | print(error_string) |
|
368 | return error_string |
|
369 | ||
370 | #################################################################################################################### |
|
371 | # Step 8: determine common time slot to aggregate |
|
372 | #################################################################################################################### |
|
373 | ||
374 | common_start_datetime_utc = start_datetime_utc |
|
375 | common_end_datetime_utc = end_datetime_utc |
|
376 | ||
377 | print("Getting common time slot of energy values for all meters") |
|
378 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
379 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
380 | if energy_hourly is None or len(energy_hourly) == 0: |
|
381 | common_start_datetime_utc = None |
|
382 | common_end_datetime_utc = None |
|
383 | break |
|
384 | else: |
|
385 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
386 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
387 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
388 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
389 | ||
390 | print("Getting common time slot of energy values for all virtual meters") |
|
391 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
392 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
393 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
394 | if energy_hourly is None or len(energy_hourly) == 0: |
|
395 | common_start_datetime_utc = None |
|
396 | common_end_datetime_utc = None |
|
397 | break |
|
398 | else: |
|
399 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
400 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
401 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
402 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
403 | ||
404 | print("Getting common time slot of energy values for all offline meters") |
|
405 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
406 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
407 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
408 | if energy_hourly is None or len(energy_hourly) == 0: |
|
409 | common_start_datetime_utc = None |
|
410 | common_end_datetime_utc = None |
|
411 | break |
|
412 | else: |
|
413 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
414 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
415 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
416 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
417 | ||
418 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
419 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
420 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
421 | # There isn't any energy data |
|
422 | print("There isn't any energy data") |
|
423 | # continue the for tenant loop to the next tenant |
|
424 | print("continue the for tenant loop to the next tenant") |
|
425 | if cursor_energy_db: |
|
426 | cursor_energy_db.close() |
|
427 | if cnx_energy_db: |
|
428 | cnx_energy_db.close() |
|
429 | return None |
|
430 | ||
431 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
432 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
433 | ||
434 | #################################################################################################################### |
|
435 | # Step 9: aggregate energy data in the common time slot by energy items and hourly |
|
436 | #################################################################################################################### |
|
437 | ||
438 | print("Step 9: aggregate energy data in the common time slot by energy items and hourly") |
|
439 | aggregated_values = list() |
|
440 | try: |
|
441 | current_datetime_utc = common_start_datetime_utc |
|
442 | while common_start_datetime_utc is not None \ |
|
443 | and common_end_datetime_utc is not None \ |
|
444 | and current_datetime_utc <= common_end_datetime_utc: |
|
445 | aggregated_value = dict() |
|
446 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
447 | aggregated_value['meta_data'] = dict() |
|
448 | ||
449 | if meter_list is not None and len(meter_list) > 0: |
|
450 | for meter in meter_list: |
|
451 | meter_id = str(meter['id']) |
|
452 | energy_item_id = meter['energy_item_id'] |
|
453 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
454 | aggregated_value['meta_data'][energy_item_id] = \ |
|
455 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
456 | ||
457 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
458 | for virtual_meter in virtual_meter_list: |
|
459 | virtual_meter_id = str(virtual_meter['id']) |
|
460 | energy_item_id = virtual_meter['energy_item_id'] |
|
461 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
462 | aggregated_value['meta_data'][energy_item_id] = \ |
|
463 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
464 | ||
465 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
466 | for offline_meter in offline_meter_list: |
|
467 | offline_meter_id = str(offline_meter['id']) |
|
468 | energy_item_id = offline_meter['energy_item_id'] |
|
469 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
470 | aggregated_value['meta_data'][energy_item_id] = \ |
|
471 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
472 | ||
473 | aggregated_values.append(aggregated_value) |
|
474 | ||
475 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
476 | ||
477 | except Exception as e: |
|
478 | error_string = "Error in step 9 of tenant_energy_input_item.worker " + str(e) |
|
479 | if cursor_energy_db: |
|
480 | cursor_energy_db.close() |
|
481 | if cnx_energy_db: |
|
482 | cnx_energy_db.close() |
|
483 | print(error_string) |
|
484 | return error_string |
|
485 | ||
486 | #################################################################################################################### |
|
487 | # Step 10: save energy data to energy database |
|
488 | #################################################################################################################### |
|
489 | print("Step 10: save energy data to energy database") |
|
490 | ||
491 | if len(aggregated_values) > 0: |
|
492 | try: |
|
493 | add_values = (" INSERT INTO tbl_tenant_input_item_hourly " |
|
494 | " (tenant_id, " |
|
495 | " energy_item_id, " |
|
496 | " start_datetime_utc, " |
|
497 | " actual_value) " |
|
498 | " VALUES ") |
|
499 | ||
500 | for aggregated_value in aggregated_values: |
|
501 | for energy_item_id, actual_value in aggregated_value['meta_data'].items(): |
|
502 | add_values += " (" + str(tenant['id']) + "," |
|
503 | add_values += " " + str(energy_item_id) + "," |
|
504 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
505 | add_values += str(actual_value) + "), " |
|
506 | print("add_values:" + add_values) |
|
507 | # trim ", " at the end of string and then execute |
|
508 | cursor_energy_db.execute(add_values[:-2]) |
|
509 | cnx_energy_db.commit() |
|
510 | ||
511 | except Exception as e: |
|
512 | error_string = "Error in step 10.1 of tenant_energy_input_item.worker " + str(e) |
|
513 | print(error_string) |
|
514 | return error_string |
|
515 | finally: |
|
516 | if cursor_energy_db: |
|
517 | cursor_energy_db.close() |
|
518 | if cnx_energy_db: |
|
519 | cnx_energy_db.close() |
|
520 | else: |
|
521 | if cursor_energy_db: |
|
522 | cursor_energy_db.close() |
|
523 | if cnx_energy_db: |
|
524 | cnx_energy_db.close() |
|
525 |
@@ 106-524 (lines=419) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(store): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all input meters associated with the store |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all input meters associated with the store " + str(store['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of store_energy_input_item.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
129 | " FROM tbl_meters m, tbl_stores_meters sm " |
|
130 | " WHERE m.id = sm.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND m.energy_item_id is NOT NULL " |
|
133 | " AND sm.store_id = %s ", |
|
134 | (store['id'],)) |
|
135 | rows_meters = cursor_system_db.fetchall() |
|
136 | ||
137 | if rows_meters is not None and len(rows_meters) > 0: |
|
138 | for row in rows_meters: |
|
139 | meter_list.append({"id": row[0], |
|
140 | "name": row[1], |
|
141 | "energy_item_id": row[2]}) |
|
142 | ||
143 | except Exception as e: |
|
144 | error_string = "Error in step 1.2 of store_energy_input_item.worker " + str(e) |
|
145 | if cursor_system_db: |
|
146 | cursor_system_db.close() |
|
147 | if cnx_system_db: |
|
148 | cnx_system_db.close() |
|
149 | print(error_string) |
|
150 | return error_string |
|
151 | ||
152 | #################################################################################################################### |
|
153 | # Step 2: get all input virtual meters associated with the store |
|
154 | #################################################################################################################### |
|
155 | print("Step 2: get all input virtual meters associated with the store") |
|
156 | virtual_meter_list = list() |
|
157 | ||
158 | try: |
|
159 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
160 | " FROM tbl_virtual_meters m, tbl_stores_virtual_meters sm " |
|
161 | " WHERE m.id = sm.virtual_meter_id " |
|
162 | " AND m.energy_item_id is NOT NULL " |
|
163 | " AND m.is_counted = true " |
|
164 | " AND sm.store_id = %s ", |
|
165 | (store['id'],)) |
|
166 | rows_virtual_meters = cursor_system_db.fetchall() |
|
167 | ||
168 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
169 | for row in rows_virtual_meters: |
|
170 | virtual_meter_list.append({"id": row[0], |
|
171 | "name": row[1], |
|
172 | "energy_item_id": row[2]}) |
|
173 | ||
174 | except Exception as e: |
|
175 | error_string = "Error in step 2.1 of store_energy_input_item.worker " + str(e) |
|
176 | if cursor_system_db: |
|
177 | cursor_system_db.close() |
|
178 | if cnx_system_db: |
|
179 | cnx_system_db.close() |
|
180 | print(error_string) |
|
181 | return error_string |
|
182 | ||
183 | #################################################################################################################### |
|
184 | # Step 3: get all input offline meters associated with the store |
|
185 | #################################################################################################################### |
|
186 | print("Step 3: get all input offline meters associated with the store") |
|
187 | ||
188 | offline_meter_list = list() |
|
189 | ||
190 | try: |
|
191 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id " |
|
192 | " FROM tbl_offline_meters m, tbl_stores_offline_meters sm " |
|
193 | " WHERE m.id = sm.offline_meter_id " |
|
194 | " AND m.energy_item_id is NOT NULL " |
|
195 | " AND m.is_counted = true " |
|
196 | " AND sm.store_id = %s ", |
|
197 | (store['id'],)) |
|
198 | rows_offline_meters = cursor_system_db.fetchall() |
|
199 | ||
200 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
201 | for row in rows_offline_meters: |
|
202 | offline_meter_list.append({"id": row[0], |
|
203 | "name": row[1], |
|
204 | "energy_item_id": row[2]}) |
|
205 | ||
206 | except Exception as e: |
|
207 | error_string = "Error in step 3.1 of store_energy_input_item.worker " + str(e) |
|
208 | print(error_string) |
|
209 | return error_string |
|
210 | finally: |
|
211 | if cursor_system_db: |
|
212 | cursor_system_db.close() |
|
213 | if cnx_system_db: |
|
214 | cnx_system_db.close() |
|
215 | ||
216 | #################################################################################################################### |
|
217 | # stop to the next store if this store is empty |
|
218 | #################################################################################################################### |
|
219 | if (meter_list is None or len(meter_list) == 0) and \ |
|
220 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
221 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
222 | print("This is an empty store ") |
|
223 | return None |
|
224 | ||
225 | #################################################################################################################### |
|
226 | # Step 4: determine start datetime and end datetime to aggregate |
|
227 | #################################################################################################################### |
|
228 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
229 | cnx_energy_db = None |
|
230 | cursor_energy_db = None |
|
231 | try: |
|
232 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
233 | cursor_energy_db = cnx_energy_db.cursor() |
|
234 | except Exception as e: |
|
235 | error_string = "Error in step 4.1 of store_energy_input_item.worker " + str(e) |
|
236 | if cursor_energy_db: |
|
237 | cursor_energy_db.close() |
|
238 | if cnx_energy_db: |
|
239 | cnx_energy_db.close() |
|
240 | print(error_string) |
|
241 | return error_string |
|
242 | ||
243 | try: |
|
244 | query = (" SELECT MAX(start_datetime_utc) " |
|
245 | " FROM tbl_store_input_item_hourly " |
|
246 | " WHERE store_id = %s ") |
|
247 | cursor_energy_db.execute(query, (store['id'],)) |
|
248 | row_datetime = cursor_energy_db.fetchone() |
|
249 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
250 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
251 | ||
252 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
253 | # replace second and microsecond with 0 |
|
254 | # note: do not replace minute in case of calculating in half hourly |
|
255 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
256 | # start from the next time slot |
|
257 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
258 | ||
259 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
260 | ||
261 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
262 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
263 | ||
264 | except Exception as e: |
|
265 | error_string = "Error in step 4.2 of store_energy_input_item.worker " + str(e) |
|
266 | if cursor_energy_db: |
|
267 | cursor_energy_db.close() |
|
268 | if cnx_energy_db: |
|
269 | cnx_energy_db.close() |
|
270 | print(error_string) |
|
271 | return error_string |
|
272 | ||
273 | #################################################################################################################### |
|
274 | # Step 5: for each meter in list, get energy input data from energy database |
|
275 | #################################################################################################################### |
|
276 | energy_meter_hourly = dict() |
|
277 | try: |
|
278 | if meter_list is not None and len(meter_list) > 0: |
|
279 | for meter in meter_list: |
|
280 | meter_id = str(meter['id']) |
|
281 | ||
282 | query = (" SELECT start_datetime_utc, actual_value " |
|
283 | " FROM tbl_meter_hourly " |
|
284 | " WHERE meter_id = %s " |
|
285 | " AND start_datetime_utc >= %s " |
|
286 | " AND start_datetime_utc < %s " |
|
287 | " ORDER BY start_datetime_utc ") |
|
288 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
289 | rows_energy_values = cursor_energy_db.fetchall() |
|
290 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
291 | energy_meter_hourly[meter_id] = None |
|
292 | else: |
|
293 | energy_meter_hourly[meter_id] = dict() |
|
294 | for row_energy_value in rows_energy_values: |
|
295 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
296 | except Exception as e: |
|
297 | error_string = "Error in step 5.1 of store_energy_input_item.worker " + str(e) |
|
298 | if cursor_energy_db: |
|
299 | cursor_energy_db.close() |
|
300 | if cnx_energy_db: |
|
301 | cnx_energy_db.close() |
|
302 | print(error_string) |
|
303 | return error_string |
|
304 | ||
305 | #################################################################################################################### |
|
306 | # Step 6: for each virtual meter in list, get energy input data from energy database |
|
307 | #################################################################################################################### |
|
308 | energy_virtual_meter_hourly = dict() |
|
309 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
310 | try: |
|
311 | for virtual_meter in virtual_meter_list: |
|
312 | virtual_meter_id = str(virtual_meter['id']) |
|
313 | ||
314 | query = (" SELECT start_datetime_utc, actual_value " |
|
315 | " FROM tbl_virtual_meter_hourly " |
|
316 | " WHERE virtual_meter_id = %s " |
|
317 | " AND start_datetime_utc >= %s " |
|
318 | " AND start_datetime_utc < %s " |
|
319 | " ORDER BY start_datetime_utc ") |
|
320 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
321 | rows_energy_values = cursor_energy_db.fetchall() |
|
322 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
323 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
324 | else: |
|
325 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
326 | for row_energy_value in rows_energy_values: |
|
327 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
328 | except Exception as e: |
|
329 | error_string = "Error in step 6.1 of store_energy_input_item.worker " + str(e) |
|
330 | if cursor_energy_db: |
|
331 | cursor_energy_db.close() |
|
332 | if cnx_energy_db: |
|
333 | cnx_energy_db.close() |
|
334 | print(error_string) |
|
335 | return error_string |
|
336 | ||
337 | #################################################################################################################### |
|
338 | # Step 7: for each offline meter in list, get energy input data from energy database |
|
339 | #################################################################################################################### |
|
340 | energy_offline_meter_hourly = dict() |
|
341 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
342 | try: |
|
343 | for offline_meter in offline_meter_list: |
|
344 | offline_meter_id = str(offline_meter['id']) |
|
345 | ||
346 | query = (" SELECT start_datetime_utc, actual_value " |
|
347 | " FROM tbl_offline_meter_hourly " |
|
348 | " WHERE offline_meter_id = %s " |
|
349 | " AND start_datetime_utc >= %s " |
|
350 | " AND start_datetime_utc < %s " |
|
351 | " ORDER BY start_datetime_utc ") |
|
352 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
353 | rows_energy_values = cursor_energy_db.fetchall() |
|
354 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
355 | energy_offline_meter_hourly[offline_meter_id] = None |
|
356 | else: |
|
357 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
358 | for row_energy_value in rows_energy_values: |
|
359 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
360 | ||
361 | except Exception as e: |
|
362 | error_string = "Error in step 7.1 of store_energy_input_item.worker " + str(e) |
|
363 | if cursor_energy_db: |
|
364 | cursor_energy_db.close() |
|
365 | if cnx_energy_db: |
|
366 | cnx_energy_db.close() |
|
367 | print(error_string) |
|
368 | return error_string |
|
369 | ||
370 | #################################################################################################################### |
|
371 | # Step 8: determine common time slot to aggregate |
|
372 | #################################################################################################################### |
|
373 | ||
374 | common_start_datetime_utc = start_datetime_utc |
|
375 | common_end_datetime_utc = end_datetime_utc |
|
376 | ||
377 | print("Getting common time slot of energy values for all meters") |
|
378 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
379 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
380 | if energy_hourly is None or len(energy_hourly) == 0: |
|
381 | common_start_datetime_utc = None |
|
382 | common_end_datetime_utc = None |
|
383 | break |
|
384 | else: |
|
385 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
386 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
387 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
388 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
389 | ||
390 | print("Getting common time slot of energy values for all virtual meters") |
|
391 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
392 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
393 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
394 | if energy_hourly is None or len(energy_hourly) == 0: |
|
395 | common_start_datetime_utc = None |
|
396 | common_end_datetime_utc = None |
|
397 | break |
|
398 | else: |
|
399 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
400 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
401 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
402 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
403 | ||
404 | print("Getting common time slot of energy values for all offline meters") |
|
405 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
406 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
407 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
408 | if energy_hourly is None or len(energy_hourly) == 0: |
|
409 | common_start_datetime_utc = None |
|
410 | common_end_datetime_utc = None |
|
411 | break |
|
412 | else: |
|
413 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
414 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
415 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
416 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
417 | ||
418 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
419 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
420 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
421 | # There isn't any energy data |
|
422 | print("There isn't any energy data") |
|
423 | # continue the for store loop to the next store |
|
424 | print("continue the for store loop to the next store") |
|
425 | if cursor_energy_db: |
|
426 | cursor_energy_db.close() |
|
427 | if cnx_energy_db: |
|
428 | cnx_energy_db.close() |
|
429 | return None |
|
430 | ||
431 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
432 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
433 | ||
434 | #################################################################################################################### |
|
435 | # Step 9: aggregate energy data in the common time slot by energy items and hourly |
|
436 | #################################################################################################################### |
|
437 | ||
438 | print("Step 9: aggregate energy data in the common time slot by energy items and hourly") |
|
439 | aggregated_values = list() |
|
440 | try: |
|
441 | current_datetime_utc = common_start_datetime_utc |
|
442 | while common_start_datetime_utc is not None \ |
|
443 | and common_end_datetime_utc is not None \ |
|
444 | and current_datetime_utc <= common_end_datetime_utc: |
|
445 | aggregated_value = dict() |
|
446 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
447 | aggregated_value['meta_data'] = dict() |
|
448 | ||
449 | if meter_list is not None and len(meter_list) > 0: |
|
450 | for meter in meter_list: |
|
451 | meter_id = str(meter['id']) |
|
452 | energy_item_id = meter['energy_item_id'] |
|
453 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
454 | aggregated_value['meta_data'][energy_item_id] = \ |
|
455 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
456 | ||
457 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
458 | for virtual_meter in virtual_meter_list: |
|
459 | virtual_meter_id = str(virtual_meter['id']) |
|
460 | energy_item_id = virtual_meter['energy_item_id'] |
|
461 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
462 | aggregated_value['meta_data'][energy_item_id] = \ |
|
463 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
464 | ||
465 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
466 | for offline_meter in offline_meter_list: |
|
467 | offline_meter_id = str(offline_meter['id']) |
|
468 | energy_item_id = offline_meter['energy_item_id'] |
|
469 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
470 | aggregated_value['meta_data'][energy_item_id] = \ |
|
471 | aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value |
|
472 | ||
473 | aggregated_values.append(aggregated_value) |
|
474 | ||
475 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
476 | ||
477 | except Exception as e: |
|
478 | error_string = "Error in step 9 of store_energy_input_item.worker " + str(e) |
|
479 | if cursor_energy_db: |
|
480 | cursor_energy_db.close() |
|
481 | if cnx_energy_db: |
|
482 | cnx_energy_db.close() |
|
483 | print(error_string) |
|
484 | return error_string |
|
485 | ||
486 | #################################################################################################################### |
|
487 | # Step 10: save energy data to energy database |
|
488 | #################################################################################################################### |
|
489 | print("Step 10: save energy data to energy database") |
|
490 | ||
491 | if len(aggregated_values) > 0: |
|
492 | try: |
|
493 | add_values = (" INSERT INTO tbl_store_input_item_hourly " |
|
494 | " (store_id, " |
|
495 | " energy_item_id, " |
|
496 | " start_datetime_utc, " |
|
497 | " actual_value) " |
|
498 | " VALUES ") |
|
499 | ||
500 | for aggregated_value in aggregated_values: |
|
501 | for energy_item_id, actual_value in aggregated_value['meta_data'].items(): |
|
502 | add_values += " (" + str(store['id']) + "," |
|
503 | add_values += " " + str(energy_item_id) + "," |
|
504 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
505 | add_values += str(actual_value) + "), " |
|
506 | print("add_values:" + add_values) |
|
507 | # trim ", " at the end of string and then execute |
|
508 | cursor_energy_db.execute(add_values[:-2]) |
|
509 | cnx_energy_db.commit() |
|
510 | ||
511 | except Exception as e: |
|
512 | error_string = "Error in step 10.1 of store_energy_input_item.worker " + str(e) |
|
513 | print(error_string) |
|
514 | return error_string |
|
515 | finally: |
|
516 | if cursor_energy_db: |
|
517 | cursor_energy_db.close() |
|
518 | if cnx_energy_db: |
|
519 | cnx_energy_db.close() |
|
520 | else: |
|
521 | if cursor_energy_db: |
|
522 | cursor_energy_db.close() |
|
523 | if cnx_energy_db: |
|
524 | cnx_energy_db.close() |
|
525 |
@@ 106-521 (lines=416) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(tenant): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all input meters associated with the tenant |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all input meters associated with the tenant " + str(tenant['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of tenant_energy_input_category.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
129 | " FROM tbl_meters m, tbl_tenants_meters tm " |
|
130 | " WHERE m.id = tm.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND tm.tenant_id = %s ", |
|
133 | (tenant['id'],)) |
|
134 | rows_meters = cursor_system_db.fetchall() |
|
135 | ||
136 | if rows_meters is not None and len(rows_meters) > 0: |
|
137 | for row in rows_meters: |
|
138 | meter_list.append({"id": row[0], |
|
139 | "name": row[1], |
|
140 | "energy_category_id": row[2]}) |
|
141 | ||
142 | except Exception as e: |
|
143 | error_string = "Error in step 1.2 of tenant_energy_input_category.worker " + str(e) |
|
144 | if cursor_system_db: |
|
145 | cursor_system_db.close() |
|
146 | if cnx_system_db: |
|
147 | cnx_system_db.close() |
|
148 | print(error_string) |
|
149 | return error_string |
|
150 | ||
151 | #################################################################################################################### |
|
152 | # Step 2: get all input virtual meters associated with the tenant |
|
153 | #################################################################################################################### |
|
154 | print("Step 2: get all input virtual meters associated with the tenant") |
|
155 | virtual_meter_list = list() |
|
156 | ||
157 | try: |
|
158 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
159 | " FROM tbl_virtual_meters m, tbl_tenants_virtual_meters tm " |
|
160 | " WHERE m.id = tm.virtual_meter_id " |
|
161 | " AND m.is_counted = true " |
|
162 | " AND tm.tenant_id = %s ", |
|
163 | (tenant['id'],)) |
|
164 | rows_virtual_meters = cursor_system_db.fetchall() |
|
165 | ||
166 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
167 | for row in rows_virtual_meters: |
|
168 | virtual_meter_list.append({"id": row[0], |
|
169 | "name": row[1], |
|
170 | "energy_category_id": row[2]}) |
|
171 | ||
172 | except Exception as e: |
|
173 | error_string = "Error in step 2.1 of tenant_energy_input_category.worker " + str(e) |
|
174 | if cursor_system_db: |
|
175 | cursor_system_db.close() |
|
176 | if cnx_system_db: |
|
177 | cnx_system_db.close() |
|
178 | print(error_string) |
|
179 | return error_string |
|
180 | ||
181 | #################################################################################################################### |
|
182 | # Step 3: get all input offline meters associated with the tenant |
|
183 | #################################################################################################################### |
|
184 | print("Step 3: get all input offline meters associated with the tenant") |
|
185 | ||
186 | offline_meter_list = list() |
|
187 | ||
188 | try: |
|
189 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
190 | " FROM tbl_offline_meters m, tbl_tenants_offline_meters tm " |
|
191 | " WHERE m.id = tm.offline_meter_id " |
|
192 | " AND m.is_counted = true " |
|
193 | " AND tm.tenant_id = %s ", |
|
194 | (tenant['id'],)) |
|
195 | rows_offline_meters = cursor_system_db.fetchall() |
|
196 | ||
197 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
198 | for row in rows_offline_meters: |
|
199 | offline_meter_list.append({"id": row[0], |
|
200 | "name": row[1], |
|
201 | "energy_category_id": row[2]}) |
|
202 | ||
203 | except Exception as e: |
|
204 | error_string = "Error in step 3.1 of tenant_energy_input_category.worker " + str(e) |
|
205 | print(error_string) |
|
206 | return error_string |
|
207 | finally: |
|
208 | if cursor_system_db: |
|
209 | cursor_system_db.close() |
|
210 | if cnx_system_db: |
|
211 | cnx_system_db.close() |
|
212 | ||
213 | #################################################################################################################### |
|
214 | # stop to the next tenant if this tenant is empty |
|
215 | #################################################################################################################### |
|
216 | if (meter_list is None or len(meter_list) == 0) and \ |
|
217 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
218 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
219 | print("This is an empty tenant ") |
|
220 | return None |
|
221 | ||
222 | #################################################################################################################### |
|
223 | # Step 4: determine start datetime and end datetime to aggregate |
|
224 | #################################################################################################################### |
|
225 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
226 | cnx_energy_db = None |
|
227 | cursor_energy_db = None |
|
228 | try: |
|
229 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
230 | cursor_energy_db = cnx_energy_db.cursor() |
|
231 | except Exception as e: |
|
232 | error_string = "Error in step 4.1 of tenant_energy_input_category.worker " + str(e) |
|
233 | if cursor_energy_db: |
|
234 | cursor_energy_db.close() |
|
235 | if cnx_energy_db: |
|
236 | cnx_energy_db.close() |
|
237 | print(error_string) |
|
238 | return error_string |
|
239 | ||
240 | try: |
|
241 | query = (" SELECT MAX(start_datetime_utc) " |
|
242 | " FROM tbl_tenant_input_category_hourly " |
|
243 | " WHERE tenant_id = %s ") |
|
244 | cursor_energy_db.execute(query, (tenant['id'],)) |
|
245 | row_datetime = cursor_energy_db.fetchone() |
|
246 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
247 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
248 | ||
249 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
250 | # replace second and microsecond with 0 |
|
251 | # note: do not replace minute in case of calculating in half hourly |
|
252 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
253 | # start from the next time slot |
|
254 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
255 | ||
256 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
257 | ||
258 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
259 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
260 | ||
261 | except Exception as e: |
|
262 | error_string = "Error in step 4.2 of tenant_energy_input_category.worker " + str(e) |
|
263 | if cursor_energy_db: |
|
264 | cursor_energy_db.close() |
|
265 | if cnx_energy_db: |
|
266 | cnx_energy_db.close() |
|
267 | print(error_string) |
|
268 | return error_string |
|
269 | ||
270 | #################################################################################################################### |
|
271 | # Step 5: for each meter in list, get energy input data from energy database |
|
272 | #################################################################################################################### |
|
273 | energy_meter_hourly = dict() |
|
274 | try: |
|
275 | if meter_list is not None and len(meter_list) > 0: |
|
276 | for meter in meter_list: |
|
277 | meter_id = str(meter['id']) |
|
278 | ||
279 | query = (" SELECT start_datetime_utc, actual_value " |
|
280 | " FROM tbl_meter_hourly " |
|
281 | " WHERE meter_id = %s " |
|
282 | " AND start_datetime_utc >= %s " |
|
283 | " AND start_datetime_utc < %s " |
|
284 | " ORDER BY start_datetime_utc ") |
|
285 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
286 | rows_energy_values = cursor_energy_db.fetchall() |
|
287 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
288 | energy_meter_hourly[meter_id] = None |
|
289 | else: |
|
290 | energy_meter_hourly[meter_id] = dict() |
|
291 | for row_energy_value in rows_energy_values: |
|
292 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
293 | except Exception as e: |
|
294 | error_string = "Error in step 5.1 of tenant_energy_input_category.worker " + str(e) |
|
295 | if cursor_energy_db: |
|
296 | cursor_energy_db.close() |
|
297 | if cnx_energy_db: |
|
298 | cnx_energy_db.close() |
|
299 | print(error_string) |
|
300 | return error_string |
|
301 | ||
302 | #################################################################################################################### |
|
303 | # Step 6: for each virtual meter in list, get energy input data from energy database |
|
304 | #################################################################################################################### |
|
305 | energy_virtual_meter_hourly = dict() |
|
306 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
307 | try: |
|
308 | for virtual_meter in virtual_meter_list: |
|
309 | virtual_meter_id = str(virtual_meter['id']) |
|
310 | ||
311 | query = (" SELECT start_datetime_utc, actual_value " |
|
312 | " FROM tbl_virtual_meter_hourly " |
|
313 | " WHERE virtual_meter_id = %s " |
|
314 | " AND start_datetime_utc >= %s " |
|
315 | " AND start_datetime_utc < %s " |
|
316 | " ORDER BY start_datetime_utc ") |
|
317 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
318 | rows_energy_values = cursor_energy_db.fetchall() |
|
319 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
320 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
321 | else: |
|
322 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
323 | for row_energy_value in rows_energy_values: |
|
324 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
325 | except Exception as e: |
|
326 | error_string = "Error in step 6.1 of tenant_energy_input_category.worker " + str(e) |
|
327 | if cursor_energy_db: |
|
328 | cursor_energy_db.close() |
|
329 | if cnx_energy_db: |
|
330 | cnx_energy_db.close() |
|
331 | print(error_string) |
|
332 | return error_string |
|
333 | ||
334 | #################################################################################################################### |
|
335 | # Step 7: for each offline meter in list, get energy input data from energy database |
|
336 | #################################################################################################################### |
|
337 | energy_offline_meter_hourly = dict() |
|
338 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
339 | try: |
|
340 | for offline_meter in offline_meter_list: |
|
341 | offline_meter_id = str(offline_meter['id']) |
|
342 | ||
343 | query = (" SELECT start_datetime_utc, actual_value " |
|
344 | " FROM tbl_offline_meter_hourly " |
|
345 | " WHERE offline_meter_id = %s " |
|
346 | " AND start_datetime_utc >= %s " |
|
347 | " AND start_datetime_utc < %s " |
|
348 | " ORDER BY start_datetime_utc ") |
|
349 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
350 | rows_energy_values = cursor_energy_db.fetchall() |
|
351 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
352 | energy_offline_meter_hourly[offline_meter_id] = None |
|
353 | else: |
|
354 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
355 | for row_energy_value in rows_energy_values: |
|
356 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
357 | ||
358 | except Exception as e: |
|
359 | error_string = "Error in step 7.1 of tenant_energy_input_category.worker " + str(e) |
|
360 | if cursor_energy_db: |
|
361 | cursor_energy_db.close() |
|
362 | if cnx_energy_db: |
|
363 | cnx_energy_db.close() |
|
364 | print(error_string) |
|
365 | return error_string |
|
366 | ||
367 | #################################################################################################################### |
|
368 | # Step 8: determine common time slot to aggregate |
|
369 | #################################################################################################################### |
|
370 | ||
371 | common_start_datetime_utc = start_datetime_utc |
|
372 | common_end_datetime_utc = end_datetime_utc |
|
373 | ||
374 | print("Getting common time slot of energy values for all meters") |
|
375 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
376 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
377 | if energy_hourly is None or len(energy_hourly) == 0: |
|
378 | common_start_datetime_utc = None |
|
379 | common_end_datetime_utc = None |
|
380 | break |
|
381 | else: |
|
382 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
383 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
384 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
385 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
386 | ||
387 | print("Getting common time slot of energy values for all virtual meters") |
|
388 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
389 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
390 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
391 | if energy_hourly is None or len(energy_hourly) == 0: |
|
392 | common_start_datetime_utc = None |
|
393 | common_end_datetime_utc = None |
|
394 | break |
|
395 | else: |
|
396 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
397 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
398 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
399 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
400 | ||
401 | print("Getting common time slot of energy values for all offline meters") |
|
402 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
403 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
404 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
405 | if energy_hourly is None or len(energy_hourly) == 0: |
|
406 | common_start_datetime_utc = None |
|
407 | common_end_datetime_utc = None |
|
408 | break |
|
409 | else: |
|
410 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
411 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
412 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
413 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
414 | ||
415 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
416 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
417 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
418 | # There isn't any energy data |
|
419 | print("There isn't any energy data") |
|
420 | # continue the for tenant loop to the next tenant |
|
421 | print("continue the for tenant loop to the next tenant") |
|
422 | if cursor_energy_db: |
|
423 | cursor_energy_db.close() |
|
424 | if cnx_energy_db: |
|
425 | cnx_energy_db.close() |
|
426 | return None |
|
427 | ||
428 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
429 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
430 | ||
431 | #################################################################################################################### |
|
432 | # Step 9: aggregate energy data in the common time slot by energy categories and hourly |
|
433 | #################################################################################################################### |
|
434 | ||
435 | print("Step 9: aggregate energy data in the common time slot by energy categories and hourly") |
|
436 | aggregated_values = list() |
|
437 | try: |
|
438 | current_datetime_utc = common_start_datetime_utc |
|
439 | while common_start_datetime_utc is not None \ |
|
440 | and common_end_datetime_utc is not None \ |
|
441 | and current_datetime_utc <= common_end_datetime_utc: |
|
442 | aggregated_value = dict() |
|
443 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
444 | aggregated_value['meta_data'] = dict() |
|
445 | ||
446 | if meter_list is not None and len(meter_list) > 0: |
|
447 | for meter in meter_list: |
|
448 | meter_id = str(meter['id']) |
|
449 | energy_category_id = meter['energy_category_id'] |
|
450 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
451 | aggregated_value['meta_data'][energy_category_id] = \ |
|
452 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
453 | ||
454 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
455 | for virtual_meter in virtual_meter_list: |
|
456 | virtual_meter_id = str(virtual_meter['id']) |
|
457 | energy_category_id = virtual_meter['energy_category_id'] |
|
458 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
459 | aggregated_value['meta_data'][energy_category_id] = \ |
|
460 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
461 | ||
462 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
463 | for offline_meter in offline_meter_list: |
|
464 | offline_meter_id = str(offline_meter['id']) |
|
465 | energy_category_id = offline_meter['energy_category_id'] |
|
466 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
467 | aggregated_value['meta_data'][energy_category_id] = \ |
|
468 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
469 | ||
470 | aggregated_values.append(aggregated_value) |
|
471 | ||
472 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
473 | ||
474 | except Exception as e: |
|
475 | error_string = "Error in step 9 of tenant_energy_input_category.worker " + str(e) |
|
476 | if cursor_energy_db: |
|
477 | cursor_energy_db.close() |
|
478 | if cnx_energy_db: |
|
479 | cnx_energy_db.close() |
|
480 | print(error_string) |
|
481 | return error_string |
|
482 | ||
483 | #################################################################################################################### |
|
484 | # Step 10: save energy data to energy database |
|
485 | #################################################################################################################### |
|
486 | print("Step 10: save energy data to energy database") |
|
487 | ||
488 | if len(aggregated_values) > 0: |
|
489 | try: |
|
490 | add_values = (" INSERT INTO tbl_tenant_input_category_hourly " |
|
491 | " (tenant_id, " |
|
492 | " energy_category_id, " |
|
493 | " start_datetime_utc, " |
|
494 | " actual_value) " |
|
495 | " VALUES ") |
|
496 | ||
497 | for aggregated_value in aggregated_values: |
|
498 | for energy_category_id, actual_value in aggregated_value['meta_data'].items(): |
|
499 | add_values += " (" + str(tenant['id']) + "," |
|
500 | add_values += " " + str(energy_category_id) + "," |
|
501 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
502 | add_values += str(actual_value) + "), " |
|
503 | print("add_values:" + add_values) |
|
504 | # trim ", " at the end of string and then execute |
|
505 | cursor_energy_db.execute(add_values[:-2]) |
|
506 | cnx_energy_db.commit() |
|
507 | ||
508 | except Exception as e: |
|
509 | error_string = "Error in step 10.1 of tenant_energy_input_category.worker " + str(e) |
|
510 | print(error_string) |
|
511 | return error_string |
|
512 | finally: |
|
513 | if cursor_energy_db: |
|
514 | cursor_energy_db.close() |
|
515 | if cnx_energy_db: |
|
516 | cnx_energy_db.close() |
|
517 | else: |
|
518 | if cursor_energy_db: |
|
519 | cursor_energy_db.close() |
|
520 | if cnx_energy_db: |
|
521 | cnx_energy_db.close() |
|
522 |
@@ 106-521 (lines=416) @@ | ||
103 | # NOTE: returns None or the error string because that the logger object cannot be passed in as parameter |
|
104 | ######################################################################################################################## |
|
105 | ||
106 | def worker(store): |
|
107 | #################################################################################################################### |
|
108 | # Step 1: get all input meters associated with the store |
|
109 | #################################################################################################################### |
|
110 | print("Step 1: get all input meters associated with the store " + str(store['name'])) |
|
111 | ||
112 | meter_list = list() |
|
113 | cnx_system_db = None |
|
114 | cursor_system_db = None |
|
115 | try: |
|
116 | cnx_system_db = mysql.connector.connect(**config.myems_system_db) |
|
117 | cursor_system_db = cnx_system_db.cursor() |
|
118 | except Exception as e: |
|
119 | error_string = "Error in step 1.1 of store_energy_input_category.worker " + str(e) |
|
120 | if cursor_system_db: |
|
121 | cursor_system_db.close() |
|
122 | if cnx_system_db: |
|
123 | cnx_system_db.close() |
|
124 | print(error_string) |
|
125 | return error_string |
|
126 | ||
127 | try: |
|
128 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
129 | " FROM tbl_meters m, tbl_stores_meters sm " |
|
130 | " WHERE m.id = sm.meter_id " |
|
131 | " AND m.is_counted = true " |
|
132 | " AND sm.store_id = %s ", |
|
133 | (store['id'],)) |
|
134 | rows_meters = cursor_system_db.fetchall() |
|
135 | ||
136 | if rows_meters is not None and len(rows_meters) > 0: |
|
137 | for row in rows_meters: |
|
138 | meter_list.append({"id": row[0], |
|
139 | "name": row[1], |
|
140 | "energy_category_id": row[2]}) |
|
141 | ||
142 | except Exception as e: |
|
143 | error_string = "Error in step 1.2 of store_energy_input_category.worker " + str(e) |
|
144 | if cursor_system_db: |
|
145 | cursor_system_db.close() |
|
146 | if cnx_system_db: |
|
147 | cnx_system_db.close() |
|
148 | print(error_string) |
|
149 | return error_string |
|
150 | ||
151 | #################################################################################################################### |
|
152 | # Step 2: get all input virtual meters associated with the store |
|
153 | #################################################################################################################### |
|
154 | print("Step 2: get all input virtual meters associated with the store") |
|
155 | virtual_meter_list = list() |
|
156 | ||
157 | try: |
|
158 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
159 | " FROM tbl_virtual_meters m, tbl_stores_virtual_meters sm " |
|
160 | " WHERE m.id = sm.virtual_meter_id " |
|
161 | " AND m.is_counted = true " |
|
162 | " AND sm.store_id = %s ", |
|
163 | (store['id'],)) |
|
164 | rows_virtual_meters = cursor_system_db.fetchall() |
|
165 | ||
166 | if rows_virtual_meters is not None and len(rows_virtual_meters) > 0: |
|
167 | for row in rows_virtual_meters: |
|
168 | virtual_meter_list.append({"id": row[0], |
|
169 | "name": row[1], |
|
170 | "energy_category_id": row[2]}) |
|
171 | ||
172 | except Exception as e: |
|
173 | error_string = "Error in step 2.1 of store_energy_input_category.worker " + str(e) |
|
174 | if cursor_system_db: |
|
175 | cursor_system_db.close() |
|
176 | if cnx_system_db: |
|
177 | cnx_system_db.close() |
|
178 | print(error_string) |
|
179 | return error_string |
|
180 | ||
181 | #################################################################################################################### |
|
182 | # Step 3: get all input offline meters associated with the store |
|
183 | #################################################################################################################### |
|
184 | print("Step 3: get all input offline meters associated with the store") |
|
185 | ||
186 | offline_meter_list = list() |
|
187 | ||
188 | try: |
|
189 | cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id " |
|
190 | " FROM tbl_offline_meters m, tbl_stores_offline_meters sm " |
|
191 | " WHERE m.id = sm.offline_meter_id " |
|
192 | " AND m.is_counted = true " |
|
193 | " AND sm.store_id = %s ", |
|
194 | (store['id'],)) |
|
195 | rows_offline_meters = cursor_system_db.fetchall() |
|
196 | ||
197 | if rows_offline_meters is not None and len(rows_offline_meters) > 0: |
|
198 | for row in rows_offline_meters: |
|
199 | offline_meter_list.append({"id": row[0], |
|
200 | "name": row[1], |
|
201 | "energy_category_id": row[2]}) |
|
202 | ||
203 | except Exception as e: |
|
204 | error_string = "Error in step 3.1 of store_energy_input_category.worker " + str(e) |
|
205 | print(error_string) |
|
206 | return error_string |
|
207 | finally: |
|
208 | if cursor_system_db: |
|
209 | cursor_system_db.close() |
|
210 | if cnx_system_db: |
|
211 | cnx_system_db.close() |
|
212 | ||
213 | #################################################################################################################### |
|
214 | # stop to the next store if this store is empty |
|
215 | #################################################################################################################### |
|
216 | if (meter_list is None or len(meter_list) == 0) and \ |
|
217 | (virtual_meter_list is None or len(virtual_meter_list) == 0) and \ |
|
218 | (offline_meter_list is None or len(offline_meter_list) == 0): |
|
219 | print("This is an empty store ") |
|
220 | return None |
|
221 | ||
222 | #################################################################################################################### |
|
223 | # Step 4: determine start datetime and end datetime to aggregate |
|
224 | #################################################################################################################### |
|
225 | print("Step 4: determine start datetime and end datetime to aggregate") |
|
226 | cnx_energy_db = None |
|
227 | cursor_energy_db = None |
|
228 | try: |
|
229 | cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) |
|
230 | cursor_energy_db = cnx_energy_db.cursor() |
|
231 | except Exception as e: |
|
232 | error_string = "Error in step 4.1 of store_energy_input_category.worker " + str(e) |
|
233 | if cursor_energy_db: |
|
234 | cursor_energy_db.close() |
|
235 | if cnx_energy_db: |
|
236 | cnx_energy_db.close() |
|
237 | print(error_string) |
|
238 | return error_string |
|
239 | ||
240 | try: |
|
241 | query = (" SELECT MAX(start_datetime_utc) " |
|
242 | " FROM tbl_store_input_category_hourly " |
|
243 | " WHERE store_id = %s ") |
|
244 | cursor_energy_db.execute(query, (store['id'],)) |
|
245 | row_datetime = cursor_energy_db.fetchone() |
|
246 | start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') |
|
247 | start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) |
|
248 | ||
249 | if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): |
|
250 | # replace second and microsecond with 0 |
|
251 | # note: do not replace minute in case of calculating in half hourly |
|
252 | start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) |
|
253 | # start from the next time slot |
|
254 | start_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
255 | ||
256 | end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None) |
|
257 | ||
258 | print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] |
|
259 | + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) |
|
260 | ||
261 | except Exception as e: |
|
262 | error_string = "Error in step 4.2 of store_energy_input_category.worker " + str(e) |
|
263 | if cursor_energy_db: |
|
264 | cursor_energy_db.close() |
|
265 | if cnx_energy_db: |
|
266 | cnx_energy_db.close() |
|
267 | print(error_string) |
|
268 | return error_string |
|
269 | ||
270 | #################################################################################################################### |
|
271 | # Step 5: for each meter in list, get energy input data from energy database |
|
272 | #################################################################################################################### |
|
273 | energy_meter_hourly = dict() |
|
274 | try: |
|
275 | if meter_list is not None and len(meter_list) > 0: |
|
276 | for meter in meter_list: |
|
277 | meter_id = str(meter['id']) |
|
278 | ||
279 | query = (" SELECT start_datetime_utc, actual_value " |
|
280 | " FROM tbl_meter_hourly " |
|
281 | " WHERE meter_id = %s " |
|
282 | " AND start_datetime_utc >= %s " |
|
283 | " AND start_datetime_utc < %s " |
|
284 | " ORDER BY start_datetime_utc ") |
|
285 | cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,)) |
|
286 | rows_energy_values = cursor_energy_db.fetchall() |
|
287 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
288 | energy_meter_hourly[meter_id] = None |
|
289 | else: |
|
290 | energy_meter_hourly[meter_id] = dict() |
|
291 | for row_energy_value in rows_energy_values: |
|
292 | energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] |
|
293 | except Exception as e: |
|
294 | error_string = "Error in step 5.1 of store_energy_input_category.worker " + str(e) |
|
295 | if cursor_energy_db: |
|
296 | cursor_energy_db.close() |
|
297 | if cnx_energy_db: |
|
298 | cnx_energy_db.close() |
|
299 | print(error_string) |
|
300 | return error_string |
|
301 | ||
302 | #################################################################################################################### |
|
303 | # Step 6: for each virtual meter in list, get energy input data from energy database |
|
304 | #################################################################################################################### |
|
305 | energy_virtual_meter_hourly = dict() |
|
306 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
307 | try: |
|
308 | for virtual_meter in virtual_meter_list: |
|
309 | virtual_meter_id = str(virtual_meter['id']) |
|
310 | ||
311 | query = (" SELECT start_datetime_utc, actual_value " |
|
312 | " FROM tbl_virtual_meter_hourly " |
|
313 | " WHERE virtual_meter_id = %s " |
|
314 | " AND start_datetime_utc >= %s " |
|
315 | " AND start_datetime_utc < %s " |
|
316 | " ORDER BY start_datetime_utc ") |
|
317 | cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
318 | rows_energy_values = cursor_energy_db.fetchall() |
|
319 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
320 | energy_virtual_meter_hourly[virtual_meter_id] = None |
|
321 | else: |
|
322 | energy_virtual_meter_hourly[virtual_meter_id] = dict() |
|
323 | for row_energy_value in rows_energy_values: |
|
324 | energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
325 | except Exception as e: |
|
326 | error_string = "Error in step 6.1 of store_energy_input_category.worker " + str(e) |
|
327 | if cursor_energy_db: |
|
328 | cursor_energy_db.close() |
|
329 | if cnx_energy_db: |
|
330 | cnx_energy_db.close() |
|
331 | print(error_string) |
|
332 | return error_string |
|
333 | ||
334 | #################################################################################################################### |
|
335 | # Step 7: for each offline meter in list, get energy input data from energy database |
|
336 | #################################################################################################################### |
|
337 | energy_offline_meter_hourly = dict() |
|
338 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
339 | try: |
|
340 | for offline_meter in offline_meter_list: |
|
341 | offline_meter_id = str(offline_meter['id']) |
|
342 | ||
343 | query = (" SELECT start_datetime_utc, actual_value " |
|
344 | " FROM tbl_offline_meter_hourly " |
|
345 | " WHERE offline_meter_id = %s " |
|
346 | " AND start_datetime_utc >= %s " |
|
347 | " AND start_datetime_utc < %s " |
|
348 | " ORDER BY start_datetime_utc ") |
|
349 | cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) |
|
350 | rows_energy_values = cursor_energy_db.fetchall() |
|
351 | if rows_energy_values is None or len(rows_energy_values) == 0: |
|
352 | energy_offline_meter_hourly[offline_meter_id] = None |
|
353 | else: |
|
354 | energy_offline_meter_hourly[offline_meter_id] = dict() |
|
355 | for row_energy_value in rows_energy_values: |
|
356 | energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] |
|
357 | ||
358 | except Exception as e: |
|
359 | error_string = "Error in step 7.1 of store_energy_input_category.worker " + str(e) |
|
360 | if cursor_energy_db: |
|
361 | cursor_energy_db.close() |
|
362 | if cnx_energy_db: |
|
363 | cnx_energy_db.close() |
|
364 | print(error_string) |
|
365 | return error_string |
|
366 | ||
367 | #################################################################################################################### |
|
368 | # Step 8: determine common time slot to aggregate |
|
369 | #################################################################################################################### |
|
370 | ||
371 | common_start_datetime_utc = start_datetime_utc |
|
372 | common_end_datetime_utc = end_datetime_utc |
|
373 | ||
374 | print("Getting common time slot of energy values for all meters") |
|
375 | if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: |
|
376 | for meter_id, energy_hourly in energy_meter_hourly.items(): |
|
377 | if energy_hourly is None or len(energy_hourly) == 0: |
|
378 | common_start_datetime_utc = None |
|
379 | common_end_datetime_utc = None |
|
380 | break |
|
381 | else: |
|
382 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
383 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
384 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
385 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
386 | ||
387 | print("Getting common time slot of energy values for all virtual meters") |
|
388 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
389 | if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: |
|
390 | for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): |
|
391 | if energy_hourly is None or len(energy_hourly) == 0: |
|
392 | common_start_datetime_utc = None |
|
393 | common_end_datetime_utc = None |
|
394 | break |
|
395 | else: |
|
396 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
397 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
398 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
399 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
400 | ||
401 | print("Getting common time slot of energy values for all offline meters") |
|
402 | if common_start_datetime_utc is not None and common_start_datetime_utc is not None: |
|
403 | if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: |
|
404 | for meter_id, energy_hourly in energy_offline_meter_hourly.items(): |
|
405 | if energy_hourly is None or len(energy_hourly) == 0: |
|
406 | common_start_datetime_utc = None |
|
407 | common_end_datetime_utc = None |
|
408 | break |
|
409 | else: |
|
410 | if common_start_datetime_utc < min(energy_hourly.keys()): |
|
411 | common_start_datetime_utc = min(energy_hourly.keys()) |
|
412 | if common_end_datetime_utc > max(energy_hourly.keys()): |
|
413 | common_end_datetime_utc = max(energy_hourly.keys()) |
|
414 | ||
415 | if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \ |
|
416 | (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \ |
|
417 | (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0): |
|
418 | # There isn't any energy data |
|
419 | print("There isn't any energy data") |
|
420 | # continue the for store loop to the next store |
|
421 | print("continue the for store loop to the next store") |
|
422 | if cursor_energy_db: |
|
423 | cursor_energy_db.close() |
|
424 | if cnx_energy_db: |
|
425 | cnx_energy_db.close() |
|
426 | return None |
|
427 | ||
428 | print("common_start_datetime_utc: " + str(common_start_datetime_utc)) |
|
429 | print("common_end_datetime_utc: " + str(common_end_datetime_utc)) |
|
430 | ||
431 | #################################################################################################################### |
|
432 | # Step 9: aggregate energy data in the common time slot by energy categories and hourly |
|
433 | #################################################################################################################### |
|
434 | ||
435 | print("Step 9: aggregate energy data in the common time slot by energy categories and hourly") |
|
436 | aggregated_values = list() |
|
437 | try: |
|
438 | current_datetime_utc = common_start_datetime_utc |
|
439 | while common_start_datetime_utc is not None \ |
|
440 | and common_end_datetime_utc is not None \ |
|
441 | and current_datetime_utc <= common_end_datetime_utc: |
|
442 | aggregated_value = dict() |
|
443 | aggregated_value['start_datetime_utc'] = current_datetime_utc |
|
444 | aggregated_value['meta_data'] = dict() |
|
445 | ||
446 | if meter_list is not None and len(meter_list) > 0: |
|
447 | for meter in meter_list: |
|
448 | meter_id = str(meter['id']) |
|
449 | energy_category_id = meter['energy_category_id'] |
|
450 | actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
451 | aggregated_value['meta_data'][energy_category_id] = \ |
|
452 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
453 | ||
454 | if virtual_meter_list is not None and len(virtual_meter_list) > 0: |
|
455 | for virtual_meter in virtual_meter_list: |
|
456 | virtual_meter_id = str(virtual_meter['id']) |
|
457 | energy_category_id = virtual_meter['energy_category_id'] |
|
458 | actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
459 | aggregated_value['meta_data'][energy_category_id] = \ |
|
460 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
461 | ||
462 | if offline_meter_list is not None and len(offline_meter_list) > 0: |
|
463 | for offline_meter in offline_meter_list: |
|
464 | offline_meter_id = str(offline_meter['id']) |
|
465 | energy_category_id = offline_meter['energy_category_id'] |
|
466 | actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0)) |
|
467 | aggregated_value['meta_data'][energy_category_id] = \ |
|
468 | aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value |
|
469 | ||
470 | aggregated_values.append(aggregated_value) |
|
471 | ||
472 | current_datetime_utc += timedelta(minutes=config.minutes_to_count) |
|
473 | ||
474 | except Exception as e: |
|
475 | error_string = "Error in step 9 of store_energy_input_category.worker " + str(e) |
|
476 | if cursor_energy_db: |
|
477 | cursor_energy_db.close() |
|
478 | if cnx_energy_db: |
|
479 | cnx_energy_db.close() |
|
480 | print(error_string) |
|
481 | return error_string |
|
482 | ||
483 | #################################################################################################################### |
|
484 | # Step 10: save energy data to energy database |
|
485 | #################################################################################################################### |
|
486 | print("Step 10: save energy data to energy database") |
|
487 | ||
488 | if len(aggregated_values) > 0: |
|
489 | try: |
|
490 | add_values = (" INSERT INTO tbl_store_input_category_hourly " |
|
491 | " (store_id, " |
|
492 | " energy_category_id, " |
|
493 | " start_datetime_utc, " |
|
494 | " actual_value) " |
|
495 | " VALUES ") |
|
496 | ||
497 | for aggregated_value in aggregated_values: |
|
498 | for energy_category_id, actual_value in aggregated_value['meta_data'].items(): |
|
499 | add_values += " (" + str(store['id']) + "," |
|
500 | add_values += " " + str(energy_category_id) + "," |
|
501 | add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "'," |
|
502 | add_values += str(actual_value) + "), " |
|
503 | print("add_values:" + add_values) |
|
504 | # trim ", " at the end of string and then execute |
|
505 | cursor_energy_db.execute(add_values[:-2]) |
|
506 | cnx_energy_db.commit() |
|
507 | ||
508 | except Exception as e: |
|
509 | error_string = "Error in step 10.1 of store_energy_input_category.worker " + str(e) |
|
510 | print(error_string) |
|
511 | return error_string |
|
512 | finally: |
|
513 | if cursor_energy_db: |
|
514 | cursor_energy_db.close() |
|
515 | if cnx_energy_db: |
|
516 | cnx_energy_db.close() |
|
517 | else: |
|
518 | if cursor_energy_db: |
|
519 | cursor_energy_db.close() |
|
520 | if cnx_energy_db: |
|
521 | cnx_energy_db.close() |
|
522 |