Code Duplication    Length = 72-79 lines in 2 locations

myems-normalization/meter.py 1 location

@@ 38-116 (lines=79) @@
35
########################################################################################################################
36
37
38
def calculate_hourly(logger):
39
    """
40
    Main function for physical meter energy consumption normalization.
41
42
    This function runs continuously, retrieving all physical meters from the system database
43
    and processing them in parallel to calculate normalized hourly energy consumption data.
44
45
    Args:
46
        logger: Logger instance for recording normalization activities and errors
47
    """
48
    while True:
49
        ################################################################################################################
50
        # Step 1: Query all meters and associated energy value points from system database
51
        ################################################################################################################
52
        cnx_system_db = None
53
        cursor_system_db = None
54
55
        # Connect to system database to retrieve meter configuration
56
        try:
57
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
58
            cursor_system_db = cnx_system_db.cursor()
59
        except Exception as e:
60
            logger.error("Error in step 1.1 of meter.calculate_hourly process " + str(e))
61
            # Clean up database connections in case of error
62
            if cursor_system_db:
63
                cursor_system_db.close()
64
            if cnx_system_db:
65
                cnx_system_db.close()
66
            # Sleep for 60 seconds, then restart the outer loop to reconnect to the database
67
            time.sleep(60)
68
            continue
69
70
        print("Connected to the MyEMS System Database")
71
72
        # Retrieve all physical meters with their associated energy value points
73
        try:
74
            cursor_system_db.execute(" SELECT m.id, m.name, m.hourly_low_limit, m.hourly_high_limit, "
75
                                     "        p.id as point_id, p.units "
76
                                     " FROM tbl_meters m, tbl_meters_points mp, tbl_points p "
77
                                     " WHERE m.id = mp.meter_id "
78
                                     "       AND mp.point_id = p.id "
79
                                     "       AND p.object_type = 'ENERGY_VALUE'")
80
            rows_meters = cursor_system_db.fetchall()
81
82
            # Check if meters were found
83
            if rows_meters is None or len(rows_meters) == 0:
84
                # No meters found: sleep for 60 seconds, then restart the outer loop to reconnect and query again
85
                time.sleep(60)
86
                continue
87
88
            # Build meter list with configuration data
89
            meter_list = list()
90
            for row in rows_meters:
91
                meta_result = {"id": row[0],
92
                               "name": row[1],
93
                               "hourly_low_limit": row[2],
94
                               "hourly_high_limit": row[3],
95
                               "point_id": row[4],
96
                               "units": row[5]}
97
98
                meter_list.append(meta_result)
99
100
        except Exception as e:
101
            logger.error("Error in step 1.2 meter.calculate_hourly " + str(e))
102
            # Sleep for 60 seconds, then restart the outer loop to reconnect to the database
103
            time.sleep(60)
104
            continue
105
        finally:
106
            # Always clean up database connections
107
            if cursor_system_db:
108
                cursor_system_db.close()
109
            if cnx_system_db:
110
                cnx_system_db.close()
111
112
        # Shuffle the meter list for randomly calculating the meter hourly values
113
        # This helps distribute processing load evenly across time
114
        random.shuffle(meter_list)
115
116
        print("Got all meters in MyEMS System Database")
117
118
        ################################################################################################################
119
        # Step 2: Create multiprocessing pool to call worker processes in parallel

myems-normalization/virtualpoint.py 1 location

@@ 217-288 (lines=72) @@
214
# Step 2: Create multiprocessing pool to call worker processes in parallel
215
########################################################################################################################
216
217
def calculate(logger):
218
    """
219
    Main function for virtual point calculation using mathematical expressions.
220
221
    This function runs continuously, retrieving all virtual points from the system database
222
    and processing them in parallel to calculate virtual point values using their
223
    configured mathematical expressions.
224
225
    Args:
226
        logger: Logger instance for recording calculation activities and errors
227
    """
228
    while True:
229
        # The outermost while loop to reconnect to server if there is a connection error
230
        cnx_system_db = None
231
        cursor_system_db = None
232
233
        # Connect to system database to retrieve virtual point configuration
234
        try:
235
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
236
            cursor_system_db = cnx_system_db.cursor()
237
        except Exception as e:
238
            logger.error("Error in step 0 of virtual point calculate " + str(e))
239
            if cursor_system_db:
240
                cursor_system_db.close()
241
            if cnx_system_db:
242
                cnx_system_db.close()
243
            # Sleep and continue the outer loop to reconnect the database
244
            time.sleep(60)
245
            continue
246
247
        print("Connected to MyEMS System Database")
248
249
        # Retrieve all virtual points with their configuration data
250
        virtual_point_list = list()
251
        try:
252
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
253
                                     " FROM tbl_points "
254
                                     " WHERE is_virtual = 1 ")
255
            rows_virtual_points = cursor_system_db.fetchall()
256
257
            # Check if virtual points were found
258
            if rows_virtual_points is None or len(rows_virtual_points) == 0:
259
                # No virtual points found: sleep for 60 seconds, then restart the outer loop to reconnect and query again
260
                time.sleep(60)
261
                continue
262
263
            # Build virtual point list with configuration data
264
            for row in rows_virtual_points:
265
                meta_result = {"id": row[0],
266
                               "name": row[1],
267
                               "data_source_id": row[2],
268
                               "object_type": row[3],
269
                               "high_limit": row[4],
270
                               "low_limit": row[5],
271
                               "address": row[6]}
272
                virtual_point_list.append(meta_result)
273
274
        except Exception as e:
275
            logger.error("Error in step 1 of virtual point calculate " + str(e))
276
            # sleep and continue the outer loop to reconnect the database
277
            time.sleep(60)
278
            continue
279
        finally:
280
            if cursor_system_db:
281
                cursor_system_db.close()
282
            if cnx_system_db:
283
                cnx_system_db.close()
284
285
        # Shuffle the virtual point list for randomly calculating point values
286
        # This helps distribute processing load evenly across time
287
        random.shuffle(virtual_point_list)
288
289
        print("Got all virtual points in MyEMS System Database")
290
        ################################################################################################################
291
        # Step 2: Create multiprocessing pool to call worker processes in parallel