Code Duplication    Length = 418-424 lines in 7 locations

myems-aggregation/equipment_energy_input_item.py 1 location

@@ 108-531 (lines=424) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(equipment):
109
    ####################################################################################################################
110
    # Step 1: get all input meters associated with the equipment
111
    ####################################################################################################################
112
    print("Step 1: get all input meters associated with the equipment " + str(equipment['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of equipment_energy_input_item.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
131
                                 " FROM tbl_meters m, tbl_equipments_meters em "
132
                                 " WHERE m.id = em.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND m.energy_item_id is NOT NULL "
135
                                 "       AND em.is_output = 0 "
136
                                 "       AND em.equipment_id = %s ",
137
                                 (equipment['id'],))
138
        rows_meters = cursor_system_db.fetchall()
139
140
        if rows_meters is not None and len(rows_meters) > 0:
141
            for row in rows_meters:
142
                meter_list.append({"id": row[0],
143
                                   "name": row[1],
144
                                   "energy_item_id": row[2]})
145
146
    except Exception as e:
147
        error_string = "Error in step 1.2 of equipment_energy_input_item.worker " + str(e)
148
        if cursor_system_db:
149
            cursor_system_db.close()
150
        if cnx_system_db:
151
            cnx_system_db.close()
152
        print(error_string)
153
        return error_string
154
155
    ####################################################################################################################
156
    # Step 2: get all input virtual meters associated with the equipment
157
    ####################################################################################################################
158
    print("Step 2: get all input virtual meters associated with the equipment")
159
    virtual_meter_list = list()
160
161
    try:
162
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
163
                                 " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em "
164
                                 " WHERE m.id = em.virtual_meter_id "
165
                                 "       AND m.energy_item_id is NOT NULL "
166
                                 "       AND m.is_counted = 1 "
167
                                 "       AND em.is_output = 0 "
168
                                 "       AND em.equipment_id = %s ",
169
                                 (equipment['id'],))
170
        rows_virtual_meters = cursor_system_db.fetchall()
171
172
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
173
            for row in rows_virtual_meters:
174
                virtual_meter_list.append({"id": row[0],
175
                                           "name": row[1],
176
                                           "energy_item_id": row[2]})
177
178
    except Exception as e:
179
        error_string = "Error in step 2.1 of equipment_energy_input_item.worker " + str(e)
180
        if cursor_system_db:
181
            cursor_system_db.close()
182
        if cnx_system_db:
183
            cnx_system_db.close()
184
        print(error_string)
185
        return error_string
186
187
    ####################################################################################################################
188
    # Step 3: get all input offline meters associated with the equipment
189
    ####################################################################################################################
190
    print("Step 3: get all input offline meters associated with the equipment")
191
192
    offline_meter_list = list()
193
194
    try:
195
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
196
                                 " FROM tbl_offline_meters m, tbl_equipments_offline_meters em "
197
                                 " WHERE m.id = em.offline_meter_id "
198
                                 "       AND m.energy_item_id is NOT NULL "
199
                                 "       AND m.is_counted = 1 "
200
                                 "       AND em.is_output = 0 "
201
                                 "       AND em.equipment_id = %s ",
202
                                 (equipment['id'],))
203
        rows_offline_meters = cursor_system_db.fetchall()
204
205
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
206
            for row in rows_offline_meters:
207
                offline_meter_list.append({"id": row[0],
208
                                           "name": row[1],
209
                                           "energy_item_id": row[2]})
210
211
    except Exception as e:
212
        error_string = "Error in step 3.1 of equipment_energy_input_item.worker " + str(e)
213
        print(error_string)
214
        return error_string
215
    finally:
216
        if cursor_system_db:
217
            cursor_system_db.close()
218
        if cnx_system_db:
219
            cnx_system_db.close()
220
221
    ####################################################################################################################
222
    # stop to the next equipment if this equipment is empty
223
    ####################################################################################################################
224
    if (meter_list is None or len(meter_list) == 0) and \
225
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
226
            (offline_meter_list is None or len(offline_meter_list) == 0):
227
        print("This is an empty equipment ")
228
        return None
229
230
    ####################################################################################################################
231
    # Step 4: determine start datetime and end datetime to aggregate
232
    ####################################################################################################################
233
    print("Step 4: determine start datetime and end datetime to aggregate")
234
    cnx_energy_db = None
235
    cursor_energy_db = None
236
    try:
237
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
238
        cursor_energy_db = cnx_energy_db.cursor()
239
    except Exception as e:
240
        error_string = "Error in step 4.1 of equipment_energy_input_item.worker " + str(e)
241
        if cursor_energy_db:
242
            cursor_energy_db.close()
243
        if cnx_energy_db:
244
            cnx_energy_db.close()
245
        print(error_string)
246
        return error_string
247
248
    try:
249
        query = (" SELECT MAX(start_datetime_utc) "
250
                 " FROM tbl_equipment_input_item_hourly "
251
                 " WHERE equipment_id = %s ")
252
        cursor_energy_db.execute(query, (equipment['id'],))
253
        row_datetime = cursor_energy_db.fetchone()
254
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
255
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
256
257
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
258
            # replace second and microsecond with 0
259
            # note: do not replace minute in case of calculating in half hourly
260
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
261
            # start from the next time slot
262
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
263
264
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
265
266
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
267
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
268
269
    except Exception as e:
270
        error_string = "Error in step 4.2 of equipment_energy_input_item.worker " + str(e)
271
        if cursor_energy_db:
272
            cursor_energy_db.close()
273
        if cnx_energy_db:
274
            cnx_energy_db.close()
275
        print(error_string)
276
        return error_string
277
278
    ####################################################################################################################
279
    # Step 5: for each meter in list, get energy input data from energy database
280
    ####################################################################################################################
281
    energy_meter_hourly = dict()
282
    try:
283
        if meter_list is not None and len(meter_list) > 0:
284
            for meter in meter_list:
285
                meter_id = str(meter['id'])
286
287
                query = (" SELECT start_datetime_utc, actual_value "
288
                         " FROM tbl_meter_hourly "
289
                         " WHERE meter_id = %s "
290
                         "       AND start_datetime_utc >= %s "
291
                         "       AND start_datetime_utc < %s "
292
                         " ORDER BY start_datetime_utc ")
293
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
294
                rows_energy_values = cursor_energy_db.fetchall()
295
                if rows_energy_values is None or len(rows_energy_values) == 0:
296
                    energy_meter_hourly[meter_id] = None
297
                else:
298
                    energy_meter_hourly[meter_id] = dict()
299
                    for row_energy_value in rows_energy_values:
300
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
301
    except Exception as e:
302
        error_string = "Error in step 5.1 of equipment_energy_input_item.worker " + str(e)
303
        if cursor_energy_db:
304
            cursor_energy_db.close()
305
        if cnx_energy_db:
306
            cnx_energy_db.close()
307
        print(error_string)
308
        return error_string
309
310
    ####################################################################################################################
311
    # Step 6: for each virtual meter in list, get energy input data from energy database
312
    ####################################################################################################################
313
    energy_virtual_meter_hourly = dict()
314
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
315
        try:
316
            for virtual_meter in virtual_meter_list:
317
                virtual_meter_id = str(virtual_meter['id'])
318
319
                query = (" SELECT start_datetime_utc, actual_value "
320
                         " FROM tbl_virtual_meter_hourly "
321
                         " WHERE virtual_meter_id = %s "
322
                         "       AND start_datetime_utc >= %s "
323
                         "       AND start_datetime_utc < %s "
324
                         " ORDER BY start_datetime_utc ")
325
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
326
                rows_energy_values = cursor_energy_db.fetchall()
327
                if rows_energy_values is None or len(rows_energy_values) == 0:
328
                    energy_virtual_meter_hourly[virtual_meter_id] = None
329
                else:
330
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
331
                    for row_energy_value in rows_energy_values:
332
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
333
        except Exception as e:
334
            error_string = "Error in step 6.1 of equipment_energy_input_item.worker " + str(e)
335
            if cursor_energy_db:
336
                cursor_energy_db.close()
337
            if cnx_energy_db:
338
                cnx_energy_db.close()
339
            print(error_string)
340
            return error_string
341
342
    ####################################################################################################################
343
    # Step 7: for each offline meter in list, get energy input data from energy database
344
    ####################################################################################################################
345
    energy_offline_meter_hourly = dict()
346
    if offline_meter_list is not None and len(offline_meter_list) > 0:
347
        try:
348
            for offline_meter in offline_meter_list:
349
                offline_meter_id = str(offline_meter['id'])
350
351
                query = (" SELECT start_datetime_utc, actual_value "
352
                         " FROM tbl_offline_meter_hourly "
353
                         " WHERE offline_meter_id = %s "
354
                         "       AND start_datetime_utc >= %s "
355
                         "       AND start_datetime_utc < %s "
356
                         " ORDER BY start_datetime_utc ")
357
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
358
                rows_energy_values = cursor_energy_db.fetchall()
359
                if rows_energy_values is None or len(rows_energy_values) == 0:
360
                    energy_offline_meter_hourly[offline_meter_id] = None
361
                else:
362
                    energy_offline_meter_hourly[offline_meter_id] = dict()
363
                    for row_energy_value in rows_energy_values:
364
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
365
366
        except Exception as e:
367
            error_string = "Error in step 7.1 of equipment_energy_input_item.worker " + str(e)
368
            if cursor_energy_db:
369
                cursor_energy_db.close()
370
            if cnx_energy_db:
371
                cnx_energy_db.close()
372
            print(error_string)
373
            return error_string
374
375
    ####################################################################################################################
376
    # Step 8: determine common time slot to aggregate
377
    ####################################################################################################################
378
379
    common_start_datetime_utc = start_datetime_utc
380
    common_end_datetime_utc = end_datetime_utc
381
382
    print("Getting common time slot of energy values for all meters")
383
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
384
        for meter_id, energy_hourly in energy_meter_hourly.items():
385
            if energy_hourly is None or len(energy_hourly) == 0:
386
                common_start_datetime_utc = None
387
                common_end_datetime_utc = None
388
                break
389
            else:
390
                if common_start_datetime_utc < min(energy_hourly.keys()):
391
                    common_start_datetime_utc = min(energy_hourly.keys())
392
                if common_end_datetime_utc > max(energy_hourly.keys()):
393
                    common_end_datetime_utc = max(energy_hourly.keys())
394
395
    print("Getting common time slot of energy values for all virtual meters")
396
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
397
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
398
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
399
                if energy_hourly is None or len(energy_hourly) == 0:
400
                    common_start_datetime_utc = None
401
                    common_end_datetime_utc = None
402
                    break
403
                else:
404
                    if common_start_datetime_utc < min(energy_hourly.keys()):
405
                        common_start_datetime_utc = min(energy_hourly.keys())
406
                    if common_end_datetime_utc > max(energy_hourly.keys()):
407
                        common_end_datetime_utc = max(energy_hourly.keys())
408
409
    print("Getting common time slot of energy values for all offline meters")
410
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
411
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
412
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
413
                if energy_hourly is None or len(energy_hourly) == 0:
414
                    common_start_datetime_utc = None
415
                    common_end_datetime_utc = None
416
                    break
417
                else:
418
                    if common_start_datetime_utc < min(energy_hourly.keys()):
419
                        common_start_datetime_utc = min(energy_hourly.keys())
420
                    if common_end_datetime_utc > max(energy_hourly.keys()):
421
                        common_end_datetime_utc = max(energy_hourly.keys())
422
423
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
424
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
425
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
426
        # There isn't any energy data
427
        print("There isn't any energy data")
428
        # continue the for equipment loop to the next equipment
429
        print("continue the for equipment loop to the next equipment")
430
        if cursor_energy_db:
431
            cursor_energy_db.close()
432
        if cnx_energy_db:
433
            cnx_energy_db.close()
434
        return None
435
436
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
437
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
438
439
    ####################################################################################################################
440
    # Step 9: aggregate energy data in the common time slot by energy items and hourly
441
    ####################################################################################################################
442
443
    print("Step 9: aggregate energy data in the common time slot by energy items and hourly")
444
    aggregated_values = list()
445
    try:
446
        current_datetime_utc = common_start_datetime_utc
447
        while common_start_datetime_utc is not None \
448
                and common_end_datetime_utc is not None \
449
                and current_datetime_utc <= common_end_datetime_utc:
450
            aggregated_value = dict()
451
            aggregated_value['start_datetime_utc'] = current_datetime_utc
452
            aggregated_value['meta_data'] = dict()
453
454
            if meter_list is not None and len(meter_list) > 0:
455
                for meter in meter_list:
456
                    meter_id = str(meter['id'])
457
                    energy_item_id = meter['energy_item_id']
458
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
459
                    aggregated_value['meta_data'][energy_item_id] = \
460
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
461
462
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
463
                for virtual_meter in virtual_meter_list:
464
                    virtual_meter_id = str(virtual_meter['id'])
465
                    energy_item_id = virtual_meter['energy_item_id']
466
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
467
                    aggregated_value['meta_data'][energy_item_id] = \
468
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
469
470
            if offline_meter_list is not None and len(offline_meter_list) > 0:
471
                for offline_meter in offline_meter_list:
472
                    offline_meter_id = str(offline_meter['id'])
473
                    energy_item_id = offline_meter['energy_item_id']
