Passed
Push — master ( 964aa4...00eff6 )
by Guangyu
17:25 queued 10s
created

virtualpoint   F

Complexity

Total Complexity 65

Size/Duplication

Total Lines 302
Duplicated Lines 23.84 %

Importance

Changes 0
Metric Value
wmc 65
eloc 191
dl 72
loc 302
rs 3.2
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
F worker() 0 204 51
F calculate() 72 72 14

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like virtualpoint often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import time
2
from datetime import datetime, timedelta
3
import mysql.connector
4
from sympy import sympify
5
from multiprocessing import Pool
6
import random
7
import json
8
import config
9
10
11
########################################################################################################################
12
# PROCEDURES:
13
# Step 1: Query all virtual points
14
# Step 2: Create multiprocessing pool to call worker in parallel
15
########################################################################################################################
16
17 View Code Duplication
def calculate(logger):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
18
19
    while True:
20
        # outer loop to reconnect server if there is a connection error
21
        cnx_system_db = None
22
        cursor_system_db = None
23
        try:
24
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
25
            cursor_system_db = cnx_system_db.cursor(dictionary=True)
26
        except Exception as e:
27
            logger.error("Error in step 0 of virtual point calculate " + str(e))
28
            if cursor_system_db:
29
                cursor_system_db.close()
30
            if cnx_system_db:
31
                cnx_system_db.close()
32
            # sleep and continue the outer loop to reconnect the database
33
            time.sleep(60)
34
            continue
35
36
        print("Connected to MyEMS System Database")
37
38
        virtual_point_list = list()
39
        try:
40
            cursor_system_db.execute(" SELECT id, name, data_source_id, high_limit, low_limit, address "
41
                                     " FROM tbl_points "
42
                                     " WHERE is_virtual = TRUE AND object_type = 'ANALOG_VALUE' ")
43
            rows_virtual_points = cursor_system_db.fetchall()
44
45
            if rows_virtual_points is None or len(rows_virtual_points) == 0:
46
                # sleep several minutes and continue the outer loop to reconnect the database
47
                time.sleep(60)
48
                continue
49
50
            for row in rows_virtual_points:
51
                meta_result = {"id": row['id'],
52
                               "name": row['name'],
53
                               "data_source_id": row['data_source_id'],
54
                               "high_limit": row['high_limit'],
55
                               "low_limit": row['low_limit'],
56
                               "address": row['address']}
57
                virtual_point_list.append(meta_result)
58
59
        except Exception as e:
60
            logger.error("Error in step 1 of virtual point calculate " + str(e))
61
            # sleep and continue the outer loop to reconnect the database
62
            time.sleep(60)
63
            continue
64
        finally:
65
            if cursor_system_db:
66
                cursor_system_db.close()
67
            if cnx_system_db:
68
                cnx_system_db.close()
69
70
        # shuffle the virtual point list for randomly calculating
71
        random.shuffle(virtual_point_list)
72
73
        print("Got all virtual points in MyEMS System Database")
74
        ################################################################################################################
75
        # Step 2: Create multiprocessing pool to call worker in parallel
76
        ################################################################################################################
77
        p = Pool(processes=config.pool_size)
78
        error_list = p.map(worker, virtual_point_list)
79
        p.close()
80
        p.join()
81
82
        for error in error_list:
83
            if error is not None and len(error) > 0:
84
                logger.error(error)
85
86
        print("go to sleep ...")
87
        time.sleep(60)
88
        print("wake from sleep, and continue to work...")
89
90
91
########################################################################################################################
92
# Step 1: get start datetime and end datetime
93
# Step 2: parse the expression and get all points in substitutions
94
# Step 3: query points value from historical database
95
# Step 4: evaluate the equation with points values
96
########################################################################################################################
97
98
def worker(virtual_point):
99
    cnx_historical_db = None
100
    cursor_historical_db = None
101
102
    try:
103
        cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
