@@ 17-95 (lines=79) @@ | ||
14 | ######################################################################################################################## |
|
15 | ||
16 | ||
def calculate_hourly(logger):
    """Run the meter hourly-calculation cycle in an endless loop.

    Each cycle:
      1. Connects to the MyEMS system database and selects every meter that is
         linked to a point whose object_type is 'ENERGY_VALUE'.
      2. Shuffles the meter list and maps ``worker`` over it in a
         multiprocessing pool of ``config.pool_size`` processes.
      3. Logs every non-empty result returned by the workers as an error.

    The loop sleeps 60 seconds after each cycle, after any database error and
    when the query returns no meters, then starts over with a new connection.

    Args:
        logger: object exposing ``error(message)``; receives all error reports.

    The function contains no ``return``/``break``; it only ends if an exception
    escapes (for example one raised by the worker pool).
    """
    while True:
        ################################################################################################################
        # Step 1: Query all meters and associated energy value points
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 1.1 of meter.calculate_hourly process " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # wait one minute, then restart the outer loop to reconnect
            time.sleep(60)
            continue

        print("Connected to the MyEMS System Database")

        try:
            cursor_system_db.execute(" SELECT m.id, m.name, m.hourly_low_limit, m.hourly_high_limit, "
                                     " p.id as point_id, p.units "
                                     " FROM tbl_meters m, tbl_meters_points mp, tbl_points p "
                                     " WHERE m.id = mp.meter_id "
                                     " AND mp.point_id = p.id "
                                     " AND p.object_type = 'ENERGY_VALUE'")
            rows_meters = cursor_system_db.fetchall()

            if not rows_meters:
                # nothing to calculate yet; wait one minute and query again
                # (the finally clause below still closes the connection)
                time.sleep(60)
                continue

            meter_list = [{"id": row[0],
                           "name": row[1],
                           "hourly_low_limit": row[2],
                           "hourly_high_limit": row[3],
                           "point_id": row[4],
                           "units": row[5]}
                          for row in rows_meters]

        except Exception as e:
            logger.error("Error in step 1.2 meter.calculate_hourly " + str(e))
            # wait one minute, then restart the outer loop to reconnect
            time.sleep(60)
            continue
        finally:
            # runs on success, on error and on the early `continue` above
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        # shuffle the meter list for randomly calculating the meter hourly value
        random.shuffle(meter_list)

        print("Got all meters in MyEMS System Database")

        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        p = Pool(processes=config.pool_size)
        try:
            error_list = p.map(worker, meter_list)
        finally:
            # release the worker processes even when map() raises, so a failed
            # cycle does not leave the pool's processes behind
            p.close()
            p.join()

        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep ...")
        time.sleep(60)
        print("wake from sleep, and continue to work...")
    # end of outer while
|
97 | ||
98 |
@@ 19-90 (lines=72) @@ | ||
16 | # Step 2: Create multiprocessing pool to call worker in parallel |
|
17 | ######################################################################################################################## |
|
18 | ||
def calculate(logger):
    """Run the virtual point calculation cycle in an endless loop.

    Each cycle:
      1. Connects to the MyEMS system database and selects every row of
         tbl_points where is_virtual = 1.
      2. Shuffles the virtual point list and maps ``worker`` over it in a
         multiprocessing pool of ``config.pool_size`` processes.
      3. Logs every non-empty result returned by the workers as an error.

    The loop sleeps 60 seconds after each cycle, after any database error and
    when the query returns no virtual points, then starts over with a new
    connection.

    Args:
        logger: object exposing ``error(message)``; receives all error reports.

    The function contains no ``return``/``break``; it only ends if an exception
    escapes (for example one raised by the worker pool).
    """
    while True:
        # the outermost while loop to reconnect server if there is a connection error
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 0 of virtual point calculate " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # wait one minute, then restart the outer loop to reconnect
            time.sleep(60)
            continue

        print("Connected to MyEMS System Database")

        virtual_point_list = list()
        try:
            cursor_system_db.execute(" SELECT id, name, data_source_id, object_type, high_limit, low_limit, address "
                                     " FROM tbl_points "
                                     " WHERE is_virtual = 1 ")
            rows_virtual_points = cursor_system_db.fetchall()

            if not rows_virtual_points:
                # nothing to calculate yet; wait one minute and query again
                # (the finally clause below still closes the connection)
                time.sleep(60)
                continue

            virtual_point_list.extend({"id": row[0],
                                       "name": row[1],
                                       "data_source_id": row[2],
                                       "object_type": row[3],
                                       "high_limit": row[4],
                                       "low_limit": row[5],
                                       "address": row[6]}
                                      for row in rows_virtual_points)

        except Exception as e:
            logger.error("Error in step 1 of virtual point calculate " + str(e))
            # wait one minute, then restart the outer loop to reconnect
            time.sleep(60)
            continue
        finally:
            # runs on success, on error and on the early `continue` above
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        # shuffle the virtual point list for randomly calculating
        random.shuffle(virtual_point_list)

        print("Got all virtual points in MyEMS System Database")
        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        p = Pool(processes=config.pool_size)
        try:
            error_list = p.map(worker, virtual_point_list)
        finally:
            # release the worker processes even when map() raises, so a failed
            # cycle does not leave the pool's processes behind
            p.close()
            p.join()

        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep ")
        time.sleep(60)
        print("wake from sleep, and continue to work")
|
91 | ||
92 | ||
93 | ######################################################################################################################## |