474
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
475
                    aggregated_value['meta_data'][energy_item_id] = \
476
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
477
478
            aggregated_values.append(aggregated_value)
479
480
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
481
482
    except Exception as e:
483
        error_string = "Error in step 9 of equipment_energy_input_item.worker " + str(e)
484
        if cursor_energy_db:
485
            cursor_energy_db.close()
486
        if cnx_energy_db:
487
            cnx_energy_db.close()
488
        print(error_string)
489
        return error_string
490
491
    ####################################################################################################################
492
    # Step 10: save energy data to energy database
493
    ####################################################################################################################
494
    print("Step 10: save energy data to energy database")
495
496
    while len(aggregated_values) > 0:
497
        insert_100 = aggregated_values[:100]
498
        aggregated_values = aggregated_values[100:]
499
        try:
500
            add_values = (" INSERT INTO tbl_equipment_input_item_hourly "
501
                          "             (equipment_id, "
502
                          "              energy_item_id, "
503
                          "              start_datetime_utc, "
504
                          "              actual_value) "
505
                          " VALUES  ")
506
507
            for aggregated_value in insert_100:
508
                for energy_item_id, actual_value in aggregated_value['meta_data'].items():
509
                    add_values += " (" + str(equipment['id']) + ","
510
                    add_values += " " + str(energy_item_id) + ","
511
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
512
                    add_values += str(actual_value) + "), "
513
            # print("add_values:" + add_values)
514
            # trim ", " at the end of string and then execute
515
            cursor_energy_db.execute(add_values[:-2])
516
            cnx_energy_db.commit()
517
518
        except Exception as e:
519
            error_string = "Error in step 10.1 of equipment_energy_input_item.worker " + str(e)
520
            print(error_string)
521
            if cursor_energy_db:
522
                cursor_energy_db.close()
523
            if cnx_energy_db:
524
                cnx_energy_db.close()
525
            return error_string
526
527
    if cursor_energy_db:
528
        cursor_energy_db.close()
529
    if cnx_energy_db:
530
        cnx_energy_db.close()
531
    return None
532

myems-aggregation/equipment_energy_input_category.py 1 location

@@ 108-528 (lines=421) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(equipment):
109
    ####################################################################################################################
110
    # Step 1: get all input meters associated with the equipment
111
    ####################################################################################################################
112
    print("Step 1: get all input meters associated with the equipment " + str(equipment['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of equipment_energy_input_category.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
131
                                 " FROM tbl_meters m, tbl_equipments_meters em "
132
                                 " WHERE m.id = em.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND em.is_output = 0 "
135
                                 "       AND em.equipment_id = %s ",
136
                                 (equipment['id'],))
137
        rows_meters = cursor_system_db.fetchall()
138
139
        if rows_meters is not None and len(rows_meters) > 0:
140
            for row in rows_meters:
141
                meter_list.append({"id": row[0],
142
                                   "name": row[1],
143
                                   "energy_category_id": row[2]})
144
145
    except Exception as e:
146
        error_string = "Error in step 1.2 of equipment_energy_input_category.worker " + str(e)
147
        if cursor_system_db:
148
            cursor_system_db.close()
149
        if cnx_system_db:
150
            cnx_system_db.close()
151
        print(error_string)
152
        return error_string
153
154
    ####################################################################################################################
155
    # Step 2: get all input virtual meters associated with the equipment
156
    ####################################################################################################################
157
    print("Step 2: get all input virtual meters associated with the equipment")
158
    virtual_meter_list = list()
159
160
    try:
161
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
162
                                 " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em "
163
                                 " WHERE m.id = em.virtual_meter_id "
164
                                 "       AND m.is_counted = 1 "
165
                                 "       AND em.is_output = 0 "
166
                                 "       AND em.equipment_id = %s ",
167
                                 (equipment['id'],))
168
        rows_virtual_meters = cursor_system_db.fetchall()
169
170
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
171
            for row in rows_virtual_meters:
172
                virtual_meter_list.append({"id": row[0],
173
                                           "name": row[1],
174
                                           "energy_category_id": row[2]})
175
176
    except Exception as e:
177
        error_string = "Error in step 2.1 of equipment_energy_input_category.worker " + str(e)
178
        if cursor_system_db:
179
            cursor_system_db.close()
180
        if cnx_system_db:
181
            cnx_system_db.close()
182
        print(error_string)
183
        return error_string
184
185
    ####################################################################################################################
186
    # Step 3: get all input offline meters associated with the equipment
187
    ####################################################################################################################
188
    print("Step 3: get all input offline meters associated with the equipment")
189
190
    offline_meter_list = list()
191
192
    try:
193
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
194
                                 " FROM tbl_offline_meters m, tbl_equipments_offline_meters em "
195
                                 " WHERE m.id = em.offline_meter_id "
196
                                 "       AND m.is_counted = 1 "
197
                                 "       AND em.is_output = 0 "
198
                                 "       AND em.equipment_id = %s ",
199
                                 (equipment['id'],))
200
        rows_offline_meters = cursor_system_db.fetchall()
201
202
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
203
            for row in rows_offline_meters:
204
                offline_meter_list.append({"id": row[0],
205
                                           "name": row[1],
206
                                           "energy_category_id": row[2]})
207
208
    except Exception as e:
209
        error_string = "Error in step 3.1 of equipment_energy_input_category.worker " + str(e)
210
        print(error_string)
211
        return error_string
212
    finally:
213
        if cursor_system_db:
214
            cursor_system_db.close()
215
        if cnx_system_db:
216
            cnx_system_db.close()
217
218
    ####################################################################################################################
219
    # stop to the next equipment if this equipment is empty
220
    ####################################################################################################################
221
    if (meter_list is None or len(meter_list) == 0) and \
222
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
223
            (offline_meter_list is None or len(offline_meter_list) == 0):
224
        print("This is an empty equipment ")
225
        return None
226
227
    ####################################################################################################################
228
    # Step 4: determine start datetime and end datetime to aggregate
229
    ####################################################################################################################
230
    print("Step 4: determine start datetime and end datetime to aggregate")
231
    cnx_energy_db = None
232
    cursor_energy_db = None
233
    try:
234
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
235
        cursor_energy_db = cnx_energy_db.cursor()
236
    except Exception as e:
237
        error_string = "Error in step 4.1 of equipment_energy_input_category.worker " + str(e)
238
        if cursor_energy_db:
239
            cursor_energy_db.close()
240
        if cnx_energy_db:
241
            cnx_energy_db.close()
242
        print(error_string)
243
        return error_string
244
245
    try:
246
        query = (" SELECT MAX(start_datetime_utc) "
247
                 " FROM tbl_equipment_input_category_hourly "
248
                 " WHERE equipment_id = %s ")
249
        cursor_energy_db.execute(query, (equipment['id'],))
250
        row_datetime = cursor_energy_db.fetchone()
251
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
252
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
253
254
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
255
            # replace second and microsecond with 0
256
            # note: do not replace minute in case of calculating in half hourly
257
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
258
            # start from the next time slot
259
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
260
261
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
262
263
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
264
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
265
266
    except Exception as e:
267
        error_string = "Error in step 4.2 of equipment_energy_input_category.worker " + str(e)
268
        if cursor_energy_db:
269
            cursor_energy_db.close()
270
        if cnx_energy_db:
271
            cnx_energy_db.close()
272
        print(error_string)
273
        return error_string
274
275
    ####################################################################################################################
276
    # Step 5: for each meter in list, get energy input data from energy database
277
    ####################################################################################################################
278
    energy_meter_hourly = dict()
279
    try:
280
        if meter_list is not None and len(meter_list) > 0:
281
            for meter in meter_list:
282
                meter_id = str(meter['id'])
283
284
                query = (" SELECT start_datetime_utc, actual_value "
285
                         " FROM tbl_meter_hourly "
286
                         " WHERE meter_id = %s "
287
                         "       AND start_datetime_utc >= %s "
288
                         "       AND start_datetime_utc < %s "
289
                         " ORDER BY start_datetime_utc ")
290
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
291
                rows_energy_values = cursor_energy_db.fetchall()
292
                if rows_energy_values is None or len(rows_energy_values) == 0:
293
                    energy_meter_hourly[meter_id] = None
294
                else:
295
                    energy_meter_hourly[meter_id] = dict()
296
                    for row_energy_value in rows_energy_values:
297
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
298
    except Exception as e:
299
        error_string = "Error in step 5.1 of equipment_energy_input_category.worker " + str(e)
300
        if cursor_energy_db:
301
            cursor_energy_db.close()
302
        if cnx_energy_db:
303
            cnx_energy_db.close()
304
        print(error_string)
305
        return error_string
306
307
    ####################################################################################################################
308
    # Step 6: for each virtual meter in list, get energy input data from energy database
309
    ####################################################################################################################
310
    energy_virtual_meter_hourly = dict()
311
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
312
        try:
313
            for virtual_meter in virtual_meter_list:
314
                virtual_meter_id = str(virtual_meter['id'])
315
316
                query = (" SELECT start_datetime_utc, actual_value "
317
                         " FROM tbl_virtual_meter_hourly "
318
                         " WHERE virtual_meter_id = %s "
319
                         "       AND start_datetime_utc >= %s "
320
                         "       AND start_datetime_utc < %s "
321
                         " ORDER BY start_datetime_utc ")
322
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
323
                rows_energy_values = cursor_energy_db.fetchall()
324
                if rows_energy_values is None or len(rows_energy_values) == 0:
325
                    energy_virtual_meter_hourly[virtual_meter_id] = None
326
                else:
327
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
328
                    for row_energy_value in rows_energy_values:
329
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
330
        except Exception as e:
331
            error_string = "Error in step 6.1 of equipment_energy_input_category.worker " + str(e)
332
            if cursor_energy_db:
333
                cursor_energy_db.close()
334
            if cnx_energy_db:
335
                cnx_energy_db.close()
336
            print(error_string)
337
            return error_string
338
339
    ####################################################################################################################
340
    # Step 7: for each offline meter in list, get energy input data from energy database
341
    ####################################################################################################################
342
    energy_offline_meter_hourly = dict()
343
    if offline_meter_list is not None and len(offline_meter_list) > 0:
344
        try:
345
            for offline_meter in offline_meter_list:
346
                offline_meter_id = str(offline_meter['id'])
347
348
                query = (" SELECT start_datetime_utc, actual_value "
349
                         " FROM tbl_offline_meter_hourly "
350
                         " WHERE offline_meter_id = %s "
351
                         "       AND start_datetime_utc >= %s "
352
                         "       AND start_datetime_utc < %s "
353
                         " ORDER BY start_datetime_utc ")
354
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
355
                rows_energy_values = cursor_energy_db.fetchall()
356
                if rows_energy_values is None or len(rows_energy_values) == 0:
357
                    energy_offline_meter_hourly[offline_meter_id] = None
358
                else:
359
                    energy_offline_meter_hourly[offline_meter_id] = dict()
360
                    for row_energy_value in rows_energy_values:
361
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
362
363
        except Exception as e:
364
            error_string = "Error in step 7.1 of equipment_energy_input_category.worker " + str(e)
365
            if cursor_energy_db:
366
                cursor_energy_db.close()
367
            if cnx_energy_db:
368
                cnx_energy_db.close()
369
            print(error_string)
370
            return error_string
371
372
    ####################################################################################################################
373
    # Step 8: determine common time slot to aggregate
374
    ####################################################################################################################
375
376
    common_start_datetime_utc = start_datetime_utc
377
    common_end_datetime_utc = end_datetime_utc
378
379
    print("Getting common time slot of energy values for all meters")
380
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
381
        for meter_id, energy_hourly in energy_meter_hourly.items():
382
            if energy_hourly is None or len(energy_hourly) == 0:
383
                common_start_datetime_utc = None
384
                common_end_datetime_utc = None
385
                break
386
            else:
387
                if common_start_datetime_utc < min(energy_hourly.keys()):
388
                    common_start_datetime_utc = min(energy_hourly.keys())
389
                if common_end_datetime_utc > max(energy_hourly.keys()):
390
                    common_end_datetime_utc = max(energy_hourly.keys())
391
392
    print("Getting common time slot of energy values for all virtual meters")
393
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
394
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
395
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
396
                if energy_hourly is None or len(energy_hourly) == 0:
397
                    common_start_datetime_utc = None
398
                    common_end_datetime_utc = None
399
                    break
400
                else:
401
                    if common_start_datetime_utc < min(energy_hourly.keys()):
402
                        common_start_datetime_utc = min(energy_hourly.keys())
403
                    if common_end_datetime_utc > max(energy_hourly.keys()):
404
                        common_end_datetime_utc = max(energy_hourly.keys())
405
406
    print("Getting common time slot of energy values for all offline meters")
407
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
408
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
409
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
410
                if energy_hourly is None or len(energy_hourly) == 0:
411
                    common_start_datetime_utc = None
412
                    common_end_datetime_utc = None
413
                    break
414
                else:
415
                    if common_start_datetime_utc < min(energy_hourly.keys()):
416
                        common_start_datetime_utc = min(energy_hourly.keys())
417
                    if common_end_datetime_utc > max(energy_hourly.keys()):
418
                        common_end_datetime_utc = max(energy_hourly.keys())
