Issues (61)

equipment_energy_input_item.py (3 issues)

1
import time
2
from datetime import datetime, timedelta
3
from decimal import Decimal
4
import mysql.connector
5
from multiprocessing import Pool
6
import random
7
import config
8
9
10
########################################################################################################################
11
# PROCEDURES
12
# Step 1: get all equipments
13
# Step 2: Create multiprocessing pool to call worker in parallel
14
########################################################################################################################
15
16
17 View Code Duplication
def main(logger):
    """Run the equipment input-energy-item aggregation loop forever.

    Each cycle:
      1. Reads every equipment (id, name) from the MyEMS system database.
      2. Shuffles the list and hands each equipment to ``worker`` through a
         multiprocessing pool of ``config.pool_size`` processes.
      3. Logs every non-empty error string returned by the workers.
      4. Sleeps 300 seconds before the next cycle.

    If the database cannot be reached, the query fails, or there are no
    equipments, the cycle is abandoned and retried after 60 seconds.

    Args:
        logger: object exposing ``error(message)``; used for all error
            reporting because workers cannot receive the logger themselves.

    Returns:
        Never returns normally (infinite service loop).
    """
    while True:
        ################################################################################################################
        # Step 1: get all equipments
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 1.1 of equipment_energy_input_item.main " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        print("Connected to MyEMS System Database")

        # None marks "query failed"; an empty list marks "no equipments".
        equipment_list = None
        try:
            cursor_system_db.execute(" SELECT id, name "
                                     " FROM tbl_equipments "
                                     " ORDER BY id ")
            rows_equipments = cursor_system_db.fetchall()
            equipment_list = [{"id": row[0], "name": row[1]} for row in rows_equipments or ()]
        except Exception as e:
            logger.error("Error in step 1.2 of equipment_energy_input_item.main " + str(e))
        finally:
            # Release the connection BEFORE any back-off sleep so it is not
            # held open while this process is idle.
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        if equipment_list is None:
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue

        if not equipment_list:
            print("There isn't any equipments ")
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue

        print("Got all equipments in MyEMS System Database")

        # shuffle so that equipments are processed in a random order each cycle
        random.shuffle(equipment_list)

        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        pool = Pool(processes=config.pool_size)
        try:
            error_list = pool.map(worker, equipment_list)
        finally:
            # Always reap the worker processes, even when map() re-raises an
            # exception that escaped from a worker.
            pool.close()
            pool.join()

        for error in error_list:
            if error:
                logger.error(error)

        print("go to sleep 300 seconds...")
        time.sleep(300)
        print("wake from sleep, and continue to work...")
    # end of outer while
