equipment_energy_input_category   F

Complexity

Total Complexity 134

Size/Duplication

Total Lines 529
Duplicated Lines 92.82 %

Importance

Changes 0
Metric Value
wmc 134
eloc 345
dl 491
loc 529
rs 2
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
F main() 70 70 14
F worker() 421 421 120


Duplicated Code

Duplicated code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
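As an illustration of that rule, steps 1 to 3 of worker() in the listing below run the same query three times, changing only the table and column names. The following is a minimal sketch of folding them into one helper. The names `METER_KINDS`, `build_input_meter_query` and `fetch_input_meters` are hypothetical and not part of the project; the table and column names are copied from the queries in worker(), and `cursor` is assumed to be any DB-API cursor such as the one returned by mysql.connector.

```python
# Hypothetical lookup table: meter kind -> (meter table, relation table, relation column).
# All names are fixed constants copied from worker(), never user input.
METER_KINDS = {
    "meter": ("tbl_meters", "tbl_equipments_meters", "meter_id"),
    "virtual_meter": ("tbl_virtual_meters", "tbl_equipments_virtual_meters", "virtual_meter_id"),
    "offline_meter": ("tbl_offline_meters", "tbl_equipments_offline_meters", "offline_meter_id"),
}


def build_input_meter_query(kind):
    """Build the input-meter lookup query for one meter kind."""
    meter_table, relation_table, relation_column = METER_KINDS[kind]
    return (" SELECT m.id, m.name, m.energy_category_id "
            " FROM " + meter_table + " m, " + relation_table + " em "
            " WHERE m.id = em." + relation_column + " "
            "       AND m.is_counted = 1 "
            "       AND em.is_output = 0 "
            "       AND em.equipment_id = %s ")


def fetch_input_meters(cursor, kind, equipment_id):
    """Run the lookup and return a list of meter dicts (empty if no rows)."""
    cursor.execute(build_input_meter_query(kind), (equipment_id,))
    rows = cursor.fetchall() or []
    return [{"id": row[0], "name": row[1], "energy_category_id": row[2]}
            for row in rows]
```

With such a helper, the three lookups in worker() could become three calls such as `fetch_input_meters(cursor_system_db, "virtual_meter", equipment['id'])` inside a single try block, which would also remove two of the three repeated error handlers.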

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like equipment_energy_input_category often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
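In the listing below, the variable pairs `cnx_system_db`/`cursor_system_db` and `cnx_energy_db`/`cursor_energy_db` share suffixes and are always opened and closed together, so they are one candidate for extraction. The following is a minimal sketch under that assumption. The class name `DbSession` and its `connect` parameter are hypothetical; `connect` is assumed to be a zero-argument callable that returns a DB-API connection.

```python
class DbSession:
    """Hypothetical wrapper that owns one connection and one cursor.

    Used as a context manager, it closes both on exit, which replaces
    the repeated "if cursor: close / if cnx: close" blocks.
    """

    def __init__(self, connect):
        self._connect = connect  # zero-argument callable returning a connection
        self.cnx = None
        self.cursor = None

    def __enter__(self):
        self.cnx = self._connect()
        try:
            self.cursor = self.cnx.cursor()
        except Exception:
            # Do not leak the connection if the cursor cannot be created.
            self.cnx.close()
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.cursor:
            self.cursor.close()
        if self.cnx:
            self.cnx.close()
        return False  # let any exception propagate to the caller


# Possible usage inside worker(), assuming the imports of the listing:
#
#     with DbSession(lambda: mysql.connector.connect(**config.myems_energy_db)) as db:
#         db.cursor.execute(query, (equipment['id'],))
#         row_datetime = db.cursor.fetchone()
```

Because exceptions still propagate, worker() could keep a single try/except around the `with` block to build its error string.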

import random
import time
from datetime import datetime, timedelta
from decimal import Decimal
from multiprocessing import Pool

import mysql.connector

import config


########################################################################################################################
# PROCEDURES
# Step 1: get all equipments
# Step 2: Create multiprocessing pool to call worker in parallel
########################################################################################################################


def main(logger):
    while True:
        # the outermost while loop
        ################################################################################################################
        # Step 1: get all equipments
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 1.1 of equipment_energy_input_category.main " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        print("Connected to MyEMS System Database")

        equipment_list = list()
        try:
            cursor_system_db.execute(" SELECT id, name "
                                     " FROM tbl_equipments "
                                     " ORDER BY id ")
            rows_equipments = cursor_system_db.fetchall()

            if rows_equipments is None or len(rows_equipments) == 0:
                print("There aren't any equipments ")
                # sleep and continue the outer loop to reconnect the database
                # (the finally clause below still closes the cursor and connection)
                time.sleep(60)
                continue

            for row in rows_equipments:
                equipment_list.append({"id": row[0], "name": row[1]})

        except Exception as e:
            logger.error("Error in step 1.2 of equipment_energy_input_category.main " + str(e))
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        finally:
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        print("Got all equipments in MyEMS System Database")

        # shuffle the equipment list so that equipments are processed in random order
        random.shuffle(equipment_list)

        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        p = Pool(processes=config.pool_size)
        error_list = p.map(worker, equipment_list)
        p.close()
        p.join()

        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep 300 seconds...")
        time.sleep(300)
        print("wake from sleep, and continue to work...")
    # end of outer while


########################################################################################################################
# PROCEDURES:
#   Step 1: get all input meters associated with the equipment
#   Step 2: get all input virtual meters associated with the equipment
#   Step 3: get all input offline meters associated with the equipment
#   Step 4: determine start datetime and end datetime to aggregate
#   Step 5: for each meter in list, get energy input data from energy database
#   Step 6: for each virtual meter in list, get energy input data from energy database
#   Step 7: for each offline meter in list, get energy input data from energy database
#   Step 8: determine common time slot to aggregate
#   Step 9: aggregate energy data in the common time slot by energy categories and hourly
#   Step 10: save energy data to energy database
#
# NOTE: returns None or the error string because the logger object cannot be passed in as a parameter
########################################################################################################################

def worker(equipment):
    ####################################################################################################################
    # Step 1: get all input meters associated with the equipment
    ####################################################################################################################
    print("Step 1: get all input meters associated with the equipment " + str(equipment['name']))

    meter_list = list()
    cnx_system_db = None
    cursor_system_db = None
    try:
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
        cursor_system_db = cnx_system_db.cursor()
    except Exception as e:
        error_string = "Error in step 1.1 of equipment_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
                                 " FROM tbl_meters m, tbl_equipments_meters em "
                                 " WHERE m.id = em.meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND em.is_output = 0 "
                                 "       AND em.equipment_id = %s ",
                                 (equipment['id'],))
        rows_meters = cursor_system_db.fetchall()

        if rows_meters is not None and len(rows_meters) > 0:
            for row in rows_meters:
                meter_list.append({"id": row[0],
                                   "name": row[1],
                                   "energy_category_id": row[2]})

    except Exception as e:
        error_string = "Error in step 1.2 of equipment_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 2: get all input virtual meters associated with the equipment
    ####################################################################################################################
    print("Step 2: get all input virtual meters associated with the equipment")
    virtual_meter_list = list()

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
                                 " FROM tbl_virtual_meters m, tbl_equipments_virtual_meters em "
                                 " WHERE m.id = em.virtual_meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND em.is_output = 0 "
                                 "       AND em.equipment_id = %s ",
                                 (equipment['id'],))
        rows_virtual_meters = cursor_system_db.fetchall()

        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
            for row in rows_virtual_meters:
                virtual_meter_list.append({"id": row[0],
                                           "name": row[1],
                                           "energy_category_id": row[2]})

    except Exception as e:
        error_string = "Error in step 2.1 of equipment_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 3: get all input offline meters associated with the equipment
    ####################################################################################################################
    print("Step 3: get all input offline meters associated with the equipment")

    offline_meter_list = list()

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
                                 " FROM tbl_offline_meters m, tbl_equipments_offline_meters em "
                                 " WHERE m.id = em.offline_meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND em.is_output = 0 "
                                 "       AND em.equipment_id = %s ",
                                 (equipment['id'],))
        rows_offline_meters = cursor_system_db.fetchall()

        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
            for row in rows_offline_meters:
                offline_meter_list.append({"id": row[0],
                                           "name": row[1],
                                           "energy_category_id": row[2]})

    except Exception as e:
        error_string = "Error in step 3.1 of equipment_energy_input_category.worker " + str(e)
        print(error_string)
        return error_string
    finally:
        # the system database is not needed after this step
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()

    ####################################################################################################################
    # skip this equipment if it has no input meters of any kind
    ####################################################################################################################
    if (meter_list is None or len(meter_list) == 0) and \
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
            (offline_meter_list is None or len(offline_meter_list) == 0):
        print("This is an empty equipment ")
        return None

    ####################################################################################################################
    # Step 4: determine start datetime and end datetime to aggregate
    ####################################################################################################################
    print("Step 4: determine start datetime and end datetime to aggregate")
    cnx_energy_db = None
    cursor_energy_db = None
    try:
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
        cursor_energy_db = cnx_energy_db.cursor()
    except Exception as e:
        error_string = "Error in step 4.1 of equipment_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    try:
        query = (" SELECT MAX(start_datetime_utc) "
                 " FROM tbl_equipment_input_category_hourly "
                 " WHERE equipment_id = %s ")
        cursor_energy_db.execute(query, (equipment['id'],))
        row_datetime = cursor_energy_db.fetchone()
        # default start: the configured start datetime, truncated to the hour
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)

        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
            # replace second and microsecond with 0
            # note: do not replace minute in case of calculating in half hourly
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
            # start from the next time slot
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)

        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)

        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
              + " end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])

    except Exception as e:
        error_string = "Error in step 4.2 of equipment_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 5: for each meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_meter_hourly = dict()
    try:
        if meter_list is not None and len(meter_list) > 0:
            for meter in meter_list:
                meter_id = str(meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_meter_hourly "
                         " WHERE meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_meter_hourly[meter_id] = None
                else:
                    energy_meter_hourly[meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
    except Exception as e:
        error_string = "Error in step 5.1 of equipment_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 6: for each virtual meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_virtual_meter_hourly = dict()
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
        try:
            for virtual_meter in virtual_meter_list:
                virtual_meter_id = str(virtual_meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_virtual_meter_hourly "
                         " WHERE virtual_meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_virtual_meter_hourly[virtual_meter_id] = None
                else:
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
        except Exception as e:
            error_string = "Error in step 6.1 of equipment_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 7: for each offline meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_offline_meter_hourly = dict()
    if offline_meter_list is not None and len(offline_meter_list) > 0:
        try:
            for offline_meter in offline_meter_list:
                offline_meter_id = str(offline_meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_offline_meter_hourly "
                         " WHERE offline_meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_offline_meter_hourly[offline_meter_id] = None
                else:
                    energy_offline_meter_hourly[offline_meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]

        except Exception as e:
            error_string = "Error in step 7.1 of equipment_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 8: determine common time slot to aggregate
    ####################################################################################################################

    common_start_datetime_utc = start_datetime_utc
    common_end_datetime_utc = end_datetime_utc

    print("Getting common time slot of energy values for all meters")
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
        for meter_id, energy_hourly in energy_meter_hourly.items():
            if energy_hourly is None or len(energy_hourly) == 0:
                # one meter without data means there is no common time slot
                common_start_datetime_utc = None
                common_end_datetime_utc = None
                break
            else:
                if common_start_datetime_utc < min(energy_hourly.keys()):
                    common_start_datetime_utc = min(energy_hourly.keys())
                if common_end_datetime_utc > max(energy_hourly.keys()):
                    common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all virtual meters")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all offline meters")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0):
        # There isn't any energy data
        print("There isn't any energy data")
        # return so that the pool moves on to the next equipment
        print("continue the for equipment loop to the next equipment")
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        return None

    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))

    ####################################################################################################################
    # Step 9: aggregate energy data in the common time slot by energy categories and hourly
    ####################################################################################################################

    print("Step 9: aggregate energy data in the common time slot by energy categories and hourly")
    aggregated_values = list()
    try:
        current_datetime_utc = common_start_datetime_utc
        while common_start_datetime_utc is not None \
                and common_end_datetime_utc is not None \
                and current_datetime_utc <= common_end_datetime_utc:
            aggregated_value = dict()
            aggregated_value['start_datetime_utc'] = current_datetime_utc
            aggregated_value['meta_data'] = dict()

            if meter_list is not None and len(meter_list) > 0:
                for meter in meter_list:
                    meter_id = str(meter['id'])
                    energy_category_id = meter['energy_category_id']
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_category_id] = \
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
                for virtual_meter in virtual_meter_list:
                    virtual_meter_id = str(virtual_meter['id'])
                    energy_category_id = virtual_meter['energy_category_id']
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_category_id] = \
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if offline_meter_list is not None and len(offline_meter_list) > 0:
                for offline_meter in offline_meter_list:
                    offline_meter_id = str(offline_meter['id'])
                    energy_category_id = offline_meter['energy_category_id']
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_category_id] = \
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            aggregated_values.append(aggregated_value)

            current_datetime_utc += timedelta(minutes=config.minutes_to_count)

    except Exception as e:
        error_string = "Error in step 9 of equipment_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 10: save energy data to energy database
    ####################################################################################################################
    print("Step 10: save energy data to energy database")

    # insert in batches of 100 time slots
    while len(aggregated_values) > 0:
        insert_100 = aggregated_values[:100]
        aggregated_values = aggregated_values[100:]
        try:
            add_values = (" INSERT INTO tbl_equipment_input_category_hourly "
                          "             (equipment_id, "
                          "              energy_category_id, "
                          "              start_datetime_utc, "
                          "              actual_value) "
                          " VALUES  ")

            for aggregated_value in insert_100:
                for energy_category_id, actual_value in aggregated_value['meta_data'].items():
                    add_values += " (" + str(equipment['id']) + ","
                    add_values += " " + str(energy_category_id) + ","
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
                    add_values += str(actual_value) + "), "
            # print("add_values:" + add_values)
            # trim ", " at the end of string and then execute
            cursor_energy_db.execute(add_values[:-2])
            cnx_energy_db.commit()

        except Exception as e:
            error_string = "Error in step 10.1 of equipment_energy_input_category.worker " + str(e)
            print(error_string)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            return error_string

    if cursor_energy_db:
        cursor_energy_db.close()
    if cnx_energy_db:
        cnx_energy_db.close()
    return None
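Step 8 above repeats the same narrowing loop for meters, virtual meters and offline meters, which accounts for a large share of worker()'s complexity score. The following is a minimal sketch of how that step could be expressed once. The function names `narrow_time_slot` and `common_time_slot` are hypothetical; the inputs are assumed to have the same shape as `energy_meter_hourly` in the listing, that is, a dict mapping a meter id to either None or a dict keyed by datetime.

```python
from datetime import datetime


def narrow_time_slot(start, end, hourly_by_meter):
    """Shrink [start, end] to the range covered by every meter in one group.

    Returns (None, None) as soon as one meter has no data, as step 8 does.
    """
    for energy_hourly in hourly_by_meter.values():
        if not energy_hourly:
            return None, None
        start = max(start, min(energy_hourly))
        end = min(end, max(energy_hourly))
    return start, end


def common_time_slot(start, end, *hourly_groups):
    """Apply narrow_time_slot to each group in turn, stopping once the slot is empty."""
    for group in hourly_groups:
        if start is None or end is None:
            break
        start, end = narrow_time_slot(start, end, group)
    return start, end


if __name__ == "__main__":
    # Illustrative data only: one meter and one virtual meter, no offline meters.
    meters = {"1": {datetime(2021, 1, 1, 2): 1, datetime(2021, 1, 1, 5): 1}}
    virtual_meters = {"9": {datetime(2021, 1, 1, 3): 1, datetime(2021, 1, 1, 8): 1}}
    print(common_time_slot(datetime(2021, 1, 1, 0), datetime(2021, 1, 1, 10),
                           meters, virtual_meters, {}))
```

Because these functions touch neither the database nor the cursor, they can be unit tested on their own, which the original inline loops do not allow.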