tariff.get_energy_item_tariffs()   F
Complexity

Conditions 23

Size

Total Lines 96
Code Lines 71

Duplication

Lines 96
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 71
dl 96
loc 96
rs 0
c 0
b 0
f 0
cc 23
nop 4

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

The refactoring most commonly applied in this situation is Extract Method.
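As a minimal sketch of Extract Method applied to this listing, the step commented "get timezone offset in minutes" could be moved into a helper of its own. The name `utc_offset_to_minutes` is hypothetical, and the `+HH:MM` / `-HH:MM` format of `config.utc_offset` is an assumption inferred from the slicing used in the listing.

```python
def utc_offset_to_minutes(utc_offset: str) -> int:
    """Convert an offset string such as '+08:00' or '-05:30' to signed minutes.

    Hypothetical helper; assumes the sign is at index 0, the hours at
    indices 1-2 and the minutes at indices 4-5, as the listing's slicing implies.
    """
    minutes = int(utc_offset[1:3]) * 60 + int(utc_offset[4:6])
    return -minutes if utc_offset[0] == '-' else minutes
```

Both functions could then begin with `timezone_offset = utc_offset_to_minutes(config.utc_offset)`, and the original comment becomes the function name.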

Complexity

Complex functions like tariff.get_energy_item_tariffs() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or variables that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
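In this listing, the values start_time_of_day, end_time_of_day and price always travel together, so they are one candidate for such a component. The following is only a sketch of what extracting them could look like: the names `TimeOfUseRate`, `covers` and `price_at` are hypothetical, and the use of `timedelta` for the time-of-day fields and `Decimal` for the price are assumptions (the listing only shows that the time-of-day values support `.total_seconds()`).

```python
from dataclasses import dataclass
from datetime import timedelta
from decimal import Decimal
from typing import Iterable, Optional


@dataclass(frozen=True)
class TimeOfUseRate:
    """One time-of-use rate row (hypothetical class, not part of the project)."""
    start_time_of_day: timedelta
    end_time_of_day: timedelta
    price: Decimal

    def covers(self, seconds_since_midnight: float) -> bool:
        # Same half-open interval test as in the listing: start <= t < end
        return (self.start_time_of_day.total_seconds()
                <= seconds_since_midnight
                < self.end_time_of_day.total_seconds())


def price_at(rates: Iterable[TimeOfUseRate],
             seconds_since_midnight: float) -> Optional[Decimal]:
    """Return the price of the first rate covering the given time, or None."""
    for rate in rates:
        if rate.covers(seconds_since_midnight):
            return rate.price
    return None
```

With a type like this, the innermost loop of the listing would reduce to a single call to `price_at`, which removes one level of nesting and several conditions from the function.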

import collections
from datetime import timedelta

import mysql.connector

import config


########################################################################################################################
# Get tariffs by energy category
########################################################################################################################
# Duplication: This code seems to be duplicated in your project.
def get_energy_category_tariffs(cost_center_id, energy_category_id, start_datetime_utc, end_datetime_utc):
    # todo: verify parameters
    if cost_center_id is None:
        return dict()

    # get timezone offset in minutes, this value will be returned to client
    timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6])
    if config.utc_offset[0] == '-':
        timezone_offset = -timezone_offset

    tariff_dict = collections.OrderedDict()

    cnx = None
    cursor = None
    try:
        cnx = mysql.connector.connect(**config.myems_system_db)
        cursor = cnx.cursor()
        query_tariffs = (" SELECT t.id, t.valid_from_datetime_utc, t.valid_through_datetime_utc "
                         " FROM tbl_tariffs t, tbl_cost_centers_tariffs cct "
                         " WHERE t.energy_category_id = %s AND "
                         "       t.id = cct.tariff_id AND "
                         "       cct.cost_center_id = %s AND "
                         "       t.valid_through_datetime_utc >= %s AND "
                         "       t.valid_from_datetime_utc <= %s "
                         " ORDER BY t.valid_from_datetime_utc ")
        cursor.execute(query_tariffs, (energy_category_id, cost_center_id, start_datetime_utc, end_datetime_utc,))
        rows_tariffs = cursor.fetchall()
    except Exception as e:
        print(str(e))
        if cursor:
            cursor.close()
        if cnx:
            cnx.close()
        return dict()

    if rows_tariffs is None or len(rows_tariffs) == 0:
        if cursor:
            cursor.close()
        if cnx:
            cnx.close()
        return dict()

    for row in rows_tariffs:
        tariff_dict[row[0]] = {'valid_from_datetime_utc': row[1],
                               'valid_through_datetime_utc': row[2],
                               'rates': list()}

    try:
        query_timeofuse_tariffs = (" SELECT tariff_id, start_time_of_day, end_time_of_day, price "
                                   " FROM tbl_tariffs_timeofuses "
                                   " WHERE tariff_id IN ( " + ', '.join(map(str, tariff_dict.keys())) + ")"
                                   " ORDER BY tariff_id, start_time_of_day ")
        cursor.execute(query_timeofuse_tariffs, )
        rows_timeofuse_tariffs = cursor.fetchall()
    except Exception as e:
        print(str(e))
        if cursor:
            cursor.close()
        if cnx:
            cnx.close()
        return dict()

    if cursor:
        cursor.close()
    if cnx:
        cnx.close()

    if rows_timeofuse_tariffs is None or len(rows_timeofuse_tariffs) == 0:
        return dict()

    for row in rows_timeofuse_tariffs:
        tariff_dict[row[0]]['rates'].append({'start_time_of_day': row[1],
                                             'end_time_of_day': row[2],
                                             'price': row[3]})

    result = dict()
    for tariff_id, tariff_value in tariff_dict.items():
        current_datetime_utc = tariff_value['valid_from_datetime_utc']
        while current_datetime_utc < tariff_value['valid_through_datetime_utc']:
            for rate in tariff_value['rates']:
                current_datetime_local = current_datetime_utc + timedelta(minutes=timezone_offset)
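                # Seconds elapsed since local midnight; the minute must be reset as well,
                # otherwise the minutes of the current time slot are silently dropped.
                seconds_since_midnight = (current_datetime_local -
                                          current_datetime_local.replace(hour=0,
                                                                         minute=0,
                                                                         second=0,
                                                                         microsecond=0,
                                                                         tzinfo=None)).total_seconds()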
                if rate['start_time_of_day'].total_seconds() <= \
                        seconds_since_midnight < rate['end_time_of_day'].total_seconds():
                    result[current_datetime_utc] = rate['price']
                    break

            # start from the next time slot
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)

    return {k: v for k, v in result.items() if start_datetime_utc <= k <= end_datetime_utc}


########################################################################################################################
# Get tariffs by energy item
########################################################################################################################
# Duplication: This code seems to be duplicated in your project.
def get_energy_item_tariffs(cost_center_id, energy_item_id, start_datetime_utc, end_datetime_utc):
    # todo: verify parameters
    if cost_center_id is None:
        return dict()

    # get timezone offset in minutes, this value will be returned to client
    timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6])
    if config.utc_offset[0] == '-':
        timezone_offset = -timezone_offset

    tariff_dict = collections.OrderedDict()

    cnx = None
    cursor = None
    try:
        cnx = mysql.connector.connect(**config.myems_system_db)
        cursor = cnx.cursor()
        query_tariffs = (" SELECT t.id, t.valid_from_datetime_utc, t.valid_through_datetime_utc "
                         " FROM tbl_tariffs t, tbl_cost_centers_tariffs cct, tbl_energy_items ei "
                         " WHERE ei.id = %s AND "
                         "       t.energy_category_id = ei.energy_category_id AND "
                         "       t.id = cct.tariff_id AND "
                         "       cct.cost_center_id = %s AND "
                         "       t.valid_through_datetime_utc >= %s AND "
                         "       t.valid_from_datetime_utc <= %s "
                         " ORDER BY t.valid_from_datetime_utc ")
        cursor.execute(query_tariffs, (energy_item_id, cost_center_id, start_datetime_utc, end_datetime_utc,))
        rows_tariffs = cursor.fetchall()
    except Exception as e:
        print(str(e))
        if cursor:
            cursor.close()
        if cnx:
            cnx.close()
        return dict()

    if rows_tariffs is None or len(rows_tariffs) == 0:
        if cursor:
            cursor.close()
        if cnx:
            cnx.close()
        return dict()

    for row in rows_tariffs:
        tariff_dict[row[0]] = {'valid_from_datetime_utc': row[1],
                               'valid_through_datetime_utc': row[2],
                               'rates': list()}

    try:
        query_timeofuse_tariffs = (" SELECT tariff_id, start_time_of_day, end_time_of_day, price "
                                   " FROM tbl_tariffs_timeofuses "
                                   " WHERE tariff_id IN ( " + ', '.join(map(str, tariff_dict.keys())) + ")"
                                   " ORDER BY tariff_id, start_time_of_day ")
        cursor.execute(query_timeofuse_tariffs, )
        rows_timeofuse_tariffs = cursor.fetchall()
    except Exception as e:
        print(str(e))
        if cursor:
            cursor.close()
        if cnx:
            cnx.close()
        return dict()

    if cursor:
        cursor.close()
    if cnx:
        cnx.close()

    if rows_timeofuse_tariffs is None or len(rows_timeofuse_tariffs) == 0:
        return dict()

    for row in rows_timeofuse_tariffs:
        tariff_dict[row[0]]['rates'].append({'start_time_of_day': row[1],
                                             'end_time_of_day': row[2],
                                             'price': row[3]})

    result = dict()
    for tariff_id, tariff_value in tariff_dict.items():
        current_datetime_utc = tariff_value['valid_from_datetime_utc']
        while current_datetime_utc < tariff_value['valid_through_datetime_utc']:
            for rate in tariff_value['rates']:
                current_datetime_local = current_datetime_utc + timedelta(minutes=timezone_offset)
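                # Seconds elapsed since local midnight; the minute must be reset as well,
                # otherwise the minutes of the current time slot are silently dropped.
                seconds_since_midnight = (current_datetime_local -
                                          current_datetime_local.replace(hour=0,
                                                                         minute=0,
                                                                         second=0,
                                                                         microsecond=0,
                                                                         tzinfo=None)).total_seconds()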
                if rate['start_time_of_day'].total_seconds() <= \
                        seconds_since_midnight < rate['end_time_of_day'].total_seconds():
                    result[current_datetime_utc] = rate['price']
                    break

            # start from the next time slot
            current_datetime_utc += timedelta(minutes=config.minutes_to_count)

    return {k: v for k, v in result.items() if start_datetime_utc <= k <= end_datetime_utc}
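
The two functions in this listing differ only in their first query and its first parameter, which is what the duplication finding points at. One possible way to reduce the duplication, sketched here under assumptions, is to move the shared final step into a single helper that both functions call after running their own query. The name `expand_tariff_prices` and its parameters are hypothetical; the dictionary layout matches `tariff_dict` in the listing, and the sketch resets hour, minute, second and microsecond when computing local midnight.

```python
from datetime import datetime, timedelta


def expand_tariff_prices(tariff_dict, timezone_offset_minutes, step_minutes,
                         start_datetime_utc, end_datetime_utc):
    """Map each UTC time slot inside the requested window to a price.

    Hypothetical shared helper mirroring the final loop of both functions.
    """
    result = {}
    for tariff in tariff_dict.values():
        current = tariff['valid_from_datetime_utc']
        while current < tariff['valid_through_datetime_utc']:
            # Local wall-clock time of this slot and seconds since local midnight
            local = current + timedelta(minutes=timezone_offset_minutes)
            midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
            seconds_since_midnight = (local - midnight).total_seconds()
            for rate in tariff['rates']:
                if (rate['start_time_of_day'].total_seconds()
                        <= seconds_since_midnight
                        < rate['end_time_of_day'].total_seconds()):
                    result[current] = rate['price']
                    break
            # Move on to the next time slot
            current += timedelta(minutes=step_minutes)
    # Keep only the slots inside the requested window (inclusive on both ends)
    return {k: v for k, v in result.items()
            if start_datetime_utc <= k <= end_datetime_utc}


# Usage with made-up data: one tariff, two rates, UTC+08:00, hourly slots
example_tariffs = {
    1: {'valid_from_datetime_utc': datetime(2020, 1, 1, 0, 0),
        'valid_through_datetime_utc': datetime(2020, 1, 1, 4, 0),
        'rates': [{'start_time_of_day': timedelta(hours=0),
                   'end_time_of_day': timedelta(hours=9),
                   'price': 1.0},
                  {'start_time_of_day': timedelta(hours=9),
                   'end_time_of_day': timedelta(hours=24),
                   'price': 2.0}]},
}
example_prices = expand_tariff_prices(example_tariffs, 480, 60,
                                      datetime(2020, 1, 1, 0, 0),
                                      datetime(2020, 1, 1, 2, 0))
```

Because the helper takes plain dictionaries and no database connection, it can be tested without MySQL, and each public function shrinks to its query plus one call.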