88
89
90
########################################################################################################################
91
# PROCEDURES:
92
#   Step 1: get all input meters associated with the equipment
93
#   Step 2: get all input virtual meters associated with the equipment
94
#   Step 3: get all input offline meters associated with the equipment
95
#   Step 4: determine start datetime and end datetime to aggregate
96
#   Step 5: for each meter in list, get energy input data from energy database
97
#   Step 6: for each virtual meter in list, get energy input data from energy database
98
#   Step 7: for each offline meter in list, get energy input data from energy database
99
#   Step 8: determine common time slot to aggregate
100
#   Step 9: aggregate energy data in the common time slot by energy items and hourly
101
#   Step 10: save energy data to energy database
102
#
103
# NOTE: returns None or an error string because the logger object cannot be passed in as a parameter
104
########################################################################################################################
105
106 View Code Duplication
def worker(equipment):
    """Aggregate the hourly input energy of one equipment by energy item.

    Reads the input meters, virtual meters and offline meters linked to the
    equipment from the system database, loads their hourly values from the
    energy database starting after the latest slot already stored in
    ``tbl_equipment_input_item_hourly`` (or at ``config.start_datetime_utc``
    when nothing is stored yet), sums the values per energy item over the
    time range common to all meters, and inserts the result.

    Args:
        equipment: dict with the keys ``'id'`` and ``'name'``.

    Returns:
        None when the equipment was processed or there was nothing to do;
        otherwise an error string naming the failed step. Errors are returned
        rather than logged because the logger cannot be passed to the worker.
    """
    equipment_id = equipment['id']

    ####################################################################################################################
    # Steps 1-3: get all input meters, virtual meters and offline meters associated with the equipment
    ####################################################################################################################
    print("Step 1: get all input meters associated with the equipment " + str(equipment['name']))

    cnx_system_db = None
    cursor_system_db = None
    try:
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            return _report_error("Error in step 1.1 of equipment_energy_input_item.worker " + str(e))

        try:
            meter_list = _fetch_input_meters(
                cursor_system_db,
                " SELECT m.id, m.name, m.energy_item_id "
                " FROM tbl_meters m, tbl_equipments_meters em "
                " WHERE m.id = em.meter_id "
                "       AND m.is_counted = true "
                "       AND m.energy_item_id is NOT NULL "
                "       AND em.is_output = false "
                "       AND em.equipment_id = %s ",
                equipment_id)
        except Exception as e:
            return _report_error("Error in step 1.2 of equipment_energy_input_item.worker " + str(e))

        print("Step 2: get all input virtual meters associated with the equipment")
        try:
            virtual_meter_list = _fetch_input_meters(
                cursor_system_db,
                " SELECT m.id, m.name, m.energy_item_id "
                " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em "
                " WHERE m.id = em.virtual_meter_id "
                "       AND m.energy_item_id is NOT NULL "
                "       AND m.is_counted = true "
                "       AND em.is_output = false "
                "       AND em.equipment_id = %s ",
                equipment_id)
        except Exception as e:
            return _report_error("Error in step 2.1 of equipment_energy_input_item.worker " + str(e))

        print("Step 3: get all input offline meters associated with the equipment")
        try:
            offline_meter_list = _fetch_input_meters(
                cursor_system_db,
                " SELECT m.id, m.name, m.energy_item_id "
                " FROM tbl_offline_meters m, tbl_equipments_offline_meters em "
                " WHERE m.id = em.offline_meter_id "
                "       AND m.energy_item_id is NOT NULL "
                "       AND m.is_counted = true "
                "       AND em.is_output = false "
                "       AND em.equipment_id = %s ",
                equipment_id)
        except Exception as e:
            return _report_error("Error in step 3.1 of equipment_energy_input_item.worker " + str(e))
    finally:
        # The system database is not needed past this point.
        _close_db(cursor_system_db, cnx_system_db)

    ####################################################################################################################
    # stop to the next equipment if this equipment is empty
    ####################################################################################################################
    if not (meter_list or virtual_meter_list or offline_meter_list):
        print("This is an empty equipment ")
        return None

    ####################################################################################################################
    # Step 4: determine start datetime and end datetime to aggregate
    ####################################################################################################################
    print("Step 4: determine start datetime and end datetime to aggregate")
    cnx_energy_db = None
    cursor_energy_db = None
    try:
        try:
            cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
            cursor_energy_db = cnx_energy_db.cursor()
        except Exception as e:
            return _report_error("Error in step 4.1 of equipment_energy_input_item.worker " + str(e))

        try:
            cursor_energy_db.execute(" SELECT MAX(start_datetime_utc) "
                                     " FROM tbl_equipment_input_item_hourly "
                                     " WHERE equipment_id = %s ",
                                     (equipment_id,))
            row_datetime = cursor_energy_db.fetchone()

            # Default: the configured start, truncated to the hour.
            start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
            start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)

            if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
                # replace second and microsecond with 0
                # note: do not replace minute in case of calculating in half hourly
                start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
                # start from the next time slot
                start_datetime_utc += timedelta(minutes=config.minutes_to_count)

            end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)

            print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
                  + " end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
        except Exception as e:
            return _report_error("Error in step 4.2 of equipment_energy_input_item.worker " + str(e))

        ################################################################################################################
        # Step 5: for each meter in list, get energy input data from energy database
        ################################################################################################################
        try:
            energy_meter_hourly = _fetch_hourly_values(
                cursor_energy_db,
                " SELECT start_datetime_utc, actual_value "
                " FROM tbl_meter_hourly "
                " WHERE meter_id = %s "
                "       AND start_datetime_utc >= %s "
                "       AND start_datetime_utc < %s "
                " ORDER BY start_datetime_utc ",
                meter_list, start_datetime_utc, end_datetime_utc)
        except Exception as e:
            return _report_error("Error in step 5.1 of equipment_energy_input_item.worker " + str(e))

        ################################################################################################################
        # Step 6: for each virtual meter in list, get energy input data from energy database
        ################################################################################################################
        try:
            energy_virtual_meter_hourly = _fetch_hourly_values(
                cursor_energy_db,
                " SELECT start_datetime_utc, actual_value "
                " FROM tbl_virtual_meter_hourly "
                " WHERE virtual_meter_id = %s "
                "       AND start_datetime_utc >= %s "
                "       AND start_datetime_utc < %s "
                " ORDER BY start_datetime_utc ",
                virtual_meter_list, start_datetime_utc, end_datetime_utc)
        except Exception as e:
            return _report_error("Error in step 6.1 of equipment_energy_input_item.worker " + str(e))

        ################################################################################################################
        # Step 7: for each offline meter in list, get energy input data from energy database
        ################################################################################################################
        try:
            energy_offline_meter_hourly = _fetch_hourly_values(
                cursor_energy_db,
                " SELECT start_datetime_utc, actual_value "
                " FROM tbl_offline_meter_hourly "
                " WHERE offline_meter_id = %s "
                "       AND start_datetime_utc >= %s "
                "       AND start_datetime_utc < %s "
                " ORDER BY start_datetime_utc ",
                offline_meter_list, start_datetime_utc, end_datetime_utc)
        except Exception as e:
            return _report_error("Error in step 7.1 of equipment_energy_input_item.worker " + str(e))

        ################################################################################################################
        # Step 8: determine common time slot to aggregate
        ################################################################################################################
        common_start_datetime_utc = start_datetime_utc
        common_end_datetime_utc = end_datetime_utc

        for label, energy_hourly_by_meter in (("meters", energy_meter_hourly),
                                              ("virtual meters", energy_virtual_meter_hourly),
                                              ("offline meters", energy_offline_meter_hourly)):
            print("Getting common time slot of energy values for all " + label)
            # Once one meter has no data there is no common slot; skip the rest.
            if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
                common_start_datetime_utc, common_end_datetime_utc = _narrow_common_slot(
                    common_start_datetime_utc, common_end_datetime_utc, energy_hourly_by_meter)

        if not (energy_meter_hourly or energy_virtual_meter_hourly or energy_offline_meter_hourly):
            # There isn't any energy data
            print("There isn't any energy data")
            # continue the for equipment loop to the next equipment
            print("continue the for equipment loop to the next equipment")
            return None

        print("common_start_datetime_utc: " + str(common_start_datetime_utc))
        print("common_end_datetime_utc: " + str(common_end_datetime_utc))

        ################################################################################################################
        # Step 9: aggregate energy data in the common time slot by energy items and hourly
        ################################################################################################################
        print("Step 9: aggregate energy data in the common time slot by energy items and hourly")
        try:
            aggregated_values = _aggregate_by_energy_item(
                ((meter_list, energy_meter_hourly),
                 (virtual_meter_list, energy_virtual_meter_hourly),
                 (offline_meter_list, energy_offline_meter_hourly)),
                common_start_datetime_utc,
                common_end_datetime_utc,
                config.minutes_to_count)
        except Exception as e:
            return _report_error("Error in step 9 of equipment_energy_input_item.worker " + str(e))

        ################################################################################################################
        # Step 10: save energy data to energy database
        ################################################################################################################
        print("Step 10: save energy data to energy database")

        rows_to_insert = [(equipment_id,
                           energy_item_id,
                           aggregated_value['start_datetime_utc'],
                           actual_value)
                          for aggregated_value in aggregated_values
                          for energy_item_id, actual_value in aggregated_value['meta_data'].items()]

        if rows_to_insert:
            try:
                print("add_values: " + str(len(rows_to_insert)) + " rows")
                # Parameterized batch insert: the driver quotes every value.
                cursor_energy_db.executemany(" INSERT INTO tbl_equipment_input_item_hourly "
                                             "             (equipment_id, "
                                             "              energy_item_id, "
                                             "              start_datetime_utc, "
                                             "              actual_value) "
                                             " VALUES (%s, %s, %s, %s) ",
                                             rows_to_insert)
                cnx_energy_db.commit()
            except Exception as e:
                return _report_error("Error in step 10.1 of equipment_energy_input_item.worker " + str(e))

        return None
    finally:
        # Single exit point for the energy database: runs on every return
        # above and also when an unexpected exception propagates.
        _close_db(cursor_energy_db, cnx_energy_db)


