combined_equipment_energy_input_item   F

Complexity

Total Complexity 169

Size/Duplication

Total Lines 681
Duplicated Lines 86.05 %

Importance

Changes 0
Metric Value
wmc 169
eloc 412
dl 586
loc 681
rs 2
c 0
b 0
f 0

2 Functions

Rating   Name       Duplication   Size   Complexity
F        main()     70            91     14
F        worker()   516           529    155

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
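One duplication visible in this file is the "common time slot" logic in Step 10 of worker(), where the same loop is repeated for meters, virtual meters, offline meters and equipments. The following is a minimal sketch of how those loops might be folded into one helper. The name narrow_common_slot and its signature are assumptions made for illustration and do not appear in the source.

```python
from datetime import datetime


def narrow_common_slot(common_start, common_end, hourly_by_source):
    """Shrink (common_start, common_end) to the span covered by every source.

    hourly_by_source maps a source id to a dict keyed by datetime, or to None
    when the source returned no rows. Returns (None, None) as soon as one
    source has no data, mirroring the loops in worker().
    """
    if common_start is None or common_end is None:
        return None, None
    for energy_hourly in hourly_by_source.values():
        if not energy_hourly:
            return None, None
        # Latest first timestamp and earliest last timestamp win
        common_start = max(common_start, min(energy_hourly))
        common_end = min(common_end, max(energy_hourly))
    return common_start, common_end


# Illustrative data only: one meter and one equipment
start = datetime(2024, 1, 1, 0)
end = datetime(2024, 1, 1, 6)
meters = {"1": {datetime(2024, 1, 1, 1): 1.0, datetime(2024, 1, 1, 4): 2.0}}
equipments = {"7": {datetime(2024, 1, 1, 2): {3: 1.5}, datetime(2024, 1, 1, 5): {3: 0.5}}}

slot = (start, end)
for source in (meters, equipments):
    slot = narrow_common_slot(*slot, source)
```

With a helper of this shape, each of the four blocks in Step 10 would reduce to a single call, which is the kind of change the duplication figures above point to.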

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.
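As an illustration of removing duplication first: almost every except branch in this file repeats the same "close cursor, close connection" lines. A hedged sketch of one way to centralize that cleanup is a small context manager. The name open_cursor is hypothetical, and connect stands for any callable that returns a DB-API connection (mysql.connector.connect in this file). The fake classes exist only so the sketch runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def open_cursor(connect, **db_config):
    """Yield a cursor; always close the cursor and connection afterwards."""
    cnx = None
    cursor = None
    try:
        cnx = connect(**db_config)
        cursor = cnx.cursor()
        yield cursor
    finally:
        # Runs on success and on error, so callers need no cleanup code
        if cursor is not None:
            cursor.close()
        if cnx is not None:
            cnx.close()


class FakeCursor:
    """Stand-in cursor that records when it is closed."""
    def __init__(self, log):
        self.log = log

    def close(self):
        self.log.append("cursor closed")


class FakeConnection:
    """Stand-in connection that records when it is closed."""
    def __init__(self, log):
        self.log = log

    def cursor(self):
        return FakeCursor(self.log)

    def close(self):
        self.log.append("connection closed")


events = []
try:
    with open_cursor(FakeConnection, log=events) as cursor:
        raise RuntimeError("query failed")
except RuntimeError:
    events.append("error handled")
```

Under this assumption, each step in worker() would keep only its query and its error message, and the cleanup would live in one place.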

Complex classes like combined_equipment_energy_input_item often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
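In this file the candidates are the names that share the energy_*_hourly pattern: Steps 6 to 8 of worker() differ only in the table name and the id column. The sketch below shows what an Extract Class step could look like under that reading. HourlySource, its fields and the three constants are hypothetical names introduced for illustration; the table and column names are taken from the queries in the listing.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HourlySource:
    """Hypothetical value object: one kind of input source in the energy database."""
    table: str      # e.g. "tbl_meter_hourly"
    id_column: str  # e.g. "meter_id"

    def fetch(self, cursor, source_id, start_utc, end_utc):
        """Return {start_datetime_utc: actual_value}, or None when no rows exist."""
        # table and id_column come from the constants below, never from user input
        query = (" SELECT start_datetime_utc, actual_value "
                 f" FROM {self.table} "
                 f" WHERE {self.id_column} = %s "
                 "       AND start_datetime_utc >= %s "
                 "       AND start_datetime_utc < %s "
                 " ORDER BY start_datetime_utc ")
        cursor.execute(query, (source_id, start_utc, end_utc))
        rows = cursor.fetchall()
        if not rows:
            return None
        return {row[0]: row[1] for row in rows}


METER = HourlySource("tbl_meter_hourly", "meter_id")
VIRTUAL_METER = HourlySource("tbl_virtual_meter_hourly", "virtual_meter_id")
OFFLINE_METER = HourlySource("tbl_offline_meter_hourly", "offline_meter_id")


class StubCursor:
    """Stand-in for a DB-API cursor so the sketch runs without a database."""
    def __init__(self, rows):
        self.rows = rows
        self.executed = []

    def execute(self, query, params):
        self.executed.append((query, params))

    def fetchall(self):
        return self.rows


cursor = StubCursor([("2024-01-01 00:00:00", 1.5), ("2024-01-01 01:00:00", 2.0)])
hourly = METER.fetch(cursor, "1", "2024-01-01 00:00:00", "2024-01-01 02:00:00")
```

The equipment query in Step 9 also selects energy_item_id and builds a nested dict, so it would need its own method or a subclass rather than this exact fetch.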

"""
MyEMS Combined Equipment Energy Input Item Aggregation Service

This module handles the aggregation of energy input consumption data for combined equipment by energy items.
It processes energy consumption data from various sources (meters, virtual meters, offline meters, equipment)
associated with each combined equipment and aggregates them into hourly energy consumption by items.

The service follows a systematic approach:
1. Retrieves all combined equipment from the system database
2. Creates a multiprocessing pool to process combined equipment in parallel
3. For each combined equipment, retrieves associated input sources (meters, virtual meters, offline meters, equipment)
4. Determines the time range for data aggregation
5. Fetches energy consumption data from all input sources
6. Finds the common time slot across all sources
7. Aggregates energy data by energy items and time slots
8. Saves the aggregated data to the energy database

This service runs continuously, processing new energy data as it becomes available and
ensuring accurate energy consumption aggregation for all combined equipment in the system.
"""

import random
import time
from datetime import datetime, timedelta
from decimal import Decimal
from multiprocessing import Pool

import mysql.connector

import config


########################################################################################################################
# PROCEDURES
# Step 1: get all combined equipments
# Step 2: Create multiprocessing pool to call worker in parallel
########################################################################################################################


def main(logger):
    """
    Main function for combined equipment energy input item aggregation service.

    This function runs continuously and processes energy aggregation for all combined equipment
    by energy items. It uses multiprocessing to handle multiple combined equipment in parallel
    for better performance.

    Args:
        logger: Logger instance for recording activities and errors

    The function follows these steps:
    1. Connects to the system database and retrieves all combined equipment
    2. Creates a multiprocessing pool to process combined equipment in parallel
    3. Sleeps for 300 seconds before the next processing cycle
    """

    while True:
        # Main processing loop - runs continuously
        ################################################################################################################
        # Step 1: Get all combined equipment from system database
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            # Connect to MyEMS System Database
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 1.1 of combined_equipment_energy_input_item.main " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # Sleep and continue the main loop to reconnect the database
            time.sleep(60)
            continue
        print("Connected to MyEMS System Database")

        # Retrieve all combined equipment from the system database
        combined_equipment_list = list()
        try:
            cursor_system_db.execute(" SELECT id, name "
                                     " FROM tbl_combined_equipments "
                                     " ORDER BY id ")
            rows_combined_equipments = cursor_system_db.fetchall()

            if rows_combined_equipments is None or len(rows_combined_equipments) == 0:
                print("There isn't any combined equipments ")
                # Sleep and continue the main loop to reconnect the database
                time.sleep(60)
                continue

            # Build combined equipment list with id and name
            for row in rows_combined_equipments:
                combined_equipment_list.append({"id": row[0], "name": row[1]})

        except Exception as e:
            logger.error("Error in step 1.2 of combined_equipment_energy_input_item.main " + str(e))
            # Sleep and continue the main loop to reconnect the database
            time.sleep(60)
            continue
        finally:
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        print("Got all combined equipments in MyEMS System Database")

        # Shuffle the list so that combined equipment is processed in random order
        random.shuffle(combined_equipment_list)

        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        # Create multiprocessing pool to process combined equipment in parallel
        p = Pool(processes=config.pool_size)
        error_list = p.map(worker, combined_equipment_list)
        p.close()
        p.join()

        # Log any errors that occurred during processing
        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        # Sleep for 300 seconds before the next processing cycle
        print("go to sleep 300 seconds...")
        time.sleep(300)
        print("wake from sleep, and continue to work...")
    # End of main processing loop


########################################################################################################################
# PROCEDURES:
#   Step 1: get all input meters associated with the combined equipment
#   Step 2: get all input virtual meters associated with the combined equipment
#   Step 3: get all input offline meters associated with the combined equipment
#   Step 4: get all equipments associated with the combined equipment
#   Step 5: determine start datetime and end datetime to aggregate
#   Step 6: for each meter in list, get energy input data from energy database
#   Step 7: for each virtual meter in list, get energy input data from energy database
#   Step 8: for each offline meter in list, get energy input data from energy database
#   Step 9: for each equipment in list, get energy input data from energy database
#   Step 10: determine common time slot to aggregate
#   Step 11: aggregate energy data in the common time slot by energy items and hourly
#   Step 12: save energy data to energy database
#
# NOTE: returns None or the error string because the logger object cannot be passed in as a parameter
########################################################################################################################

def worker(combined_equipment):
    """
    Worker function to process energy aggregation for a single combined equipment by energy items.

    This function handles the complete energy aggregation process for one combined equipment,
    including retrieving associated input sources, fetching energy data, and saving
    aggregated results to the database.

    Args:
        combined_equipment: Dictionary containing combined equipment information (id, name)

    Returns:
        None if successful, error string if an error occurred
    """
    ####################################################################################################################
    # Step 1: get all input meters associated with the combined equipment
    ####################################################################################################################
    print("Step 1: get all input meters associated with the combined equipment " + str(combined_equipment['name']))

    meter_list = list()
    cnx_system_db = None
    cursor_system_db = None
    try:
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
        cursor_system_db = cnx_system_db.cursor()
    except Exception as e:
        error_string = "Error in step 1.1 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
                                 " FROM tbl_meters m, tbl_combined_equipments_meters em "
                                 " WHERE m.id = em.meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND m.energy_item_id is NOT NULL "
                                 "       AND em.is_output = 0 "
                                 "       AND em.combined_equipment_id = %s ",
                                 (combined_equipment['id'],))
        rows_meters = cursor_system_db.fetchall()

        if rows_meters is not None and len(rows_meters) > 0:
            for row in rows_meters:
                meter_list.append({"id": row[0],
                                   "name": row[1],
                                   "energy_item_id": row[2]})

    except Exception as e:
        error_string = "Error in step 1.2 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 2: get all input virtual meters associated with the combined equipment
    ####################################################################################################################
    print("Step 2: get all input virtual meters associated with the combined equipment")
    virtual_meter_list = list()

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
                                 " FROM tbl_virtual_meters m, tbl_combined_equipments_virtual_meters em "
                                 " WHERE m.id = em.virtual_meter_id "
                                 "       AND m.energy_item_id is NOT NULL "
                                 "       AND m.is_counted = 1 "
                                 "       AND em.is_output = 0 "
                                 "       AND em.combined_equipment_id = %s ",
                                 (combined_equipment['id'],))
        rows_virtual_meters = cursor_system_db.fetchall()

        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
            for row in rows_virtual_meters:
                virtual_meter_list.append({"id": row[0],
                                           "name": row[1],
                                           "energy_item_id": row[2]})

    except Exception as e:
        error_string = "Error in step 2.1 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 3: get all input offline meters associated with the combined equipment
    ####################################################################################################################
    print("Step 3: get all input offline meters associated with the combined equipment")

    offline_meter_list = list()

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_item_id "
                                 " FROM tbl_offline_meters m, tbl_combined_equipments_offline_meters em "
                                 " WHERE m.id = em.offline_meter_id "
                                 "       AND m.energy_item_id is NOT NULL "
                                 "       AND m.is_counted = 1 "
                                 "       AND em.is_output = 0 "
                                 "       AND em.combined_equipment_id = %s ",
                                 (combined_equipment['id'],))
        rows_offline_meters = cursor_system_db.fetchall()

        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
            for row in rows_offline_meters:
                offline_meter_list.append({"id": row[0],
                                           "name": row[1],
                                           "energy_item_id": row[2]})

    except Exception as e:
        error_string = "Error in step 3.1 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 4: get all equipments associated with the combined equipment
    ####################################################################################################################
    print("Step 4: get all equipments associated with the combined equipment")

    equipment_list = list()

    try:
        cursor_system_db.execute(" SELECT e.id, e.name "
                                 " FROM tbl_equipments e, tbl_combined_equipments_equipments ce "
                                 " WHERE e.id = ce.equipment_id "
                                 "       AND e.is_input_counted = 1 "
                                 "       AND ce.combined_equipment_id = %s ",
                                 (combined_equipment['id'],))
        rows_equipments = cursor_system_db.fetchall()

        if rows_equipments is not None and len(rows_equipments) > 0:
            for row in rows_equipments:
                equipment_list.append({"id": row[0],
                                       "name": row[1]})

    except Exception as e:
        error_string = "Error in step 4 of combined_equipment_energy_input_item.worker " + str(e)
        print(error_string)
        return error_string
    finally:
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()

    ####################################################################################################################
    # skip to the next combined equipment if this one has no input sources
    ####################################################################################################################
    if (meter_list is None or len(meter_list) == 0) and \
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
            (offline_meter_list is None or len(offline_meter_list) == 0) and \
            (equipment_list is None or len(equipment_list) == 0):
        print("This is an empty combined equipment ")
        return None

    ####################################################################################################################
    # Step 5: determine start datetime and end datetime to aggregate
    ####################################################################################################################
    print("Step 5: determine start datetime and end datetime to aggregate")
    cnx_energy_db = None
    cursor_energy_db = None
    try:
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
        cursor_energy_db = cnx_energy_db.cursor()
    except Exception as e:
        error_string = "Error in step 5.1 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    try:
        query = (" SELECT MAX(start_datetime_utc) "
                 " FROM tbl_combined_equipment_input_item_hourly "
                 " WHERE combined_equipment_id = %s ")
        cursor_energy_db.execute(query, (combined_equipment['id'],))
        row_datetime = cursor_energy_db.fetchone()
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)

        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
            # replace second and microsecond with 0
            # note: do not replace minute in case of calculating in half hourly
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
            # start from the next time slot
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)

        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)

        # A leading space keeps the two values apart in the printed line
        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
              + " end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])

    except Exception as e:
        error_string = "Error in step 5.2 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 6: for each meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_meter_hourly = dict()
    try:
        if meter_list is not None and len(meter_list) > 0:
            for meter in meter_list:
                meter_id = str(meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_meter_hourly "
                         " WHERE meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_meter_hourly[meter_id] = None
                else:
                    energy_meter_hourly[meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
    except Exception as e:
        error_string = "Error in step 6.1 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 7: for each virtual meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_virtual_meter_hourly = dict()
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
        try:
            for virtual_meter in virtual_meter_list:
                virtual_meter_id = str(virtual_meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_virtual_meter_hourly "
                         " WHERE virtual_meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_virtual_meter_hourly[virtual_meter_id] = None
                else:
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
        except Exception as e:
            error_string = "Error in step 7.1 of combined_equipment_energy_input_item.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 8: for each offline meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_offline_meter_hourly = dict()
    if offline_meter_list is not None and len(offline_meter_list) > 0:
        try:
            for offline_meter in offline_meter_list:
                offline_meter_id = str(offline_meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_offline_meter_hourly "
                         " WHERE offline_meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_offline_meter_hourly[offline_meter_id] = None
                else:
                    energy_offline_meter_hourly[offline_meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]

        except Exception as e:
            error_string = "Error in step 8.1 of combined_equipment_energy_input_item.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 9: for each equipment in list, get energy input data from energy database
    ####################################################################################################################
    energy_equipment_hourly = dict()
    if equipment_list is not None and len(equipment_list) > 0:
        try:
            for equipment in equipment_list:
                equipment_id = str(equipment['id'])
                query = (" SELECT start_datetime_utc, energy_item_id, actual_value "
                         " FROM tbl_equipment_input_item_hourly "
                         " WHERE equipment_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (equipment_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_equipment_hourly[equipment_id] = None
                else:
                    energy_equipment_hourly[equipment_id] = dict()
                    for row_value in rows_energy_values:
                        current_datetime_utc = row_value[0]
                        if current_datetime_utc not in energy_equipment_hourly[equipment_id]:
                            energy_equipment_hourly[equipment_id][current_datetime_utc] = dict()
                        energy_item_id = row_value[1]
                        actual_value = row_value[2]
                        energy_equipment_hourly[equipment_id][current_datetime_utc][energy_item_id] = \
                            actual_value
        except Exception as e:
            error_string = "Error in step 9 of combined_equipment_energy_input_item.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 10: determine common time slot to aggregate
    ####################################################################################################################

    common_start_datetime_utc = start_datetime_utc
    common_end_datetime_utc = end_datetime_utc

    print("Getting common time slot of energy values for all meters")
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
        for meter_id, energy_hourly in energy_meter_hourly.items():
            if energy_hourly is None or len(energy_hourly) == 0:
                common_start_datetime_utc = None
                common_end_datetime_utc = None
                break
            else:
                if common_start_datetime_utc < min(energy_hourly.keys()):
                    common_start_datetime_utc = min(energy_hourly.keys())
                if common_end_datetime_utc > max(energy_hourly.keys()):
                    common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all virtual meters")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all offline meters")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
536
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
537
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
538
                if energy_hourly is None or len(energy_hourly) == 0:
539
                    common_start_datetime_utc = None
540
                    common_end_datetime_utc = None
541
                    break
542
                else:
543
                    if common_start_datetime_utc < min(energy_hourly.keys()):
544
                        common_start_datetime_utc = min(energy_hourly.keys())
545
                    if common_end_datetime_utc > max(energy_hourly.keys()):
546
                        common_end_datetime_utc = max(energy_hourly.keys())
547
548
    print("Getting common time slot of energy values for all equipments")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_equipment_hourly is not None and len(energy_equipment_hourly) > 0:
            for equipment_id, energy_hourly in energy_equipment_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0) and \
            (energy_equipment_hourly is None or len(energy_equipment_hourly) == 0):
        # There isn't any energy data
        print("There isn't any energy data")
        # nothing to aggregate for this combined equipment: release the database resources and return
        print("continue the for combined equipment loop to the next combined equipment")
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        return None

    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))

    ####################################################################################################################
    # Step 11: aggregate energy data in the common time slot by energy items and hourly
    ####################################################################################################################

    print("Step 11: aggregate energy data in the common time slot by energy items and hourly")
    aggregated_values = list()
    try:
        current_datetime_utc = common_start_datetime_utc
        while common_start_datetime_utc is not None \
                and common_end_datetime_utc is not None \
                and current_datetime_utc <= common_end_datetime_utc:
            aggregated_value = dict()
            aggregated_value['start_datetime_utc'] = current_datetime_utc
            aggregated_value['meta_data'] = dict()

            if meter_list is not None and len(meter_list) > 0:
                for meter in meter_list:
                    meter_id = str(meter['id'])
                    energy_item_id = meter['energy_item_id']
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_item_id] = \
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value

            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
                for virtual_meter in virtual_meter_list:
                    virtual_meter_id = str(virtual_meter['id'])
                    energy_item_id = virtual_meter['energy_item_id']
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_item_id] = \
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value

            if offline_meter_list is not None and len(offline_meter_list) > 0:
                for offline_meter in offline_meter_list:
                    offline_meter_id = str(offline_meter['id'])
                    energy_item_id = offline_meter['energy_item_id']
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_item_id] = \
                        aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value

            if equipment_list is not None and len(equipment_list) > 0:
                for equipment in equipment_list:
                    equipment_id = str(equipment['id'])
                    meta_data_dict = energy_equipment_hourly[equipment_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_item_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_item_id] = \
                                aggregated_value['meta_data'].get(energy_item_id, Decimal(0.0)) + actual_value

            aggregated_values.append(aggregated_value)

            current_datetime_utc += timedelta(minutes=config.minutes_to_count)

    except Exception as e:
        error_string = "Error in step 11 of combined_equipment_energy_input_item.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 12: save energy data to energy database
    ####################################################################################################################
    print("Step 12: save energy data to energy database")

    while len(aggregated_values) > 0:
        insert_100 = aggregated_values[:100]
        aggregated_values = aggregated_values[100:]
        try:
            add_values = (" INSERT INTO tbl_combined_equipment_input_item_hourly "
                          "             (combined_equipment_id, "
                          "              energy_item_id, "
                          "              start_datetime_utc, "
                          "              actual_value) "
                          " VALUES  ")

            for aggregated_value in insert_100:
                for energy_item_id, actual_value in aggregated_value['meta_data'].items():
                    add_values += " (" + str(combined_equipment['id']) + ","
                    add_values += " " + str(energy_item_id) + ","
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
                    add_values += str(actual_value) + "), "
            # print("add_values:" + add_values)
            # trim ", " at the end of string and then execute
            cursor_energy_db.execute(add_values[:-2])
            cnx_energy_db.commit()

        except Exception as e:
            error_string = "Error in step 12.1 of combined_equipment_energy_input_item.worker " + str(e)
            print(error_string)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            return error_string

    if cursor_energy_db:
        cursor_energy_db.close()
    if cnx_energy_db:
        cnx_energy_db.close()
    return None
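
The three "Getting common time slot" blocks in `worker()` differ only in the dictionary they scan, which likely accounts for part of the duplication reported for this file. The following is a minimal sketch of one way to fold them into a single helper. The name `narrow_common_time_slot` is hypothetical and not part of MyEMS, and the sketch assumes each source maps an id to a dict keyed by `datetime`, as the listing suggests.

```python
from datetime import datetime, timedelta


def narrow_common_time_slot(start, end, *sources):
    """Shrink [start, end] to the key range shared by every hourly series.

    Hypothetical helper, not part of MyEMS. Each source maps an id to a
    dict keyed by datetime. Returns (None, None) as soon as one series is
    empty, mirroring the original blocks.
    """
    for source in sources:
        if start is None or end is None:
            break  # no usable slot: leave the bounds untouched
        if not source:
            continue  # a missing or empty source does not constrain the slot
        for energy_hourly in source.values():
            if not energy_hourly:
                return None, None
            start = max(start, min(energy_hourly))
            end = min(end, max(energy_hourly))
    return start, end


def hour(n):
    """Return the n-th hour after an arbitrary reference time (example data only)."""
    return datetime(2024, 1, 1) + timedelta(hours=n)


# Made-up example series, keyed the same way as the *_hourly dictionaries
virtual = {"1": {hour(0): 1, hour(5): 1}}
offline = {"2": {hour(2): 1, hour(8): 1}}
slot = narrow_common_time_slot(hour(-3), hour(10), virtual, offline, None)
print(slot)
```

Inside `worker()`, the three blocks could then be replaced by one call that passes `energy_virtual_meter_hourly`, `energy_offline_meter_hourly` and `energy_equipment_hourly` in that order, provided the per-source `print` messages are not needed.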