tariff.get_energy_item_tariffs()   F
last analyzed

Complexity

Conditions 23

Size

Total Lines 96
Code Lines 71

Duplication

Lines 96
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 71
dl 96
loc 96
rs 0
c 0
b 0
f 0
cc 23
nop 4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like tariff.get_energy_item_tariffs() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from datetime import timedelta
2
import mysql.connector
3
import config
4
import collections
5
6
7
########################################################################################################################
8
# Get tariffs by energy category
9
########################################################################################################################
10 View Code Duplication
def get_energy_category_tariffs(cost_center_id, energy_category_id, start_datetime_utc, end_datetime_utc):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
11
    # todo: verify parameters
12
    if cost_center_id is None:
13
        return dict()
14
15
    # get timezone offset in minutes, this value will be returned to client
16
    timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6])
17
    if config.utc_offset[0] == '-':
18
        timezone_offset = -timezone_offset
19
20
    tariff_dict = collections.OrderedDict()
21
22
    cnx = None
23
    cursor = None
24
    try:
25
        cnx = mysql.connector.connect(**config.myems_system_db)
26
        cursor = cnx.cursor()
27
        query_tariffs = (" SELECT t.id, t.valid_from_datetime_utc, t.valid_through_datetime_utc "
28
                         " FROM tbl_tariffs t, tbl_cost_centers_tariffs cct "
29
                         " WHERE t.energy_category_id = %s AND "
30
                         "       t.id = cct.tariff_id AND "
31
                         "       cct.cost_center_id = %s AND "
32
                         "       t.valid_through_datetime_utc >= %s AND "
33
                         "       t.valid_from_datetime_utc <= %s "
34
                         " ORDER BY t.valid_from_datetime_utc ")
35
        cursor.execute(query_tariffs, (energy_category_id, cost_center_id, start_datetime_utc, end_datetime_utc,))
36
        rows_tariffs = cursor.fetchall()
37
    except Exception as e:
38
        print(str(e))
39
        if cnx:
40
            cnx.disconnect()
41
        if cursor:
42
            cursor.close()
43
        return dict()
44
45
    if rows_tariffs is None or len(rows_tariffs) == 0:
46
        if cursor:
47
            cursor.close()
48
        if cnx:
49
            cnx.disconnect()
50
        return dict()
51
52
    for row in rows_tariffs:
53
        tariff_dict[row[0]] = {'valid_from_datetime_utc': row[1],
54
                               'valid_through_datetime_utc': row[2],
55
                               'rates': list()}
56
57
    try:
58
        query_timeofuse_tariffs = (" SELECT tariff_id, start_time_of_day, end_time_of_day, price "
59
                                   " FROM tbl_tariffs_timeofuses "
60
                                   " WHERE tariff_id IN ( " + ', '.join(map(str, tariff_dict.keys())) + ")"
61
                                   " ORDER BY tariff_id, start_time_of_day ")
62
        cursor.execute(query_timeofuse_tariffs, )
63
        rows_timeofuse_tariffs = cursor.fetchall()
64
    except Exception as e:
65
        print(str(e))
66
        if cnx:
67
            cnx.disconnect()
68
        if cursor:
69
            cursor.close()
70
        return dict()
71
72
    if cursor:
73
        cursor.close()
74
    if cnx:
75
        cnx.disconnect()
76
77
    if rows_timeofuse_tariffs is None or len(rows_timeofuse_tariffs) == 0:
78
        return dict()
79
80
    for row in rows_timeofuse_tariffs:
81
        tariff_dict[row[0]]['rates'].append({'start_time_of_day': row[1],
82
                                             'end_time_of_day': row[2],
83
                                             'price': row[3]})
84
85
    result = dict()
86
    for tariff_id, tariff_value in tariff_dict.items():
87
        current_datetime_utc = tariff_value['valid_from_datetime_utc']
88
        while current_datetime_utc < tariff_value['valid_through_datetime_utc']:
89
            for rate in tariff_value['rates']:
90
                current_datetime_local = current_datetime_utc + timedelta(minutes=timezone_offset)
91
                seconds_since_midnight = (current_datetime_local -
92
                                          current_datetime_local.replace(hour=0,
93
                                                                         second=0,
94
                                                                         microsecond=0,
95
                                                                         tzinfo=None)).total_seconds()
96
                if rate['start_time_of_day'].total_seconds() <= \
97
                        seconds_since_midnight < rate['end_time_of_day'].total_seconds():
98
                    result[current_datetime_utc] = rate['price']
99
                    break