419
420
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
421
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
422
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
423
        # There isn't any energy data
424
        print("There isn't any energy data")
425
        # continue the for equipment loop to the next equipment
426
        print("continue the for equipment loop to the next equipment")
427
        if cursor_energy_db:
428
            cursor_energy_db.close()
429
        if cnx_energy_db:
430
            cnx_energy_db.close()
431
        return None
432
433
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
434
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
435
436
    ####################################################################################################################
437
    # Step 9: aggregate energy data in the common time slot by energy categories and hourly
438
    ####################################################################################################################
439
440
    print("Step 9: aggregate energy data in the common time slot by energy categories and hourly")
441
    aggregated_values = list()
442
    try:
443
        current_datetime_utc = common_start_datetime_utc
444
        while common_start_datetime_utc is not None \
445
                and common_end_datetime_utc is not None \
446
                and current_datetime_utc <= common_end_datetime_utc:
447
            aggregated_value = dict()
448
            aggregated_value['start_datetime_utc'] = current_datetime_utc
449
            aggregated_value['meta_data'] = dict()
450
451
            if meter_list is not None and len(meter_list) > 0:
452
                for meter in meter_list:
453
                    meter_id = str(meter['id'])
454
                    energy_category_id = meter['energy_category_id']
455
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
456
                    aggregated_value['meta_data'][energy_category_id] = \
457
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
458
459
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
460
                for virtual_meter in virtual_meter_list:
461
                    virtual_meter_id = str(virtual_meter['id'])
462
                    energy_category_id = virtual_meter['energy_category_id']
463
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
464
                    aggregated_value['meta_data'][energy_category_id] = \
465
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
466
467
            if offline_meter_list is not None and len(offline_meter_list) > 0:
468
                for offline_meter in offline_meter_list:
469
                    offline_meter_id = str(offline_meter['id'])
470
                    energy_category_id = offline_meter['energy_category_id']
471
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
472
                    aggregated_value['meta_data'][energy_category_id] = \
473
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
474
475
            aggregated_values.append(aggregated_value)
476
477
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
478
479
    except Exception as e:
480
        error_string = "Error in step 9 of equipment_energy_input_category.worker " + str(e)
481
        if cursor_energy_db:
482
            cursor_energy_db.close()
483
        if cnx_energy_db:
484
            cnx_energy_db.close()
485
        print(error_string)
486
        return error_string
487
488
    ####################################################################################################################
489
    # Step 10: save energy data to energy database
490
    ####################################################################################################################
491
    print("Step 10: save energy data to energy database")
492
493
    while len(aggregated_values) > 0:
494
        insert_100 = aggregated_values[:100]
495
        aggregated_values = aggregated_values[100:]
496
        try:
497
            add_values = (" INSERT INTO tbl_equipment_input_category_hourly "
498
                          "             (equipment_id, "
499
                          "              energy_category_id, "
500
                          "              start_datetime_utc, "
501
                          "              actual_value) "
502
                          " VALUES  ")
503
504
            for aggregated_value in insert_100:
505
                for energy_category_id, actual_value in aggregated_value['meta_data'].items():
506
                    add_values += " (" + str(equipment['id']) + ","
507
                    add_values += " " + str(energy_category_id) + ","
508
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
509
                    add_values += str(actual_value) + "), "
510
            # print("add_values:" + add_values)
511
            # trim ", " at the end of string and then execute
512
            cursor_energy_db.execute(add_values[:-2])
513
            cnx_energy_db.commit()
514
515
        except Exception as e:
516
            error_string = "Error in step 10.1 of equipment_energy_input_category.worker " + str(e)
517
            print(error_string)
518
            if cursor_energy_db:
519
                cursor_energy_db.close()
520
            if cnx_energy_db:
521
                cnx_energy_db.close()
522
            return error_string
523
524
    if cursor_energy_db:
525
        cursor_energy_db.close()
526
    if cnx_energy_db:
527
        cnx_energy_db.close()
528
    return None
529

myems-aggregation/store_energy_input_item.py 1 location

@@ 108-528 (lines=421) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(store):
109
    ####################################################################################################################
110
    # Step 1: get all input meters associated with the store
111
    ####################################################################################################################
112
    print("Step 1: get all input meters associated with the store " + str(store['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of store_energy_input_item.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
131
                                 " FROM tbl_meters m, tbl_stores_meters sm "
132
                                 " WHERE m.id = sm.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND m.energy_item_id is NOT NULL "
135
                                 "       AND sm.store_id = %s ",
136
                                 (store['id'],))
137
        rows_meters = cursor_system_db.fetchall()
138
139
        if rows_meters is not None and len(rows_meters) > 0:
140
            for row in rows_meters:
141
                meter_list.append({"id": row[0],
142
                                   "name": row[1],
143
                                   "energy_item_id": row[2]})
144
145
    except Exception as e:
146
        error_string = "Error in step 1.2 of store_energy_input_item.worker " + str(e)
147
        if cursor_system_db:
148
            cursor_system_db.close()
149
        if cnx_system_db:
150
            cnx_system_db.close()
151
        print(error_string)
152
        return error_string
153
154
    ####################################################################################################################
155
    # Step 2: get all input virtual meters associated with the store
156
    ####################################################################################################################
157
    print("Step 2: get all input virtual meters associated with the store")
158
    virtual_meter_list = list()
159
160
    try:
161
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
162
                                 " FROM tbl_virtual_meters m, tbl_stores_virtual_meters sm "
163
                                 " WHERE m.id = sm.virtual_meter_id "
164
                                 "       AND m.energy_item_id is NOT NULL "
165
                                 "       AND m.is_counted = 1 "
166
                                 "       AND sm.store_id = %s ",
167
                                 (store['id'],))
168
        rows_virtual_meters = cursor_system_db.fetchall()
169
170
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
171
            for row in rows_virtual_meters:
172
                virtual_meter_list.append({"id": row[0],
173
                                           "name": row[1],
174
                                           "energy_item_id": row[2]})
175
176
    except Exception as e:
177
        error_string = "Error in step 2.1 of store_energy_input_item.worker " + str(e)
178
        if cursor_system_db:
179
            cursor_system_db.close()
180
        if cnx_system_db:
181
            cnx_system_db.close()
182
        print(error_string)
183
        return error_string
184
185
    ####################################################################################################################
186
    # Step 3: get all input offline meters associated with the store
187
    ####################################################################################################################
188
    print("Step 3: get all input offline meters associated with the store")
189
190
    offline_meter_list = list()
191
192
    try:
193
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
194
                                 " FROM tbl_offline_meters m, tbl_stores_offline_meters sm "
195
                                 " WHERE m.id = sm.offline_meter_id "
196
                                 "       AND m.energy_item_id is NOT NULL "
197
                                 "       AND m.is_counted = 1 "
198
                                 "       AND sm.store_id = %s ",
199
                                 (store['id'],))
200
        rows_offline_meters = cursor_system_db.fetchall()
201
202
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
203
            for row in rows_offline_meters:
204
                offline_meter_list.append({"id": row[0],
205
                                           "name": row[1],
206
                                           "energy_item_id": row[2]})
207
208
    except Exception as e:
209
        error_string = "Error in step 3.1 of store_energy_input_item.worker " + str(e)
210
        print(error_string)
211
        return error_string
212
    finally:
213
        if cursor_system_db:
214
            cursor_system_db.close()
215
        if cnx_system_db:
216
            cnx_system_db.close()
217
218
    ####################################################################################################################
219
    # stop to the next store if this store is empty
220
    ####################################################################################################################
221
    if (meter_list is None or len(meter_list) == 0) and \
222
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
223
            (offline_meter_list is None or len(offline_meter_list) == 0):
224
        print("This is an empty store ")
225
        return None
226
227
    ####################################################################################################################
228
    # Step 4: determine start datetime and end datetime to aggregate
229
    ####################################################################################################################
230
    print("Step 4: determine start datetime and end datetime to aggregate")
231
    cnx_energy_db = None
232
    cursor_energy_db = None
233
    try:
234
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
235
        cursor_energy_db = cnx_energy_db.cursor()
236
    except Exception as e:
237
        error_string = "Error in step 4.1 of store_energy_input_item.worker " + str(e)
238
        if cursor_energy_db:
239
            cursor_energy_db.close()
240
        if cnx_energy_db:
241
            cnx_energy_db.close()
242
        print(error_string)
243
        return error_string
244
245
    try:
246
        query = (" SELECT MAX(start_datetime_utc) "
247
                 " FROM tbl_store_input_item_hourly "
248
                 " WHERE store_id = %s ")
249
        cursor_energy_db.execute(query, (store['id'],))
250
        row_datetime = cursor_energy_db.fetchone()
251
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
252
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
253
254
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
255
            # replace second and microsecond with 0
256
            # note: do not replace minute in case of calculating in half hourly
257
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
258
            # start from the next time slot
259
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
260
261
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
262
263
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
264
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
265
266
    except Exception as e:
267
        error_string = "Error in step 4.2 of store_energy_input_item.worker " + str(e)
268
        if cursor_energy_db:
269
            cursor_energy_db.close()
270
        if cnx_energy_db:
271
            cnx_energy_db.close()
272
        print(error_string)
273
        return error_string
274
275
    ####################################################################################################################
276
    # Step 5: for each meter in list, get energy input data from energy database
277
    ####################################################################################################################
278
    energy_meter_hourly = dict()
279
    try:
280
        if meter_list is not None and len(meter_list) > 0:
281
            for meter in meter_list:
282
                meter_id = str(meter['id'])
283
284
                query = (" SELECT start_datetime_utc, actual_value "
285
                         " FROM tbl_meter_hourly "
286
                         " WHERE meter_id = %s "
287
                         "       AND start_datetime_utc >= %s "
288
                         "       AND start_datetime_utc < %s "
289
                         " ORDER BY start_datetime_utc ")
290
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
291
                rows_energy_values = cursor_energy_db.fetchall()
292
                if rows_energy_values is None or len(rows_energy_values) == 0:
293
                    energy_meter_hourly[meter_id] = None
294
                else:
295
                    energy_meter_hourly[meter_id] = dict()
296
                    for row_energy_value in rows_energy_values:
297
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
298
    except Exception as e:
299
        error_string = "Error in step 5.1 of store_energy_input_item.worker " + str(e)
300
        if cursor_energy_db:
301
            cursor_energy_db.close()
302
        if cnx_energy_db:
303
            cnx_energy_db.close()
304
        print(error_string)
305
        return error_string
306
307
    ####################################################################################################################
308
    # Step 6: for each virtual meter in list, get energy input data from energy database
309
    ####################################################################################################################
310
    energy_virtual_meter_hourly = dict()
311
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
312
        try:
313
            for virtual_meter in virtual_meter_list:
314
                virtual_meter_id = str(virtual_meter['id'])
315
316
                query = (" SELECT start_datetime_utc, actual_value "
317
                         " FROM tbl_virtual_meter_hourly "
318
                         " WHERE virtual_meter_id = %s "
319
                         "       AND start_datetime_utc >= %s "
320
                         "       AND start_datetime_utc < %s "
321
                         " ORDER BY start_datetime_utc ")
322
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
323
                rows_energy_values = cursor_energy_db.fetchall()
324
                if rows_energy_values is None or len(rows_energy_values) == 0:
325
                    energy_virtual_meter_hourly[virtual_meter_id] = None
326
                else:
327
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
328
                    for row_energy_value in rows_energy_values:
329
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
330
        except Exception as e:
331
            error_string = "Error in step 6.1 of store_energy_input_item.worker " + str(e)
332
            if cursor_energy_db:
333
                cursor_energy_db.close()
334
            if cnx_energy_db:
335
                cnx_energy_db.close()
336
            print(error_string)
337
            return error_string
338
339
    ####################################################################################################################
340
    # Step 7: for each offline meter in list, get energy input data from energy database
341
    ####################################################################################################################
342
    energy_offline_meter_hourly = dict()
343
    if offline_meter_list is not None and len(offline_meter_list) > 0:
344
        try:
345
            for offline_meter in offline_meter_list:
346
                offline_meter_id = str(offline_meter['id'])
347
348
                query = (" SELECT start_datetime_utc, actual_value "
349
                         " FROM tbl_offline_meter_hourly "
350
                         " WHERE offline_meter_id = %s "
351
                         "       AND start_datetime_utc >= %s "
352
                         "       AND start_datetime_utc < %s "
353
                         " ORDER BY start_datetime_utc ")
354
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
355
                rows_energy_values = cursor_energy_db.fetchall()
356
                if rows_energy_values is None or len(rows_energy_values) == 0:
357
                    energy_offline_meter_hourly[offline_meter_id] = None
358
                else:
359
                    energy_offline_meter_hourly[offline_meter_id] = dict()
360
                    for row_energy_value in rows_energy_values:
361
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
362
363
        except Exception as e:
364
            error_string = "Error in step 7.1 of store_energy_input_item.worker " + str(e)
365
            if cursor_energy_db:
366
                cursor_energy_db.close()
367
            if cnx_energy_db:
368
                cnx_energy_db.close()
369
            print(error_string)
370
            return error_string
371
372
    ####################################################################################################################
373
    # Step 8: determine common time slot to aggregate
374
    ####################################################################################################################
375
376
    common_start_datetime_utc = start_datetime_utc