def _report_error(error_string):
    """Print an error string and hand it back so the caller can return it."""
    print(error_string)
    return error_string


def _close_db(cursor, cnx):
    """Close a cursor and its connection, skipping whichever was never opened."""
    try:
        if cursor:
            cursor.close()
    finally:
        # Close the connection even if closing the cursor failed.
        if cnx:
            cnx.close()


def _fetch_input_meters(cursor, query, equipment_id):
    """Run a (id, name, energy_item_id) meter query for one equipment.

    Returns a list of dicts with the keys 'id', 'name' and 'energy_item_id';
    the list is empty when the query yields no rows.
    """
    cursor.execute(query, (equipment_id,))
    return [{"id": row[0], "name": row[1], "energy_item_id": row[2]}
            for row in cursor.fetchall() or ()]


def _fetch_hourly_values(cursor, query, meters, start_datetime_utc, end_datetime_utc):
    """Load the hourly values of every meter in ``meters`` within [start, end).

    Returns a dict keyed by ``str(meter['id'])``. Each value is a dict mapping
    start_datetime_utc to actual_value, or None when the meter has no rows in
    the range.
    """
    energy_hourly_by_meter = dict()
    for meter in meters:
        meter_id = str(meter['id'])
        cursor.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
        rows_energy_values = cursor.fetchall()
        if rows_energy_values:
            energy_hourly_by_meter[meter_id] = {row[0]: row[1] for row in rows_energy_values}
        else:
            energy_hourly_by_meter[meter_id] = None
    return energy_hourly_by_meter