100
101
            # start from the next time slot
102
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
103
104
    return {k: v for k, v in result.items() if start_datetime_utc <= k <= end_datetime_utc}
105
106
107
########################################################################################################################
108
# Get tariffs by energy item
109
########################################################################################################################
110 View Code Duplication
def get_energy_item_tariffs(cost_center_id, energy_item_id, start_datetime_utc, end_datetime_utc):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
111
    # todo: verify parameters
112
    if cost_center_id is None:
113
        return dict()
114
115
    # get timezone offset in minutes, this value will be returned to client
116
    timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6])
117
    if config.utc_offset[0] == '-':
118
        timezone_offset = -timezone_offset
119
120
    tariff_dict = collections.OrderedDict()
121
122
    cnx = None
123
    cursor = None
124
    try:
125
        cnx = mysql.connector.connect(**config.myems_system_db)
126
        cursor = cnx.cursor()
127
        query_tariffs = (" SELECT t.id, t.valid_from_datetime_utc, t.valid_through_datetime_utc "
128
                         " FROM tbl_tariffs t, tbl_cost_centers_tariffs cct, tbl_energy_items ei "
129
                         " WHERE ei.id = %s AND "
130
                         "       t.energy_category_id = ei.energy_category_id AND "
131
                         "       t.id = cct.tariff_id AND "
132
                         "       cct.cost_center_id = %s AND "
133
                         "       t.valid_through_datetime_utc >= %s AND "
134
                         "       t.valid_from_datetime_utc <= %s "
135
                         " ORDER BY t.valid_from_datetime_utc ")
136
        cursor.execute(query_tariffs, (energy_item_id, cost_center_id, start_datetime_utc, end_datetime_utc,))
137
        rows_tariffs = cursor.fetchall()
138
    except Exception as e:
139
        print(str(e))
140
        if cnx:
141
            cnx.disconnect()
142
        if cursor:
143
            cursor.close()
144
        return dict()
145
146
    if rows_tariffs is None or len(rows_tariffs) == 0:
147
        if cursor:
148
            cursor.close()
149
        if cnx:
150
            cnx.disconnect()
151
        return dict()
152
153
    for row in rows_tariffs:
154
        tariff_dict[row[0]] = {'valid_from_datetime_utc': row[1],
155
                               'valid_through_datetime_utc': row[2],
156
                               'rates': list()}
157
158
    try:
159
        query_timeofuse_tariffs = (" SELECT tariff_id, start_time_of_day, end_time_of_day, price "
160
                                   " FROM tbl_tariffs_timeofuses "
161
                                   " WHERE tariff_id IN ( " + ', '.join(map(str, tariff_dict.keys())) + ")"
162
                                   " ORDER BY tariff_id, start_time_of_day ")
163
        cursor.execute(query_timeofuse_tariffs, )
164
        rows_timeofuse_tariffs = cursor.fetchall()
165
    except Exception as e:
166
        print(str(e))
167
        if cnx:
168
            cnx.disconnect()
169
        if cursor:
170
            cursor.close()
171
        return dict()
172
173
    if cursor:
174
        cursor.close()
175
    if cnx:
176
        cnx.disconnect()
177
178
    if rows_timeofuse_tariffs is None or len(rows_timeofuse_tariffs) == 0:
179
        return dict()
180
181
    for row in rows_timeofuse_tariffs:
182
        tariff_dict[row[0]]['rates'].append({'start_time_of_day': row[1],
183
                                             'end_time_of_day': row[2],
184
                                             'price': row[3]})
185
186
    result = dict()
187
    for tariff_id, tariff_value in tariff_dict.items():
188
        current_datetime_utc = tariff_value['valid_from_datetime_utc']
189
        while current_datetime_utc < tariff_value['valid_through_datetime_utc']:
190
            for rate in tariff_value['rates']:
191
                current_datetime_local = current_datetime_utc + timedelta(minutes=timezone_offset)
192
                seconds_since_midnight = (current_datetime_local -
193
                                          current_datetime_local.replace(hour=0,
194
                                                                         second=0,
195
                                                                         microsecond=0,
196
                                                                         tzinfo=None)).total_seconds()
197
                if rate['start_time_of_day'].total_seconds() <= \
198
                        seconds_since_midnight < rate['end_time_of_day'].total_seconds():
199
                    result[current_datetime_utc] = rate['price']
200
                    break
201
202
            # start from the next time slot
203
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)
204
205
    return {k: v for k, v in result.items() if start_datetime_utc <= k <= end_datetime_utc}
206