377
    common_end_datetime_utc = end_datetime_utc
378
379
    print("Getting common time slot of energy values for all meters")
380
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
381
        for meter_id, energy_hourly in energy_meter_hourly.items():
382
            if energy_hourly is None or len(energy_hourly) == 0:
383
                common_start_datetime_utc = None
384
                common_end_datetime_utc = None
385
                break
386
            else:
387
                if common_start_datetime_utc < min(energy_hourly.keys()):
388
                    common_start_datetime_utc = min(energy_hourly.keys())
389
                if common_end_datetime_utc > max(energy_hourly.keys()):
390
                    common_end_datetime_utc = max(energy_hourly.keys())
391
392
    print("Getting common time slot of energy values for all virtual meters")
393
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
394
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
395
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
396
                if energy_hourly is None or len(energy_hourly) == 0:
397
                    common_start_datetime_utc = None
398
                    common_end_datetime_utc = None
399
                    break
400
                else:
401
                    if common_start_datetime_utc < min(energy_hourly.keys()):
402
                        common_start_datetime_utc = min(energy_hourly.keys())
403
                    if common_end_datetime_utc > max(energy_hourly.keys()):
404
                        common_end_datetime_utc = max(energy_hourly.keys())
405
406
    print("Getting common time slot of energy values for all offline meters")
407
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
408
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
409
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
410
                if energy_hourly is None or len(energy_hourly) == 0:
411
                    common_start_datetime_utc = None
412
                    common_end_datetime_utc = None
413
                    break
414
                else:
415
                    if common_start_datetime_utc < min(energy_hourly.keys()):
416
                        common_start_datetime_utc = min(energy_hourly.keys())
417
                    if common_end_datetime_utc > max(energy_hourly.keys()):
418
                        common_end_datetime_utc = max(energy_hourly.keys())
419
420
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
421
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
422
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
423
        # There isn't any energy data
424
        print("There isn't any energy data")
425
        # continue the for store loop to the next store
426
        print("continue the for store loop to the next store")
427
        if cursor_energy_db:
428
            cursor_energy_db.close()
429
        if cnx_energy_db:
430
            cnx_energy_db.close()
431
        return None
432
433
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
434
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
435
436
    ####################################################################################################################
437
    # Step 9: aggregate energy data in the common time slot by energy items and hourly
438
    ####################################################################################################################
439
440
    print("Step 9: aggregate energy data in the common time slot by energy items and hourly")
441
    aggregated_values = list()
442
    try:
443
        current_datetime_utc = common_start_datetime_utc
444
        while common_start_datetime_utc is not None \
445
                and common_end_datetime_utc is not None \
446
                and current_datetime_utc <= common_end_datetime_utc:
447
            aggregated_value = dict()
448
            aggregated_value['start_datetime_utc'] = current_datetime_utc
449
            aggregated_value['meta_data'] = dict()
450
451
            if meter_list is not None and len(meter_list) > 0:
452
                for meter in meter_list:
453
                    meter_id = str(meter['id'])
454
                    energy_item_id = meter['energy_item_id']
455
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
456
                    aggregated_value['meta_data'][energy_item_id] = \
457
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
458
459
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
460
                for virtual_meter in virtual_meter_list:
461
                    virtual_meter_id = str(virtual_meter['id'])
462
                    energy_item_id = virtual_meter['energy_item_id']
463
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
464
                    aggregated_value['meta_data'][energy_item_id] = \
465
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
466
467
            if offline_meter_list is not None and len(offline_meter_list) > 0:
468
                for offline_meter in offline_meter_list:
469
                    offline_meter_id = str(offline_meter['id'])
470
                    energy_item_id = offline_meter['energy_item_id']
471
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
472
                    aggregated_value['meta_data'][energy_item_id] = \
473
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
474
475
            aggregated_values.append(aggregated_value)
476
477
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
478
479
    except Exception as e:
480
        error_string = "Error in step 9 of store_energy_input_item.worker " + str(e)
481
        if cursor_energy_db:
482
            cursor_energy_db.close()
483
        if cnx_energy_db:
484
            cnx_energy_db.close()
485
        print(error_string)
486
        return error_string
487
488
    ####################################################################################################################
489
    # Step 10: save energy data to energy database
490
    ####################################################################################################################
491
    print("Step 10: save energy data to energy database")
492
493
    while len(aggregated_values) > 0:
494
        insert_100 = aggregated_values[:100]
495
        aggregated_values = aggregated_values[100:]
496
        try:
497
            add_values = (" INSERT INTO tbl_store_input_item_hourly "
498
                          "             (store_id, "
499
                          "              energy_item_id, "
500
                          "              start_datetime_utc, "
501
                          "              actual_value) "
502
                          " VALUES  ")
503
504
            for aggregated_value in insert_100:
505
                for energy_item_id, actual_value in aggregated_value['meta_data'].items():
506
                    add_values += " (" + str(store['id']) + ","
507
                    add_values += " " + str(energy_item_id) + ","
508
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
509
                    add_values += str(actual_value) + "), "
510
            # print("add_values:" + add_values)
511
            # trim ", " at the end of string and then execute
512
            cursor_energy_db.execute(add_values[:-2])
513
            cnx_energy_db.commit()
514
515
        except Exception as e:
516
            error_string = "Error in step 10.1 of store_energy_input_item.worker " + str(e)
517
            print(error_string)
518
            if cursor_energy_db:
519
                cursor_energy_db.close()
520
            if cnx_energy_db:
521
                cnx_energy_db.close()
522
            return error_string
523
524
    if cursor_energy_db:
525
        cursor_energy_db.close()
526
    if cnx_energy_db:
527
        cnx_energy_db.close()
528
    return None
529

myems-aggregation/tenant_energy_input_item.py 1 location

@@ 108-528 (lines=421) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(tenant):
109
    ####################################################################################################################
110
    # Step 1: get all input meters associated with the tenant
111
    ####################################################################################################################
112
    print("Step 1: get all input meters associated with the tenant " + str(tenant['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of tenant_energy_input_item.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
131
                                 " FROM tbl_meters m, tbl_tenants_meters tm "
132
                                 " WHERE m.id = tm.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND m.energy_item_id is NOT NULL "
135
                                 "       AND tm.tenant_id = %s ",
136
                                 (tenant['id'],))
137
        rows_meters = cursor_system_db.fetchall()
138
139
        if rows_meters is not None and len(rows_meters) > 0:
140
            for row in rows_meters:
141
                meter_list.append({"id": row[0],
142
                                   "name": row[1],
143
                                   "energy_item_id": row[2]})
144
145
    except Exception as e:
146
        error_string = "Error in step 1.2 of tenant_energy_input_item.worker " + str(e)
147
        if cursor_system_db:
148
            cursor_system_db.close()
149
        if cnx_system_db:
150
            cnx_system_db.close()
151
        print(error_string)
152
        return error_string
153
154
    ####################################################################################################################
155
    # Step 2: get all input virtual meters associated with the tenant
156
    ####################################################################################################################
157
    print("Step 2: get all input virtual meters associated with the tenant")
158
    virtual_meter_list = list()
159
160
    try:
161
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
162
                                 " FROM tbl_virtual_meters m, tbl_tenants_virtual_meters tm "
163
                                 " WHERE m.id = tm.virtual_meter_id "
164
                                 "       AND m.energy_item_id is NOT NULL "
165
                                 "       AND m.is_counted = 1 "
166
                                 "       AND tm.tenant_id = %s ",
167
                                 (tenant['id'],))
168
        rows_virtual_meters = cursor_system_db.fetchall()
169
170
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
171
            for row in rows_virtual_meters:
172
                virtual_meter_list.append({"id": row[0],
173
                                           "name": row[1],
174
                                           "energy_item_id": row[2]})
175
176
    except Exception as e:
177
        error_string = "Error in step 2.1 of tenant_energy_input_item.worker " + str(e)
178
        if cursor_system_db:
179
            cursor_system_db.close()
180
        if cnx_system_db:
181
            cnx_system_db.close()
182
        print(error_string)
183
        return error_string
184
185
    ####################################################################################################################
186
    # Step 3: get all input offline meters associated with the tenant
187
    ####################################################################################################################
188
    print("Step 3: get all input offline meters associated with the tenant")
189
190
    offline_meter_list = list()
191
192
    try:
193
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
194
                                 " FROM tbl_offline_meters m, tbl_tenants_offline_meters tm "
195
                                 " WHERE m.id = tm.offline_meter_id "
196
                                 "       AND m.energy_item_id is NOT NULL "
197
                                 "       AND m.is_counted = 1 "
198
                                 "       AND tm.tenant_id = %s ",
199
                                 (tenant['id'],))
200
        rows_offline_meters = cursor_system_db.fetchall()
201
202
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
203
            for row in rows_offline_meters:
204
                offline_meter_list.append({"id": row[0],
205
                                           "name": row[1],
206
                                           "energy_item_id": row[2]})
207
208
    except Exception as e:
209
        error_string = "Error in step 3.1 of tenant_energy_input_item.worker " + str(e)
210
        print(error_string)
211
        return error_string
212
    finally:
213
        if cursor_system_db:
214
            cursor_system_db.close()
215
        if cnx_system_db:
216
            cnx_system_db.close()
217
218
    ####################################################################################################################
219
    # stop to the next tenant if this tenant is empty
220
    ####################################################################################################################
221
    if (meter_list is None or len(meter_list) == 0) and \
222
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
223
            (offline_meter_list is None or len(offline_meter_list) == 0):
224
        print("This is an empty tenant ")
225
        return None
226
227
    ####################################################################################################################
228
    # Step 4: determine start datetime and end datetime to aggregate
229
    ####################################################################################################################
230
    print("Step 4: determine start datetime and end datetime to aggregate")
231
    cnx_energy_db = None
232
    cursor_energy_db = None
233
    try:
234
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
235
        cursor_energy_db = cnx_energy_db.cursor()
236
    except Exception as e:
237
        error_string = "Error in step 4.1 of tenant_energy_input_item.worker " + str(e)
238
        if cursor_energy_db:
239
            cursor_energy_db.close()
240
        if cnx_energy_db:
241
            cnx_energy_db.close()
242
        print(error_string)
243
        return error_string
244
245
    try:
246
        query = (" SELECT MAX(start_datetime_utc) "
247
                 " FROM tbl_tenant_input_item_hourly "
248
                 " WHERE tenant_id = %s ")
249
        cursor_energy_db.execute(query, (tenant['id'],))
250
        row_datetime = cursor_energy_db.fetchone()
251
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
252
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
253
254
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
255
            # replace second and microsecond with 0
256
            # note: do not replace minute in case of calculating in half hourly
257
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
258
            # start from the next time slot
259
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
260
261
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
262
263
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
264
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
265
266
    except Exception as e:
267
        error_string = "Error in step 4.2 of tenant_energy_input_item.worker " + str(e)
268
        if cursor_energy_db:
269
            cursor_energy_db.close()
270
        if cnx_energy_db:
271
            cnx_energy_db.close()
272
        print(error_string)
273
        return error_string
274
275
    ####################################################################################################################
276
    # Step 5: for each meter in list, get energy input data from energy database
277
    ####################################################################################################################
278
    energy_meter_hourly = dict()
279
    try:
280
        if meter_list is not None and len(meter_list) > 0:
281
            for meter in meter_list:
282
                meter_id = str(meter['id'])
283
284
                query = (" SELECT start_datetime_utc, actual_value "
285
                         " FROM tbl_meter_hourly "
286
                         " WHERE meter_id = %s "
287
                         "       AND start_datetime_utc >= %s "
288
                         "       AND start_datetime_utc < %s "
289
                         " ORDER BY start_datetime_utc ")
290
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
291
                rows_energy_values = cursor_energy_db.fetchall()
292
                if rows_energy_values is None or len(rows_energy_values) == 0:
293
                    energy_meter_hourly[meter_id] = None
294
                else:
295
                    energy_meter_hourly[meter_id] = dict()
296
                    for row_energy_value in rows_energy_values:
297
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
298
    except Exception as e:
299
        error_string = "Error in step 5.1 of tenant_energy_input_item.worker " + str(e)
300
        if cursor_energy_db:
301
            cursor_energy_db.close()
302
        if cnx_energy_db:
303
            cnx_energy_db.close()
304
        print(error_string)
305
        return error_string
306
307
    ####################################################################################################################
308
    # Step 6: for each virtual meter in list, get energy input data from energy database
309
    ####################################################################################################################
310
    energy_virtual_meter_hourly = dict()
311
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
312
        try:
313
            for virtual_meter in virtual_meter_list:
314
                virtual_meter_id = str(virtual_meter['id'])
315
316
                query = (" SELECT start_datetime_utc, actual_value "
317
                         " FROM tbl_virtual_meter_hourly "
318
                         " WHERE virtual_meter_id = %s "
319
                         "       AND start_datetime_utc >= %s "
320
                         "       AND start_datetime_utc < %s "
321
                         " ORDER BY start_datetime_utc ")
322
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
323
                rows_energy_values = cursor_energy_db.fetchall()
324
                if rows_energy_values is None or len(rows_energy_values) == 0:
325
                    energy_virtual_meter_hourly[virtual_meter_id] = None
326
                else:
327
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
328
                    for row_energy_value in rows_energy_values:
329
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
330
        except Exception as e:
331
            error_string = "Error in step 6.1 of tenant_energy_input_item.worker " + str(e)
332
            if cursor_energy_db:
333
                cursor_energy_db.close()
334
            if cnx_energy_db:
335
                cnx_energy_db.close()
336
            print(error_string)
337
            return error_string
338
339
    ####################################################################################################################
340
    # Step 7: for each offline meter in list, get energy input data from energy database
341
    ####################################################################################################################
342
    energy_offline_meter_hourly = dict()
343
    if offline_meter_list is not None and len(offline_meter_list) > 0:
344
        try:
345
            for offline_meter in offline_meter_list:
346
                offline_meter_id = str(offline_meter['id'])
347
348
                query = (" SELECT start_datetime_utc, actual_value "
349
                         " FROM tbl_offline_meter_hourly "
350
                         " WHERE offline_meter_id = %s "
351
                         "       AND start_datetime_utc >= %s "
352
                         "       AND start_datetime_utc < %s "
353
                         " ORDER BY start_datetime_utc ")
354
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
355
                rows_energy_values = cursor_energy_db.fetchall()
356
                if rows_energy_values is None or len(rows_energy_values) == 0:
357
                    energy_offline_meter_hourly[offline_meter_id] = None
358
                else:
359
                    energy_offline_meter_hourly[offline_meter_id] = dict()
360
                    for row_energy_value in rows_energy_values:
361
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
362
363
        except Exception as e:
364
            error_string = "Error in step 7.1 of tenant_energy_input_item.worker " + str(e)
365
            if cursor_energy_db:
366
                cursor_energy_db.close()
367
            if cnx_energy_db:
368
                cnx_energy_db.close()
369
            print(error_string)
370
            return error_string
371
372
    ####################################################################################################################
373
    # Step 8: determine common time slot to aggregate
374
    ####################################################################################################################
375
376
    common_start_datetime_utc = start_datetime_utc
377
    common_end_datetime_utc = end_datetime_utc
378
379
    print("Getting common time slot of energy values for all meters")
380
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
381
        for meter_id, energy_hourly in energy_meter_hourly.items():
382
            if energy_hourly is None or len(energy_hourly) == 0:
383
                common_start_datetime_utc = None
384
                common_end_datetime_utc = None
385
                break
386
            else:
387
                if common_start_datetime_utc < min(energy_hourly.keys()):
388
                    common_start_datetime_utc = min(energy_hourly.keys())
389
                if common_end_datetime_utc > max(energy_hourly.keys()):
390
                    common_end_datetime_utc = max(energy_hourly.keys())
391
392
    print("Getting common time slot of energy values for all virtual meters")
393
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
394
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
395
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
396
                if energy_hourly is None or len(energy_hourly) == 0:
397
                    common_start_datetime_utc = None
398
                    common_end_datetime_utc = None
399
                    break
400
                else:
401
                    if common_start_datetime_utc < min(energy_hourly.keys()):
402
                        common_start_datetime_utc = min(energy_hourly.keys())
403
                    if common_end_datetime_utc > max(energy_hourly.keys()):
404
                        common_end_datetime_utc = max(energy_hourly.keys())
405
406
    print("Getting common time slot of energy values for all offline meters")
407
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
408
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
409
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
410
                if energy_hourly is None or len(energy_hourly) == 0:
411
                    common_start_datetime_utc = None
412
                    common_end_datetime_utc = None
413
                    break
414
                else:
415
                    if common_start_datetime_utc < min(energy_hourly.keys()):
416
                        common_start_datetime_utc = min(energy_hourly.keys())
417
                    if common_end_datetime_utc > max(energy_hourly.keys()):
418
                        common_end_datetime_utc = max(energy_hourly.keys())
419
420
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
421
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
422
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
423
        # There isn't any energy data
424
        print("There isn't any energy data")
425
        # continue the for tenant loop to the next tenant
426
        print("continue the for tenant loop to the next tenant")
427
        if cursor_energy_db:
428
            cursor_energy_db.close()
429
        if cnx_energy_db:
430
            cnx_energy_db.close()
431
        return None
432
433
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
434
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
435
436
    ####################################################################################################################
437
    # Step 9: aggregate energy data in the common time slot by energy items and hourly
438
    ####################################################################################################################
439
440
    print("Step 9: aggregate energy data in the common time slot by energy items and hourly")
441
    aggregated_values = list()
442
    try:
443
        current_datetime_utc = common_start_datetime_utc
444
        while common_start_datetime_utc is not None \
445
                and common_end_datetime_utc is not None \
446
                and current_datetime_utc <= common_end_datetime_utc:
447
            aggregated_value = dict()
448
            aggregated_value['start_datetime_utc'] = current_datetime_utc
449
            aggregated_value['meta_data'] = dict()
450
451
            if meter_list is not None and len(meter_list) > 0:
452
                for meter in meter_list:
453
                    meter_id = str(meter['id'])
454
                    energy_item_id = meter['energy_item_id']
455
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
456
                    aggregated_value['meta_data'][energy_item_id] = \
457
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
458
459
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
460
                for virtual_meter in virtual_meter_list:
461
                    virtual_meter_id = str(virtual_meter['id'])
462
                    energy_item_id = virtual_meter['energy_item_id']
463
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
464
                    aggregated_value['meta_data'][energy_item_id] = \
465
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
466
467
            if offline_meter_list is not None and len(offline_meter_list) > 0:
468
                for offline_meter in offline_meter_list:
469
                    offline_meter_id = str(offline_meter['id'])
470
                    energy_item_id = offline_meter['energy_item_id']
471
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
472
                    aggregated_value['meta_data'][energy_item_id] = \
473
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value
474
475
            aggregated_values.append(aggregated_value)
476
477
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
478
479
    except Exception as e:
480
        error_string = "Error in step 9 of tenant_energy_input_item.worker " + str(e)
481
        if cursor_energy_db:
482
            cursor_energy_db.close()
483
        if cnx_energy_db:
484
            cnx_energy_db.close()
485
        print(error_string)
486
        return error_string
487
488
    ####################################################################################################################
489
    # Step 10: save energy data to energy database
490
    ####################################################################################################################
491
    print("Step 10: save energy data to energy database")
492
493
    while len(aggregated_values) > 0:
494
        insert_100 = aggregated_values[:100]
495
        aggregated_values = aggregated_values[100:]
496
        try:
497
            add_values = (" INSERT INTO tbl_tenant_input_item_hourly "
498
                          "             (tenant_id, "
499
                          "              energy_item_id, "
500
                          "              start_datetime_utc, "
501
                          "              actual_value) "
502
                          " VALUES  ")
503
504
            for aggregated_value in insert_100:
505
                for energy_item_id, actual_value in aggregated_value['meta_data'].items():
506
                    add_values += " (" + str(tenant['id']) + ","
507
                    add_values += " " + str(energy_item_id) + ","
508
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
509
                    add_values += str(actual_value) + "), "
510
            # print("add_values:" + add_values)
511
            # trim ", " at the end of string and then execute
512
            cursor_energy_db.execute(add_values[:-2])
513
            cnx_energy_db.commit()
514
515
        except Exception as e:
516
            error_string = "Error in step 10.1 of tenant_energy_input_item.worker " + str(e)
517
            print(error_string)
518
            if cursor_energy_db:
519
                cursor_energy_db.close()
520
            if cnx_energy_db:
521
                cnx_energy_db.close()
522
            return error_string
523
524
    if cursor_energy_db:
525
        cursor_energy_db.close()
526
    if cnx_energy_db:
527
        cnx_energy_db.close()
528
    return None
529

myems-aggregation/equipment_energy_output_category.py 1 location

@@ 108-528 (lines=421) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(equipment):
109
    ####################################################################################################################
110
    # Step 1: get all output meters associated with the equipment
111
    ####################################################################################################################
112
    print("Step 1: get all output meters associated with the equipment " + str(equipment['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of equipment_energy_output_category.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
131
                                 " FROM tbl_meters m, tbl_equipments_meters em "
132
                                 " WHERE m.id = em.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND em.is_output = 1 "
135
                                 "       AND em.equipment_id = %s ",
136
                                 (equipment['id'],))
137
        rows_meters = cursor_system_db.fetchall()
138
139
        if rows_meters is not None and len(rows_meters) > 0:
140
            for row in rows_meters:
141
                meter_list.append({"id": row[0],
142
                                   "name": row[1],
143
                                   "energy_category_id": row[2]})
144
145
    except Exception as e:
146
        error_string = "Error in step 1.2 of equipment_energy_output_category.worker " + str(e)
147
        if cursor_system_db:
148
            cursor_system_db.close()
149
        if cnx_system_db:
150
            cnx_system_db.close()
151
        print(error_string)
152
        return error_string
153
154
    ####################################################################################################################
155
    # Step 2: get all output virtual meters associated with the equipment
156
    ####################################################################################################################
157
    print("Step 2: get all output virtual meters associated with the equipment")
158
    virtual_meter_list = list()
159
160
    try:
161
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
162
                                 " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em "
163
                                 " WHERE m.id = em.virtual_meter_id "
164
                                 "       AND m.is_counted = 1 "
165
                                 "       AND em.is_output = 1 "
166
                                 "       AND em.equipment_id = %s ",
167
                                 (equipment['id'],))
168
        rows_virtual_meters = cursor_system_db.fetchall()
169
170
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
171
            for row in rows_virtual_meters:
172
                virtual_meter_list.append({"id": row[0],
173
                                           "name": row[1],
174
                                           "energy_category_id": row[2]})
175
176
    except Exception as e:
177
        error_string = "Error in step 2.1 of equipment_energy_output_category.worker " + str(e)
178
        if cursor_system_db:
179
            cursor_system_db.close()
180
        if cnx_system_db:
181
            cnx_system_db.close()
182
        print(error_string)
183
        return error_string
184
185
    ####################################################################################################################
186
    # Step 3: get all output offline meters associated with the equipment
187
    ####################################################################################################################
188
    print("Step 3: get all output offline meters associated with the equipment")
189
190
    offline_meter_list = list()
191
192
    try:
193
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
194
                                 " FROM tbl_offline_meters m, tbl_equipments_offline_meters em "
195
                                 " WHERE m.id = em.offline_meter_id "
196
                                 "       AND m.is_counted = 1 "
197
                                 "       AND em.is_output = 1 "
198
                                 "       AND em.equipment_id = %s ",
199
                                 (equipment['id'],))
200
        rows_offline_meters = cursor_system_db.fetchall()
201
202
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
203
            for row in rows_offline_meters:
204
                offline_meter_list.append({"id": row[0],
205
                                           "name": row[1],
206
                                           "energy_category_id": row[2]})
207
208
    except Exception as e:
209
        error_string = "Error in step 3.1 of equipment_energy_output_category.worker " + str(e)
210
        print(error_string)
211
        return error_string
212
    finally:
213
        if cursor_system_db:
214
            cursor_system_db.close()
215
        if cnx_system_db:
216
            cnx_system_db.close()
217
218
    ####################################################################################################################
219
    # stop to the next equipment if this equipment is empty
220
    ####################################################################################################################
221
    if (meter_list is None or len(meter_list) == 0) and \
222
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
223
            (offline_meter_list is None or len(offline_meter_list) == 0):
224
        print("This is an empty equipment ")
225
        return None
226
227
    ####################################################################################################################
228
    # Step 4: determine start datetime and end datetime to aggregate
229
    ####################################################################################################################
230
    print("Step 4: determine start datetime and end datetime to aggregate")
231
    cnx_energy_db = None
232
    cursor_energy_db = None
233
    try:
234
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
235
        cursor_energy_db = cnx_energy_db.cursor()
236
    except Exception as e:
237
        error_string = "Error in step 4.1 of equipment_energy_output_category.worker " + str(e)
238
        if cursor_energy_db:
239
            cursor_energy_db.close()
240
        if cnx_energy_db:
241
            cnx_energy_db.close()
242
        print(error_string)
243
        return error_string
244
245
    try:
246
        query = (" SELECT MAX(start_datetime_utc) "
247
                 " FROM tbl_equipment_output_category_hourly "
248
                 " WHERE equipment_id = %s ")
249
        cursor_energy_db.execute(query, (equipment['id'],))
250
        row_datetime = cursor_energy_db.fetchone()
251
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
252
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
253
254
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
255
            # replace second and microsecond with 0
256
            # note: do not replace minute in case of calculating in half hourly
257
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
258
            # start from the next time slot
259
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
260
261
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
262
263
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
264
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
265
266
    except Exception as e:
267
        error_string = "Error in step 4.2 of equipment_energy_output_category.worker " + str(e)
268
        if cursor_energy_db:
269
            cursor_energy_db.close()
270
        if cnx_energy_db:
271
            cnx_energy_db.close()
272
        print(error_string)
273
        return error_string
274
275
    ####################################################################################################################
276
    # Step 5: for each meter in list, get energy output data from energy database
277
    ####################################################################################################################
278
    energy_meter_hourly = dict()
279
    try:
280
        if meter_list is not None and len(meter_list) > 0:
281
            for meter in meter_list:
282
                meter_id = str(meter['id'])
