acquisition   A
last analyzed

Complexity

Total Complexity 42

Size/Duplication

Total Lines 238
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 42
eloc 147
dl 0
loc 238
rs 9.0399
c 0
b 0
f 0

3 Functions

Rating   Name   Duplication   Size   Complexity  
A on_mqtt_disconnect() 0 5 1
A on_mqtt_connect() 0 8 2
F process() 0 196 39

How to fix   Complexity   

Complexity

Complex classes like acquisition often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import json
2
import mysql.connector
3
import time
4
import simplejson as json
5
import paho.mqtt.client as mqtt
6
import config
7
8
9
# indicates the connectivity with the MQTT broker
10
mqtt_connected_flag = False
11
12
13
# the on_connect callback function for MQTT client
14
# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/
15
def on_mqtt_connect(client, userdata, flags, rc):
16
    global mqtt_connected_flag
17
    if rc == 0:
18
        mqtt_connected_flag = True  # set flag
19
        print("MQTT connected OK")
20
    else:
21
        print("Bad MQTT connection Returned code=", rc)
22
        mqtt_connected_flag = False
23
24
25
# the on_disconnect callback function for MQTT client
26
# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/
27
def on_mqtt_disconnect(client, userdata, rc=0):
28
    global mqtt_connected_flag
29
30
    print("DisConnected, result code "+str(rc))
31
    mqtt_connected_flag = False
32
33
34
########################################################################################################################
35
# Acquisition Procedures
36
# Step 1: Get point list
37
# Step 2: Connect to the historical database
38
# Step 3: Connect to the MQTT broker
39
# Step 4: Read point values from latest tables in historical database
40
# Step 5: Publish point values
41
########################################################################################################################
42
def process(logger, object_type):
43
44
    while True:
45
        # the outermost while loop
46
47
        ################################################################################################################
48
        # Step 1: Get point list
49
        ################################################################################################################
50
        cnx_system_db = None
51
        cursor_system_db = None
52
        try:
53
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
54
            cursor_system_db = cnx_system_db.cursor()
55
        except Exception as e:
56
            logger.error("Error in step 1.1 of acquisition process " + str(e))
57
            if cursor_system_db:
58
                cursor_system_db.close()
59
            if cnx_system_db:
60
                cnx_system_db.close()
61
            # sleep and then continue the outermost loop to reload points
62
            time.sleep(60)
63
            continue
64
65
        try:
66
            if object_type == 'ANALOG_VALUE':
67
                query = (" SELECT id, name, data_source_id "
68
                         " FROM tbl_points"
69
                         " WHERE object_type = 'ANALOG_VALUE' "
70
                         " ORDER BY id ")
71
            elif object_type == 'DIGITAL_VALUE':
72
                query = (" SELECT id, name, data_source_id "
73
                         " FROM tbl_points"
74
                         " WHERE object_type = 'DIGITAL_VALUE' "
75
                         " ORDER BY id ")
76
            elif object_type == 'ENERGY_VALUE':
77
                query = (" SELECT id, name, data_source_id "
78
                         " FROM tbl_points"
79
                         " WHERE object_type = 'ENERGY_VALUE' "
80
                         " ORDER BY id ")
81
82
            cursor_system_db.execute(query, )
0 ignored issues
show
introduced by
The variable query does not seem to be defined for all execution paths.
Loading history...
83
            rows_point = cursor_system_db.fetchall()
84
        except Exception as e:
85
            logger.error("Error in step 1.2 of acquisition process: " + str(e))
86
            # sleep several minutes and continue the outer loop to reload points
87
            time.sleep(60)
88
            continue
89
        finally:
90
            if cursor_system_db:
91
                cursor_system_db.close()
92
            if cnx_system_db:
93
                cnx_system_db.close()
94
95
        if rows_point is None or len(rows_point) == 0:
0 ignored issues
show
introduced by
The variable rows_point does not seem to be defined for all execution paths.
Loading history...
96
            # there is no points
97
            logger.error("Point Not Found, acquisition process terminated ")
98
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
99
            time.sleep(60)
100
            continue
101
102
        point_dict = dict()
103
        for row_point in rows_point:
104
            point_dict[row_point[0]] = {'name': row_point[1], 'data_source_id': row_point[2]}
105
106
        ################################################################################################################
107
        # Step 2: Connect to the historical database
108
        ################################################################################################################
109
        cnx_historical_db = None
110
        cursor_historical_db = None
111
        try:
112
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
113
            cursor_historical_db = cnx_historical_db.cursor()
114
        except Exception as e:
115
            logger.error("Error in step 2.1 of acquisition process " + str(e))
116
            if cursor_historical_db:
117
                cursor_historical_db.close()
118
            if cnx_historical_db:
119
                cnx_historical_db.close()
120
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
121
            time.sleep(60)
122
            continue
123
124
        ################################################################################################################
125
        # Step 3: Connect to the MQTT broker
126
        ################################################################################################################
127
        mqc = None
128
        try:
129
            mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time()))
130
            mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password'])
131
            mqc.on_connect = on_mqtt_connect
132
            mqc.on_disconnect = on_mqtt_disconnect
133
            mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60)
134
            # The loop_start() starts a new thread, that calls the loop method at regular intervals for you.
135
            # It also handles re-connects automatically.
136
            mqc.loop_start()
137
138
        except Exception as e:
139
            logger.error("Error in step 3.1 of acquisition process " + str(e))
140
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
141
            time.sleep(60)
142
            continue
143
144
        ################################################################################################################
145
        # Step 4: Read point values from latest tables in historical database
146
        ################################################################################################################
147
        # inner loop to read all point latest values and publish them within a period
148
        while True:
149
            if object_type == 'ANALOG_VALUE':
150
                query = " SELECT point_id, utc_date_time, actual_value" \
151
                        " FROM tbl_analog_value_latest WHERE point_id IN ( "
152
            elif object_type == 'DIGITAL_VALUE':
153
                query = " SELECT point_id, utc_date_time, actual_value" \
154
                        " FROM tbl_digital_value_latest WHERE point_id IN ( "
155
            elif object_type == 'ENERGY_VALUE':
156
                query = " SELECT point_id, utc_date_time, actual_value" \
157
                        " FROM tbl_energy_value_latest WHERE point_id IN ( "
158
159
            for point_id in point_dict:
160
                query += str(point_id) + ","
161
162
            try:
163
                # replace "," at the end of string with ")"
164
                cursor_historical_db.execute(query[:-1] + ")")
165
                rows_point_values = cursor_historical_db.fetchall()
166
            except Exception as e:
167
                logger.error("Error in step 4.1 of acquisition process " + str(e))
168
                if cursor_historical_db:
169
                    cursor_historical_db.close()
170
                if cnx_historical_db:
171
                    cnx_historical_db.close()
172
173
                # destroy mqtt client
174
                if mqc and mqc.is_connected():
175
                    mqc.disconnect()
176
                del mqc
177
                # break the inner while loop
178
                break
179
180
            if rows_point_values is None or len(rows_point_values) == 0:
181
                # there is no points
182
                print(" Point value Not Found")
183
184
                # sleep 60 seconds and go back to the begin of inner while loop
185
                time.sleep(60)
186
                continue
187
188
            point_value_list = list()
189
            for row_point_value in rows_point_values:
190
                point_id = row_point_value[0]
191
                point = point_dict.get(point_id)
192
                data_source_id = point['data_source_id']
193
                utc_date_time = row_point_value[1].replace(tzinfo=None).isoformat(timespec='seconds')
194
                value = row_point_value[2]
195
                point_value_list.append({'data_source_id': data_source_id,
196
                                         'point_id': point_id,
197
                                         'object_type': object_type,
198
                                         'utc_date_time': utc_date_time,
199
                                         'value': value})
200
201
            ############################################################################################################
202
            # Step 5: Publish point values
203
            ############################################################################################################
204
205
            if len(point_value_list) > 0 and mqtt_connected_flag:
206
                for point_value in point_value_list:
207
                    try:
208
                        # publish real time value to mqtt broker
209
                        topic = config.topic_prefix + str(point_value['point_id'])
210
                        print('topic=' + topic)
211
                        payload = json.dumps({'data_source_id': point_value['data_source_id'],
212
                                              'point_id': point_value['point_id'],
213
                                              'object_type': point_value['object_type'],
214
                                              'utc_date_time': point_value['utc_date_time'],
215
                                              'value': point_value['value']})
216
                        print('payload=' + str(payload))
217
                        info = mqc.publish(topic=topic,
218
                                           payload=payload,
219
                                           qos=config.qos,
220
                                           retain=True)
221
                    except Exception as e:
222
                        logger.error("Error in step 5 of acquisition process: " + str(e))
223
                        if cursor_historical_db:
224
                            cursor_historical_db.close()
225
                        if cnx_historical_db:
226
                            cnx_historical_db.close()
227
228
                        # destroy mqtt client
229
                        if mqc and mqc.is_connected():
230
                            mqc.disconnect()
231
                        del mqc
232
233
                        # break the inner while loop
234
                        break
235
236
            # sleep some seconds
237
            time.sleep(config.interval_in_seconds)
238
        # end of inner while loop
239
240
    # end of outermost while loop
241