def _narrow_common_slot(common_start_datetime_utc, common_end_datetime_utc, energy_hourly_by_meter):
    """Shrink [start, end] to the range covered by every meter's data.

    Returns (None, None) as soon as one meter has no data, because then no
    time slot is common to all meters.
    """
    for energy_hourly in energy_hourly_by_meter.values():
        if not energy_hourly:
            return None, None
        common_start_datetime_utc = max(common_start_datetime_utc, min(energy_hourly))
        common_end_datetime_utc = min(common_end_datetime_utc, max(energy_hourly))
    return common_start_datetime_utc, common_end_datetime_utc


def _aggregate_by_energy_item(sources, common_start_datetime_utc, common_end_datetime_utc, minutes_to_count):
    """Sum the values of all meters per energy item for every time slot.

    Args:
        sources: iterable of (meter_list, energy_hourly_by_meter) pairs.
        common_start_datetime_utc: first slot to aggregate, or None.
        common_end_datetime_utc: last slot to aggregate (inclusive), or None.
        minutes_to_count: slot length in minutes; must be positive.

    Returns:
        A list of dicts with the keys 'start_datetime_utc' and 'meta_data'
        (energy_item_id -> summed value). Empty when there is no common slot.
        A slot missing for a meter counts as Decimal(0.0).

    Raises:
        ValueError: if minutes_to_count is not positive (the slot loop would
            otherwise never terminate).
    """
    aggregated_values = list()
    if common_start_datetime_utc is None or common_end_datetime_utc is None:
        return aggregated_values

    slot_length = timedelta(minutes=minutes_to_count)
    if slot_length <= timedelta(0):
        raise ValueError("minutes_to_count must be positive")

    current_datetime_utc = common_start_datetime_utc
    while current_datetime_utc <= common_end_datetime_utc:
        meta_data = dict()
        for meters, energy_hourly_by_meter in sources:
            for meter in meters:
                energy_item_id = meter['energy_item_id']
                actual_value = energy_hourly_by_meter[str(meter['id'])].get(current_datetime_utc, Decimal(0.0))
                meta_data[energy_item_id] = meta_data.get(energy_item_id, Decimal(0.0)) + actual_value

        aggregated_values.append({'start_datetime_utc': current_datetime_utc,
                                  'meta_data': meta_data})
        current_datetime_utc += slot_length

    return aggregated_values
528