283
284
                query = (" SELECT start_datetime_utc, actual_value "
285
                         " FROM tbl_meter_hourly "
286
                         " WHERE meter_id = %s "
287
                         "       AND start_datetime_utc >= %s "
288
                         "       AND start_datetime_utc < %s "
289
                         " ORDER BY start_datetime_utc ")
290
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
291
                rows_energy_values = cursor_energy_db.fetchall()
292
                if rows_energy_values is None or len(rows_energy_values) == 0:
293
                    energy_meter_hourly[meter_id] = None
294
                else:
295
                    energy_meter_hourly[meter_id] = dict()
296
                    for row_energy_value in rows_energy_values:
297
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
298
    except Exception as e:
299
        error_string = "Error in step 5.1 of equipment_energy_output_category.worker " + str(e)
300
        if cursor_energy_db:
301
            cursor_energy_db.close()
302
        if cnx_energy_db:
303
            cnx_energy_db.close()
304
        print(error_string)
305
        return error_string
306
307
    ####################################################################################################################
308
    # Step 6: for each virtual meter in list, get energy output data from energy database
309
    ####################################################################################################################
310
    energy_virtual_meter_hourly = dict()
311
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
312
        try:
313
            for virtual_meter in virtual_meter_list:
314
                virtual_meter_id = str(virtual_meter['id'])
315
316
                query = (" SELECT start_datetime_utc, actual_value "
317
                         " FROM tbl_virtual_meter_hourly "
318
                         " WHERE virtual_meter_id = %s "
319
                         "       AND start_datetime_utc >= %s "
320
                         "       AND start_datetime_utc < %s "
321
                         " ORDER BY start_datetime_utc ")
322
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
323
                rows_energy_values = cursor_energy_db.fetchall()
324
                if rows_energy_values is None or len(rows_energy_values) == 0:
325
                    energy_virtual_meter_hourly[virtual_meter_id] = None
326
                else:
327
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
328
                    for row_energy_value in rows_energy_values:
329
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
330
        except Exception as e:
331
            error_string = "Error in step 6.1 of equipment_energy_output_category.worker " + str(e)
332
            if cursor_energy_db:
333
                cursor_energy_db.close()
334
            if cnx_energy_db:
335
                cnx_energy_db.close()
336
            print(error_string)
337
            return error_string
338
339
    ####################################################################################################################
340
    # Step 7: for each offline meter in list, get energy output data from energy database
341
    ####################################################################################################################
342
    energy_offline_meter_hourly = dict()
343
    if offline_meter_list is not None and len(offline_meter_list) > 0:
344
        try:
345
            for offline_meter in offline_meter_list:
346
                offline_meter_id = str(offline_meter['id'])
347
348
                query = (" SELECT start_datetime_utc, actual_value "
349
                         " FROM tbl_offline_meter_hourly "
350
                         " WHERE offline_meter_id = %s "
351
                         "       AND start_datetime_utc >= %s "
352
                         "       AND start_datetime_utc < %s "
353
                         " ORDER BY start_datetime_utc ")
354
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
355
                rows_energy_values = cursor_energy_db.fetchall()
356
                if rows_energy_values is None or len(rows_energy_values) == 0:
357
                    energy_offline_meter_hourly[offline_meter_id] = None
358
                else:
359
                    energy_offline_meter_hourly[offline_meter_id] = dict()
360
                    for row_energy_value in rows_energy_values:
361
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
362
363
        except Exception as e:
364
            error_string = "Error in step 7.1 of equipment_energy_output_category.worker " + str(e)
365
            if cursor_energy_db:
366
                cursor_energy_db.close()
367
            if cnx_energy_db:
368
                cnx_energy_db.close()
369
            print(error_string)
370
            return error_string
371
372
    ####################################################################################################################
373
    # Step 8: determine common time slot to aggregate
374
    ####################################################################################################################
375
376
    common_start_datetime_utc = start_datetime_utc
377
    common_end_datetime_utc = end_datetime_utc
378
379
    print("Getting common time slot of energy values for all meters")
380
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
381
        for meter_id, energy_hourly in energy_meter_hourly.items():
382
            if energy_hourly is None or len(energy_hourly) == 0:
383
                common_start_datetime_utc = None
384
                common_end_datetime_utc = None
385
                break
386
            else:
387
                if common_start_datetime_utc < min(energy_hourly.keys()):
388
                    common_start_datetime_utc = min(energy_hourly.keys())
389
                if common_end_datetime_utc > max(energy_hourly.keys()):
390
                    common_end_datetime_utc = max(energy_hourly.keys())
391
392
    print("Getting common time slot of energy values for all virtual meters")
393
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
394
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
395
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
396
                if energy_hourly is None or len(energy_hourly) == 0:
397
                    common_start_datetime_utc = None
398
                    common_end_datetime_utc = None
399
                    break
400
                else:
401
                    if common_start_datetime_utc < min(energy_hourly.keys()):
402
                        common_start_datetime_utc = min(energy_hourly.keys())
403
                    if common_end_datetime_utc > max(energy_hourly.keys()):
404
                        common_end_datetime_utc = max(energy_hourly.keys())
405
406
    print("Getting common time slot of energy values for all offline meters")
407
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
408
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
409
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
410
                if energy_hourly is None or len(energy_hourly) == 0:
411
                    common_start_datetime_utc = None
412
                    common_end_datetime_utc = None
413
                    break
414
                else:
415
                    if common_start_datetime_utc < min(energy_hourly.keys()):
416
                        common_start_datetime_utc = min(energy_hourly.keys())
417
                    if common_end_datetime_utc > max(energy_hourly.keys()):
418
                        common_end_datetime_utc = max(energy_hourly.keys())
419
420
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
421
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
422
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
423
        # There isn't any energy data
424
        print("There isn't any energy data")
425
        # continue the for equipment loop to the next equipment
426
        print("continue the for equipment loop to the next equipment")
427
        if cursor_energy_db:
428
            cursor_energy_db.close()
429
        if cnx_energy_db:
430
            cnx_energy_db.close()
431
        return None
432
433
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
434
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
435
436
    ####################################################################################################################
437
    # Step 9: aggregate energy data in the common time slot by energy categories and hourly
438
    ####################################################################################################################
439
440
    print("Step 9: aggregate energy data in the common time slot by energy categories and hourly")
441
    aggregated_values = list()
442
    try:
443
        current_datetime_utc = common_start_datetime_utc
444
        while common_start_datetime_utc is not None \
445
                and common_end_datetime_utc is not None \
446
                and current_datetime_utc <= common_end_datetime_utc:
447
            aggregated_value = dict()
448
            aggregated_value['start_datetime_utc'] = current_datetime_utc
449
            aggregated_value['meta_data'] = dict()
450
451
            if meter_list is not None and len(meter_list) > 0:
452
                for meter in meter_list:
453
                    meter_id = str(meter['id'])
454
                    energy_category_id = meter['energy_category_id']
455
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
456
                    aggregated_value['meta_data'][energy_category_id] = \
457
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
458
459
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
460
                for virtual_meter in virtual_meter_list:
461
                    virtual_meter_id = str(virtual_meter['id'])
462
                    energy_category_id = virtual_meter['energy_category_id']
463
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
464
                    aggregated_value['meta_data'][energy_category_id] = \
465
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
466
467
            if offline_meter_list is not None and len(offline_meter_list) > 0:
468
                for offline_meter in offline_meter_list:
469
                    offline_meter_id = str(offline_meter['id'])
470
                    energy_category_id = offline_meter['energy_category_id']
471
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
472
                    aggregated_value['meta_data'][energy_category_id] = \
473
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
474
475
            aggregated_values.append(aggregated_value)
476
477
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
478
479
    except Exception as e:
480
        error_string = "Error in step 9 of equipment_energy_output_category.worker " + str(e)
481
        if cursor_energy_db:
482
            cursor_energy_db.close()
483
        if cnx_energy_db:
484
            cnx_energy_db.close()
485
        print(error_string)
486
        return error_string
487
488
    ####################################################################################################################
489
    # Step 10: save energy data to energy database
490
    ####################################################################################################################
491
    print("Step 10: save energy data to energy database")
492
493
    while len(aggregated_values) > 0:
494
        insert_100 = aggregated_values[:100]
495
        aggregated_values = aggregated_values[100:]
496
        try:
497
            add_values = (" INSERT INTO tbl_equipment_output_category_hourly "
498
                          "             (equipment_id, "
499
                          "              energy_category_id, "
500
                          "              start_datetime_utc, "
501
                          "              actual_value) "
502
                          " VALUES  ")
503
504
            for aggregated_value in insert_100:
505
                for energy_category_id, actual_value in aggregated_value['meta_data'].items():
506
                    add_values += " (" + str(equipment['id']) + ","
507
                    add_values += " " + str(energy_category_id) + ","
508
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
509
                    add_values += str(actual_value) + "), "
510
            # print("add_values:" + add_values)
511
            # trim ", " at the end of string and then execute
512
            cursor_energy_db.execute(add_values[:-2])
513
            cnx_energy_db.commit()
514
515
        except Exception as e:
516
            error_string = "Error in step 10.1 of equipment_energy_output_category.worker " + str(e)
517
            print(error_string)
518
            if cursor_energy_db:
519
                cursor_energy_db.close()
520
            if cnx_energy_db:
521
                cnx_energy_db.close()
522
            return error_string
523
524
    if cursor_energy_db:
525
        cursor_energy_db.close()
526
    if cnx_energy_db:
527
        cnx_energy_db.close()
528
    return None
529

myems-aggregation/store_energy_input_category.py 1 location

@@ 108-525 (lines=418) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(store):
109
    ####################################################################################################################
110
    # Step 1: get all input meters associated with the store
111
    ####################################################################################################################
112
    print("Step 1: get all input meters associated with the store " + str(store['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of store_energy_input_category.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
131
                                 " FROM tbl_meters m, tbl_stores_meters sm "
132
                                 " WHERE m.id = sm.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND sm.store_id = %s ",
135
                                 (store['id'],))
136
        rows_meters = cursor_system_db.fetchall()
137
138
        if rows_meters is not None and len(rows_meters) > 0:
139
            for row in rows_meters:
140
                meter_list.append({"id": row[0],
141
                                   "name": row[1],
142
                                   "energy_category_id": row[2]})
143
144
    except Exception as e:
145
        error_string = "Error in step 1.2 of store_energy_input_category.worker " + str(e)
146
        if cursor_system_db:
147
            cursor_system_db.close()
148
        if cnx_system_db:
149
            cnx_system_db.close()
150
        print(error_string)
151
        return error_string
152
153
    ####################################################################################################################
154
    # Step 2: get all input virtual meters associated with the store
155
    ####################################################################################################################
156
    print("Step 2: get all input virtual meters associated with the store")
157
    virtual_meter_list = list()
158
159
    try:
160
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
161
                                 " FROM tbl_virtual_meters m, tbl_stores_virtual_meters sm "
162
                                 " WHERE m.id = sm.virtual_meter_id "
163
                                 "       AND m.is_counted = 1 "
164
                                 "       AND sm.store_id = %s ",
165
                                 (store['id'],))
166
        rows_virtual_meters = cursor_system_db.fetchall()
167
168
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
169
            for row in rows_virtual_meters:
170
                virtual_meter_list.append({"id": row[0],
171
                                           "name": row[1],
172
                                           "energy_category_id": row[2]})
173
174
    except Exception as e:
175
        error_string = "Error in step 2.1 of store_energy_input_category.worker " + str(e)
176
        if cursor_system_db:
177
            cursor_system_db.close()
178
        if cnx_system_db:
179
            cnx_system_db.close()
180
        print(error_string)
181
        return error_string
182
183
    ####################################################################################################################
184
    # Step 3: get all input offline meters associated with the store
185
    ####################################################################################################################
186
    print("Step 3: get all input offline meters associated with the store")
187
188
    offline_meter_list = list()
189
190
    try:
191
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
192
                                 " FROM tbl_offline_meters m, tbl_stores_offline_meters sm "
193
                                 " WHERE m.id = sm.offline_meter_id "
194
                                 "       AND m.is_counted = 1 "
195
                                 "       AND sm.store_id = %s ",
196
                                 (store['id'],))
197
        rows_offline_meters = cursor_system_db.fetchall()
198
199
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
200
            for row in rows_offline_meters:
201
                offline_meter_list.append({"id": row[0],
202
                                           "name": row[1],
203
                                           "energy_category_id": row[2]})
204
205
    except Exception as e:
206
        error_string = "Error in step 3.1 of store_energy_input_category.worker " + str(e)
207
        print(error_string)
208
        return error_string
209
    finally:
210
        if cursor_system_db:
211
            cursor_system_db.close()
212
        if cnx_system_db:
213
            cnx_system_db.close()
214
215
    ####################################################################################################################
216
    # stop to the next store if this store is empty
217
    ####################################################################################################################
218
    if (meter_list is None or len(meter_list) == 0) and \
219
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
220
            (offline_meter_list is None or len(offline_meter_list) == 0):
221
        print("This is an empty store ")
222
        return None
223
224
    ####################################################################################################################
225
    # Step 4: determine start datetime and end datetime to aggregate
226
    ####################################################################################################################
227
    print("Step 4: determine start datetime and end datetime to aggregate")
228
    cnx_energy_db = None
229
    cursor_energy_db = None
230
    try:
231
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
232
        cursor_energy_db = cnx_energy_db.cursor()