104
        cursor_historical_db = cnx_historical_db.cursor()
105
    except Exception as e:
106
        if cursor_historical_db:
107
            cursor_historical_db.close()
108
        if cnx_historical_db:
109
            cnx_historical_db.close()
110
        return "Error in step 1.1 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
111
112
    print("Start to process virtual point: " + "'" + virtual_point['name']+"'")
113
114
    ####################################################################################################################
115
    # step 1: get start datetime and end datetime
116
    ####################################################################################################################
117
118
    try:
119
        query = (" SELECT MAX(utc_date_time) "
120
                 " FROM tbl_analog_value "
121
                 " WHERE point_id = %s ")
122
        cursor_historical_db.execute(query, (virtual_point['id'],))
123
        row = cursor_historical_db.fetchone()
124
    except Exception as e:
125
        if cursor_historical_db:
126
            cursor_historical_db.close()
127
        if cnx_historical_db:
128
            cnx_historical_db.close()
129
        return "Error in step 1.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
130
131
    start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
132
    start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
133
134
    if row is not None and len(row) > 0 and isinstance(row[0], datetime):
135
        # replace second and microsecond with 0
136
        # note: do not replace minute in case of calculating in half hourly
137
        start_datetime_utc = row[0].replace(second=0, microsecond=0, tzinfo=None)
138
        # start from the next time slot
139
        start_datetime_utc += timedelta(minutes=config.minutes_to_count)
140
141
    end_datetime_utc = datetime.utcnow().replace()
142
    end_datetime_utc = end_datetime_utc.replace(second=0, microsecond=0, tzinfo=None)
143
144
    if end_datetime_utc <= start_datetime_utc:
145
        if cursor_historical_db:
146
            cursor_historical_db.close()
147
        if cnx_historical_db:
148
            cnx_historical_db.close()
149
        return "it's too early to calculate" + " for '" + virtual_point['name'] + "'"
150
151
    print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
152
          + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
153
154
    ############################################################################################################
155
    # Step 2: parse the expression and get all points in substitutions
156
    ############################################################################################################
157
    point_list = list()
158
    try:
159
        ########################################################################################################
160
        # parse the expression and get all points in substitutions
161
        ########################################################################################################
162
        address = json.loads(virtual_point['address'])
163
        # example: '{"expression": "x1-x2", "substitutions": {"x1":1,"x2":2}}'
164
        if 'expression' not in address.keys() \
165
                or 'substitutions' not in address.keys() \
166
                or len(address['expression']) == 0 \
167
                or len(address['substitutions']) == 0:
168
            return "Error in step 2.1 of virtual point worker for '" + virtual_point['name'] + "'"
169
        expression = address['expression']
170
        substitutions = address['substitutions']
171
        for variable_name, point_id in substitutions.items():
172
            point_list.append({"variable_name": variable_name, "point_id": point_id})
173
    except Exception as e:
174
        if cursor_historical_db:
175
            cursor_historical_db.close()
176
        if cnx_historical_db:
177
            cnx_historical_db.close()
178
        return "Error in step 2.2 of virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
179
180
    ############################################################################################################
181
    # Step 3: query points value from historical database
182
    ############################################################################################################
183
184
    print("getting point values ...")
185
    point_values_dict = dict()
186
    if point_list is not None and len(point_list) > 0:
187
        try:
188
            for point in point_list:
189
                point_id = str(point['point_id'])
190
                query = (" SELECT utc_date_time, actual_value "
191
                         " FROM tbl_analog_value "
192
                         " WHERE point_id = %s AND utc_date_time >= %s AND utc_date_time < %s "
193
                         " ORDER BY utc_date_time ")
194
                cursor_historical_db.execute(query, (point_id, start_datetime_utc, end_datetime_utc, ))
195
                rows = cursor_historical_db.fetchall()
196
                if rows is None or len(rows) == 0:
197
                    point_values_dict[point_id] = None
198
                else:
199
                    point_values_dict[point_id] = dict()
200
                    for row in rows:
201
                        point_values_dict[point_id][row[0]] = row[1]
202
        except Exception as e:
203
            if cursor_historical_db:
204
                cursor_historical_db.close()
205
            if cnx_historical_db:
206
                cnx_historical_db.close()
207
            return "Error in step 3.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
208
209
    ############################################################################################################
210
    # Step 4: evaluate the equation with points values
211
    ############################################################################################################
212
213
    print("getting date time set for all points...")
214
    utc_date_time_set = set()
215
    if point_values_dict is not None and len(point_values_dict) > 0:
216
        for point_id, point_values in point_values_dict.items():
217
            if point_values is not None and len(point_values) > 0:
218
                utc_date_time_set = utc_date_time_set.union(point_values.keys())
219
220
    print("evaluating the equation with SymPy...")
221
    normalized_values = list()
222
223
    ############################################################################################################
224
    # Converting Strings to SymPy Expressions
225
    # The sympify function(that’s sympify, not to be confused with simplify) can be used to
226
    # convert strings into SymPy expressions.
227
    ############################################################################################################
228
    try:
229
        expr = sympify(expression)
230
        print("the expression to be evaluated: " + str(expr))
231
        for utc_date_time in utc_date_time_set:
232
            meta_data = dict()
233
            meta_data['utc_date_time'] = utc_date_time
234
235
            ####################################################################################################
236
            # create a dictionary of Symbol: point pairs
237
            ####################################################################################################
238
239
            subs = dict()
240
241
            ####################################################################################################
242
            # Evaluating the expression at current_datetime_utc
243
            ####################################################################################################
244
245
            if point_list is not None and len(point_list) > 0:
246
                for point in point_list:
247
                    point_id = str(point['point_id'])
248
                    actual_value = point_values_dict[point_id].get(utc_date_time, None)
249
                    if actual_value is None:
250
                        break
251
                    subs[point['variable_name']] = actual_value
252
253
            if len(subs) != len(point_list):
254
                continue
255
256
            ####################################################################################################
257
            # To numerically evaluate an expression with a Symbol at a point,
258
            # we might use subs followed by evalf,
259
            # but it is more efficient and numerically stable to pass the substitution to evalf
260
            # using the subs flag, which takes a dictionary of Symbol: point pairs.
261
            ####################################################################################################
262
263
            meta_data['actual_value'] = expr.evalf(subs=subs)
264
            normalized_values.append(meta_data)
265
266
    except Exception as e:
267
        if cursor_historical_db:
268
            cursor_historical_db.close()
269
        if cnx_historical_db:
270
            cnx_historical_db.close()
271
        return "Error in step 4.1 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
272
273
    print("saving virtual points values to historical database...")
274
275
    if len(normalized_values) > 0:
276
        try:
277
            add_values = (" INSERT INTO tbl_analog_value "
278
                          " (point_id, utc_date_time, actual_value) "
279
                          " VALUES  ")
280
281
            for meta_data in normalized_values:
282
                add_values += " (" + str(virtual_point['id']) + ","
283
                add_values += "'" + meta_data['utc_date_time'].isoformat()[0:19] + "',"
284
                add_values += str(meta_data['actual_value']) + "), "
285
            print("add_values:" + add_values)
286
            # trim ", " at the end of string and then execute
287
            cursor_historical_db.execute(add_values[:-2])
288
            cnx_historical_db.commit()
289
        except Exception as e:
290
            if cursor_historical_db:
291
                cursor_historical_db.close()
292
            if cnx_historical_db:
293
                cnx_historical_db.close()
294
            return "Error in step 4.2 virtual point worker " + str(e) + " for '" + virtual_point['name'] + "'"
295
296
    if cursor_historical_db:
297
        cursor_historical_db.close()
298
    if cnx_historical_db:
299
        cnx_historical_db.close()
300
301
    return None
302