233
    except Exception as e:
234
        error_string = "Error in step 4.1 of store_energy_input_category.worker " + str(e)
235
        if cursor_energy_db:
236
            cursor_energy_db.close()
237
        if cnx_energy_db:
238
            cnx_energy_db.close()
239
        print(error_string)
240
        return error_string
241
242
    try:
243
        query = (" SELECT MAX(start_datetime_utc) "
244
                 " FROM tbl_store_input_category_hourly "
245
                 " WHERE store_id = %s ")
246
        cursor_energy_db.execute(query, (store['id'],))
247
        row_datetime = cursor_energy_db.fetchone()
248
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
249
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
250
251
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
252
            # replace second and microsecond with 0
253
            # note: do not replace minute in case of calculating in half hourly
254
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
255
            # start from the next time slot
256
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
257
258
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
259
260
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
261
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
262
263
    except Exception as e:
264
        error_string = "Error in step 4.2 of store_energy_input_category.worker " + str(e)
265
        if cursor_energy_db:
266
            cursor_energy_db.close()
267
        if cnx_energy_db:
268
            cnx_energy_db.close()
269
        print(error_string)
270
        return error_string
271
272
    ####################################################################################################################
273
    # Step 5: for each meter in list, get energy input data from energy database
274
    ####################################################################################################################
275
    energy_meter_hourly = dict()
276
    try:
277
        if meter_list is not None and len(meter_list) > 0:
278
            for meter in meter_list:
279
                meter_id = str(meter['id'])
280
281
                query = (" SELECT start_datetime_utc, actual_value "
282
                         " FROM tbl_meter_hourly "
283
                         " WHERE meter_id = %s "
284
                         "       AND start_datetime_utc >= %s "
285
                         "       AND start_datetime_utc < %s "
286
                         " ORDER BY start_datetime_utc ")
287
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
288
                rows_energy_values = cursor_energy_db.fetchall()
289
                if rows_energy_values is None or len(rows_energy_values) == 0:
290
                    energy_meter_hourly[meter_id] = None
291
                else:
292
                    energy_meter_hourly[meter_id] = dict()
293
                    for row_energy_value in rows_energy_values:
294
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
295
    except Exception as e:
296
        error_string = "Error in step 5.1 of store_energy_input_category.worker " + str(e)
297
        if cursor_energy_db:
298
            cursor_energy_db.close()
299
        if cnx_energy_db:
300
            cnx_energy_db.close()
301
        print(error_string)
302
        return error_string
303
304
    ####################################################################################################################
305
    # Step 6: for each virtual meter in list, get energy input data from energy database
306
    ####################################################################################################################
307
    energy_virtual_meter_hourly = dict()
308
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
309
        try:
310
            for virtual_meter in virtual_meter_list:
311
                virtual_meter_id = str(virtual_meter['id'])
312
313
                query = (" SELECT start_datetime_utc, actual_value "
314
                         " FROM tbl_virtual_meter_hourly "
315
                         " WHERE virtual_meter_id = %s "
316
                         "       AND start_datetime_utc >= %s "
317
                         "       AND start_datetime_utc < %s "
318
                         " ORDER BY start_datetime_utc ")
319
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
320
                rows_energy_values = cursor_energy_db.fetchall()
321
                if rows_energy_values is None or len(rows_energy_values) == 0:
322
                    energy_virtual_meter_hourly[virtual_meter_id] = None
323
                else:
324
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
325
                    for row_energy_value in rows_energy_values:
326
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
327
        except Exception as e:
328
            error_string = "Error in step 6.1 of store_energy_input_category.worker " + str(e)
329
            if cursor_energy_db:
330
                cursor_energy_db.close()
331
            if cnx_energy_db:
332
                cnx_energy_db.close()
333
            print(error_string)
334
            return error_string
335
336
    ####################################################################################################################
337
    # Step 7: for each offline meter in list, get energy input data from energy database
338
    ####################################################################################################################
339
    energy_offline_meter_hourly = dict()
340
    if offline_meter_list is not None and len(offline_meter_list) > 0:
341
        try:
342
            for offline_meter in offline_meter_list:
343
                offline_meter_id = str(offline_meter['id'])
344
345
                query = (" SELECT start_datetime_utc, actual_value "
346
                         " FROM tbl_offline_meter_hourly "
347
                         " WHERE offline_meter_id = %s "
348
                         "       AND start_datetime_utc >= %s "
349
                         "       AND start_datetime_utc < %s "
350
                         " ORDER BY start_datetime_utc ")
351
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
352
                rows_energy_values = cursor_energy_db.fetchall()
353
                if rows_energy_values is None or len(rows_energy_values) == 0:
354
                    energy_offline_meter_hourly[offline_meter_id] = None
355
                else:
356
                    energy_offline_meter_hourly[offline_meter_id] = dict()
357
                    for row_energy_value in rows_energy_values:
358
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
359
360
        except Exception as e:
361
            error_string = "Error in step 7.1 of store_energy_input_category.worker " + str(e)
362
            if cursor_energy_db:
363
                cursor_energy_db.close()
364
            if cnx_energy_db:
365
                cnx_energy_db.close()
366
            print(error_string)
367
            return error_string
368
369
    ####################################################################################################################
370
    # Step 8: determine common time slot to aggregate
371
    ####################################################################################################################
372
373
    common_start_datetime_utc = start_datetime_utc
374
    common_end_datetime_utc = end_datetime_utc
375
376
    print("Getting common time slot of energy values for all meters")
377
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
378
        for meter_id, energy_hourly in energy_meter_hourly.items():
379
            if energy_hourly is None or len(energy_hourly) == 0:
380
                common_start_datetime_utc = None
381
                common_end_datetime_utc = None
382
                break
383
            else:
384
                if common_start_datetime_utc < min(energy_hourly.keys()):
385
                    common_start_datetime_utc = min(energy_hourly.keys())
386
                if common_end_datetime_utc > max(energy_hourly.keys()):
387
                    common_end_datetime_utc = max(energy_hourly.keys())
388
389
    print("Getting common time slot of energy values for all virtual meters")
390
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
391
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
392
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
393
                if energy_hourly is None or len(energy_hourly) == 0:
394
                    common_start_datetime_utc = None
395
                    common_end_datetime_utc = None
396
                    break
397
                else:
398
                    if common_start_datetime_utc < min(energy_hourly.keys()):
399
                        common_start_datetime_utc = min(energy_hourly.keys())
400
                    if common_end_datetime_utc > max(energy_hourly.keys()):
401
                        common_end_datetime_utc = max(energy_hourly.keys())
402
403
    print("Getting common time slot of energy values for all offline meters")
404
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
405
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
406
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
407
                if energy_hourly is None or len(energy_hourly) == 0:
408
                    common_start_datetime_utc = None
409
                    common_end_datetime_utc = None
410
                    break
411
                else:
412
                    if common_start_datetime_utc < min(energy_hourly.keys()):
413
                        common_start_datetime_utc = min(energy_hourly.keys())
414
                    if common_end_datetime_utc > max(energy_hourly.keys()):
415
                        common_end_datetime_utc = max(energy_hourly.keys())
416
417
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
418
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
419
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
420
        # There isn't any energy data
421
        print("There isn't any energy data")
422
        # continue the for store loop to the next store
423
        print("continue the for store loop to the next store")
424
        if cursor_energy_db:
425
            cursor_energy_db.close()
426
        if cnx_energy_db:
427
            cnx_energy_db.close()
428
        return None
429
430
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
431
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
432
433
    ####################################################################################################################
434
    # Step 9: aggregate energy data in the common time slot by energy categories and hourly
435
    ####################################################################################################################
436
437
    print("Step 9: aggregate energy data in the common time slot by energy categories and hourly")
438
    aggregated_values = list()
439
    try:
440
        current_datetime_utc = common_start_datetime_utc
441
        while common_start_datetime_utc is not None \
442
                and common_end_datetime_utc is not None \
443
                and current_datetime_utc <= common_end_datetime_utc:
444
            aggregated_value = dict()
445
            aggregated_value['start_datetime_utc'] = current_datetime_utc
446
            aggregated_value['meta_data'] = dict()
447
448
            if meter_list is not None and len(meter_list) > 0:
449
                for meter in meter_list:
450
                    meter_id = str(meter['id'])
451
                    energy_category_id = meter['energy_category_id']
452
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
453
                    aggregated_value['meta_data'][energy_category_id] = \
454
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
455
456
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
457
                for virtual_meter in virtual_meter_list:
458
                    virtual_meter_id = str(virtual_meter['id'])
459
                    energy_category_id = virtual_meter['energy_category_id']
460
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
461
                    aggregated_value['meta_data'][energy_category_id] = \
462
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
463
464
            if offline_meter_list is not None and len(offline_meter_list) > 0:
465
                for offline_meter in offline_meter_list:
466
                    offline_meter_id = str(offline_meter['id'])
467
                    energy_category_id = offline_meter['energy_category_id']
468
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
469
                    aggregated_value['meta_data'][energy_category_id] = \
470
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
471
472
            aggregated_values.append(aggregated_value)
473
474
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
475
476
    except Exception as e:
477
        error_string = "Error in step 9 of store_energy_input_category.worker " + str(e)
478
        if cursor_energy_db:
479
            cursor_energy_db.close()
480
        if cnx_energy_db:
481
            cnx_energy_db.close()
482
        print(error_string)
483
        return error_string
484
485
    ####################################################################################################################
486
    # Step 10: save energy data to energy database
487
    ####################################################################################################################
488
    print("Step 10: save energy data to energy database")
489
490
    while len(aggregated_values) > 0:
491
        insert_100 = aggregated_values[:100]
492
        aggregated_values = aggregated_values[100:]
493
        try:
494
            add_values = (" INSERT INTO tbl_store_input_category_hourly "
495
                          "             (store_id, "
496
                          "              energy_category_id, "
497
                          "              start_datetime_utc, "
498
                          "              actual_value) "
499
                          " VALUES  ")
500
501
            for aggregated_value in insert_100:
502
                for energy_category_id, actual_value in aggregated_value['meta_data'].items():
503
                    add_values += " (" + str(store['id']) + ","
504
                    add_values += " " + str(energy_category_id) + ","
505
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
506
                    add_values += str(actual_value) + "), "
507
            # print("add_values:" + add_values)
508
            # trim ", " at the end of string and then execute
509
            cursor_energy_db.execute(add_values[:-2])
510
            cnx_energy_db.commit()
511
512
        except Exception as e:
513
            error_string = "Error in step 10.1 of store_energy_input_category.worker " + str(e)
514
            print(error_string)
515
            if cursor_energy_db:
516
                cursor_energy_db.close()
517
            if cnx_energy_db:
518
                cnx_energy_db.close()
519
            return error_string
520
521
    if cursor_energy_db:
522
        cursor_energy_db.close()
523
    if cnx_energy_db:
524
        cnx_energy_db.close()
525
    return None
526

myems-aggregation/tenant_energy_input_category.py 1 location

@@ 108-525 (lines=418) @@
105
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
106
########################################################################################################################
107
108
def worker(tenant):
109
    ####################################################################################################################
110
    # Step 1: get all input meters associated with the tenant
111
    ####################################################################################################################
112
    print("Step 1: get all input meters associated with the tenant " + str(tenant['name']))
113
114
    meter_list = list()
115
    cnx_system_db = None
116
    cursor_system_db = None
117
    try:
118
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
119
        cursor_system_db = cnx_system_db.cursor()
120
    except Exception as e:
121
        error_string = "Error in step 1.1 of tenant_energy_input_category.worker " + str(e)
122
        if cursor_system_db:
123
            cursor_system_db.close()
124
        if cnx_system_db:
125
            cnx_system_db.close()
126
        print(error_string)
127
        return error_string
128
129
    try:
130
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
131
                                 " FROM tbl_meters m, tbl_tenants_meters tm "
132
                                 " WHERE m.id = tm.meter_id "
133
                                 "       AND m.is_counted = 1 "
134
                                 "       AND tm.tenant_id = %s ",
135
                                 (tenant['id'],))
136
        rows_meters = cursor_system_db.fetchall()
137
138
        if rows_meters is not None and len(rows_meters) > 0:
139
            for row in rows_meters:
140
                meter_list.append({"id": row[0],
141
                                   "name": row[1],
142
                                   "energy_category_id": row[2]})
143
144
    except Exception as e:
145
        error_string = "Error in step 1.2 of tenant_energy_input_category.worker " + str(e)
146
        if cursor_system_db:
147
            cursor_system_db.close()
148
        if cnx_system_db:
149
            cnx_system_db.close()
150
        print(error_string)
151
        return error_string
152
153
    ####################################################################################################################
154
    # Step 2: get all input virtual meters associated with the tenant
155
    ####################################################################################################################
156
    print("Step 2: get all input virtual meters associated with the tenant")
157
    virtual_meter_list = list()
158
159
    try:
160
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
161
                                 " FROM tbl_virtual_meters m, tbl_tenants_virtual_meters tm "
162
                                 " WHERE m.id = tm.virtual_meter_id "
163
                                 "       AND m.is_counted = 1 "
164
                                 "       AND tm.tenant_id = %s ",
165
                                 (tenant['id'],))
166
        rows_virtual_meters = cursor_system_db.fetchall()
167
168
        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
169
            for row in rows_virtual_meters:
170
                virtual_meter_list.append({"id": row[0],
171
                                           "name": row[1],
172
                                           "energy_category_id": row[2]})
173
174
    except Exception as e:
175
        error_string = "Error in step 2.1 of tenant_energy_input_category.worker " + str(e)
176
        if cursor_system_db:
177
            cursor_system_db.close()
178
        if cnx_system_db:
179
            cnx_system_db.close()
180
        print(error_string)
181
        return error_string
182
183
    ####################################################################################################################
184
    # Step 3: get all input offline meters associated with the tenant
185
    ####################################################################################################################
186
    print("Step 3: get all input offline meters associated with the tenant")
187
188
    offline_meter_list = list()
189
190
    try:
191
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
192
                                 " FROM tbl_offline_meters m, tbl_tenants_offline_meters tm "
193
                                 " WHERE m.id = tm.offline_meter_id "
194
                                 "       AND m.is_counted = 1 "
195
                                 "       AND tm.tenant_id = %s ",
196
                                 (tenant['id'],))
197
        rows_offline_meters = cursor_system_db.fetchall()
198
199
        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
200
            for row in rows_offline_meters:
201
                offline_meter_list.append({"id": row[0],
202
                                           "name": row[1],
203
                                           "energy_category_id": row[2]})
204
205
    except Exception as e:
206
        error_string = "Error in step 3.1 of tenant_energy_input_category.worker " + str(e)
207
        print(error_string)
208
        return error_string
209
    finally:
210
        if cursor_system_db:
211
            cursor_system_db.close()
212
        if cnx_system_db:
213
            cnx_system_db.close()
214
215
    ####################################################################################################################
216
    # stop to the next tenant if this tenant is empty
217
    ####################################################################################################################
218
    if (meter_list is None or len(meter_list) == 0) and \
219
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
220
            (offline_meter_list is None or len(offline_meter_list) == 0):
221
        print("This is an empty tenant ")
222
        return None
223
224
    ####################################################################################################################
225
    # Step 4: determine start datetime and end datetime to aggregate
226
    ####################################################################################################################
227
    print("Step 4: determine start datetime and end datetime to aggregate")
228
    cnx_energy_db = None
229
    cursor_energy_db = None
230
    try:
231
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
232
        cursor_energy_db = cnx_energy_db.cursor()
233
    except Exception as e:
234
        error_string = "Error in step 4.1 of tenant_energy_input_category.worker " + str(e)
235
        if cursor_energy_db:
236
            cursor_energy_db.close()
237
        if cnx_energy_db:
238
            cnx_energy_db.close()
239
        print(error_string)
240
        return error_string
241
242
    try:
243
        query = (" SELECT MAX(start_datetime_utc) "
244
                 " FROM tbl_tenant_input_category_hourly "
245
                 " WHERE tenant_id = %s ")
246
        cursor_energy_db.execute(query, (tenant['id'],))
247
        row_datetime = cursor_energy_db.fetchone()
248
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
249
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
250
251
        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
252
            # replace second and microsecond with 0
253
            # note: do not replace minute in case of calculating in half hourly
254
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
255
            # start from the next time slot
256
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)
257
258
        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)
259
260
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
261
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
262
263
    except Exception as e:
264
        error_string = "Error in step 4.2 of tenant_energy_input_category.worker " + str(e)
265
        if cursor_energy_db:
266
            cursor_energy_db.close()
267
        if cnx_energy_db:
268
            cnx_energy_db.close()
269
        print(error_string)
270
        return error_string
271
272
    ####################################################################################################################
273
    # Step 5: for each meter in list, get energy input data from energy database
274
    ####################################################################################################################
275
    energy_meter_hourly = dict()
276
    try:
277
        if meter_list is not None and len(meter_list) > 0:
278
            for meter in meter_list:
279
                meter_id = str(meter['id'])
280
281
                query = (" SELECT start_datetime_utc, actual_value "
282
                         " FROM tbl_meter_hourly "
283
                         " WHERE meter_id = %s "
284
                         "       AND start_datetime_utc >= %s "
285
                         "       AND start_datetime_utc < %s "
286
                         " ORDER BY start_datetime_utc ")
287
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
288
                rows_energy_values = cursor_energy_db.fetchall()
289
                if rows_energy_values is None or len(rows_energy_values) == 0:
290
                    energy_meter_hourly[meter_id] = None
291
                else:
292
                    energy_meter_hourly[meter_id] = dict()
293
                    for row_energy_value in rows_energy_values:
294
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
295
    except Exception as e:
296
        error_string = "Error in step 5.1 of tenant_energy_input_category.worker " + str(e)
297
        if cursor_energy_db:
298
            cursor_energy_db.close()
299
        if cnx_energy_db:
300
            cnx_energy_db.close()
301
        print(error_string)
302
        return error_string
303
304
    ####################################################################################################################
305
    # Step 6: for each virtual meter in list, get energy input data from energy database
306
    ####################################################################################################################
307
    energy_virtual_meter_hourly = dict()
308
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
309
        try:
310
            for virtual_meter in virtual_meter_list:
311
                virtual_meter_id = str(virtual_meter['id'])
312
313
                query = (" SELECT start_datetime_utc, actual_value "
314
                         " FROM tbl_virtual_meter_hourly "
315
                         " WHERE virtual_meter_id = %s "
316
                         "       AND start_datetime_utc >= %s "
317
                         "       AND start_datetime_utc < %s "
318
                         " ORDER BY start_datetime_utc ")
319
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
320
                rows_energy_values = cursor_energy_db.fetchall()
321
                if rows_energy_values is None or len(rows_energy_values) == 0:
322
                    energy_virtual_meter_hourly[virtual_meter_id] = None
323
                else:
324
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
325
                    for row_energy_value in rows_energy_values:
326
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
327
        except Exception as e:
328
            error_string = "Error in step 6.1 of tenant_energy_input_category.worker " + str(e)
329
            if cursor_energy_db:
330
                cursor_energy_db.close()
331
            if cnx_energy_db:
332
                cnx_energy_db.close()
333
            print(error_string)
334
            return error_string
335
336
    ####################################################################################################################
337
    # Step 7: for each offline meter in list, get energy input data from energy database
338
    ####################################################################################################################
339
    energy_offline_meter_hourly = dict()
340
    if offline_meter_list is not None and len(offline_meter_list) > 0:
341
        try:
342
            for offline_meter in offline_meter_list:
343
                offline_meter_id = str(offline_meter['id'])
344
345
                query = (" SELECT start_datetime_utc, actual_value "
346
                         " FROM tbl_offline_meter_hourly "
347
                         " WHERE offline_meter_id = %s "
348
                         "       AND start_datetime_utc >= %s "
349
                         "       AND start_datetime_utc < %s "
350
                         " ORDER BY start_datetime_utc ")
351
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
352
                rows_energy_values = cursor_energy_db.fetchall()
353
                if rows_energy_values is None or len(rows_energy_values) == 0:
354
                    energy_offline_meter_hourly[offline_meter_id] = None
355
                else:
356
                    energy_offline_meter_hourly[offline_meter_id] = dict()
357
                    for row_energy_value in rows_energy_values:
358
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
359
360
        except Exception as e:
361
            error_string = "Error in step 7.1 of tenant_energy_input_category.worker " + str(e)
362
            if cursor_energy_db:
363
                cursor_energy_db.close()
364
            if cnx_energy_db:
365
                cnx_energy_db.close()
366
            print(error_string)
367
            return error_string
368
369
    ####################################################################################################################
370
    # Step 8: determine common time slot to aggregate
371
    ####################################################################################################################
372
373
    common_start_datetime_utc = start_datetime_utc
374
    common_end_datetime_utc = end_datetime_utc
375
376
    print("Getting common time slot of energy values for all meters")
377
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
378
        for meter_id, energy_hourly in energy_meter_hourly.items():
379
            if energy_hourly is None or len(energy_hourly) == 0:
380
                common_start_datetime_utc = None
381
                common_end_datetime_utc = None
382
                break
383
            else:
384
                if common_start_datetime_utc < min(energy_hourly.keys()):
385
                    common_start_datetime_utc = min(energy_hourly.keys())
386
                if common_end_datetime_utc > max(energy_hourly.keys()):
387
                    common_end_datetime_utc = max(energy_hourly.keys())
388
389
    print("Getting common time slot of energy values for all virtual meters")
390
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
391
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
392
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
393
                if energy_hourly is None or len(energy_hourly) == 0:
394
                    common_start_datetime_utc = None
395
                    common_end_datetime_utc = None
396
                    break
397
                else:
398
                    if common_start_datetime_utc < min(energy_hourly.keys()):
399
                        common_start_datetime_utc = min(energy_hourly.keys())
400
                    if common_end_datetime_utc > max(energy_hourly.keys()):
401
                        common_end_datetime_utc = max(energy_hourly.keys())
402
403
    print("Getting common time slot of energy values for all offline meters")
404
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
405
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
406
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
407
                if energy_hourly is None or len(energy_hourly) == 0:
408
                    common_start_datetime_utc = None
409
                    common_end_datetime_utc = None
410
                    break
411
                else:
412
                    if common_start_datetime_utc < min(energy_hourly.keys()):
413
                        common_start_datetime_utc = min(energy_hourly.keys())
414
                    if common_end_datetime_utc > max(energy_hourly.keys()):
415
                        common_end_datetime_utc = max(energy_hourly.keys())
416
417
    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
418
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
419
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
420
        # There isn't any energy data
421
        print("There isn't any energy data")
422
        # continue the for tenant loop to the next tenant
423
        print("continue the for tenant loop to the next tenant")
424
        if cursor_energy_db:
425
            cursor_energy_db.close()
426
        if cnx_energy_db:
427
            cnx_energy_db.close()
428
        return None
429
430
    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
431
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))
432
433
    ####################################################################################################################
434
    # Step 9: aggregate energy data in the common time slot by energy categories and hourly
435
    ####################################################################################################################
436
437
    print("Step 9: aggregate energy data in the common time slot by energy categories and hourly")
438
    aggregated_values = list()
439
    try:
440
        current_datetime_utc = common_start_datetime_utc
441
        while common_start_datetime_utc is not None \
442
                and common_end_datetime_utc is not None \
443
                and current_datetime_utc <= common_end_datetime_utc:
444
            aggregated_value = dict()
445
            aggregated_value['start_datetime_utc'] = current_datetime_utc
446
            aggregated_value['meta_data'] = dict()
447
448
            if meter_list is not None and len(meter_list) > 0:
449
                for meter in meter_list:
450
                    meter_id = str(meter['id'])
451
                    energy_category_id = meter['energy_category_id']
452
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
453
                    aggregated_value['meta_data'][energy_category_id] = \
454
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
455
456
            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
457
                for virtual_meter in virtual_meter_list:
458
                    virtual_meter_id = str(virtual_meter['id'])
459
                    energy_category_id = virtual_meter['energy_category_id']
460
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
461
                    aggregated_value['meta_data'][energy_category_id] = \
462
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
463
464
            if offline_meter_list is not None and len(offline_meter_list) > 0:
465
                for offline_meter in offline_meter_list:
466
                    offline_meter_id = str(offline_meter['id'])
467
                    energy_category_id = offline_meter['energy_category_id']
468
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
469
                    aggregated_value['meta_data'][energy_category_id] = \
470
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value
471
472
            aggregated_values.append(aggregated_value)
473
474
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
475
476
    except Exception as e:
477
        error_string = "Error in step 9 of tenant_energy_input_category.worker " + str(e)
478
        if cursor_energy_db:
479
            cursor_energy_db.close()
480
        if cnx_energy_db:
481
            cnx_energy_db.close()
482
        print(error_string)
483
        return error_string
484
485
    ####################################################################################################################
486
    # Step 10: save energy data to energy database
487
    ####################################################################################################################
488
    print("Step 10: save energy data to energy database")
489
490
    while len(aggregated_values) > 0:
491
        insert_100 = aggregated_values[:100]
492
        aggregated_values = aggregated_values[100:]
493
        try:
494
            add_values = (" INSERT INTO tbl_tenant_input_category_hourly "
495
                          "             (tenant_id, "
496
                          "              energy_category_id, "
497
                          "              start_datetime_utc, "
498
                          "              actual_value) "
499
                          " VALUES  ")
500
501
            for aggregated_value in insert_100:
502
                for energy_category_id, actual_value in aggregated_value['meta_data'].items():
503
                    add_values += " (" + str(tenant['id']) + ","
504
                    add_values += " " + str(energy_category_id) + ","
505
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
506
                    add_values += str(actual_value) + "), "
507
            # print("add_values:" + add_values)
508
            # trim ", " at the end of string and then execute
509
            cursor_energy_db.execute(add_values[:-2])
510
            cnx_energy_db.commit()
511
512
        except Exception as e:
513
            error_string = "Error in step 10.1 of tenant_energy_input_category.worker " + str(e)
514
            print(error_string)
515
            if cursor_energy_db:
516
                cursor_energy_db.close()
517
            if cnx_energy_db:
518
                cnx_energy_db.close()
519
            return error_string
520
521
    if cursor_energy_db:
522
        cursor_energy_db.close()
523
    if cnx_energy_db:
524
        cnx_energy_db.close()
525
    return None
526