Passed
Push — master ( f8b1cd...9ee003 )
by
unknown
10:23 queued 16s
created

core.privilege.PrivilegeCollection.on_get()   C

Complexity

Conditions 10

Size

Total Lines 73
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 44
dl 0
loc 73
rs 5.9999
c 0
b 0
f 0
cc 10
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

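Commonly applied refactorings include Extract Method: move a cohesive group of statements into its own well-named method and call it from the original one.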

Complexity

Complex classes like core.privilege.PrivilegeCollection.on_get() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import falcon
2
import mysql.connector
3
import simplejson as json
4
import redis
5
from core.useractivity import user_logger, admin_control
6
import config
7
8
9 View Code Duplication
def clear_privilege_cache():
    """
    Clear privilege-related cache after data modification

    Deletes the 'privilege:list' key from Redis so that the next privilege
    list request is rebuilt from the database.

    This is a best-effort operation: nothing happens when Redis is disabled
    in the configuration, and any error raised while connecting, pinging or
    deleting is ignored so that a cache problem never fails the caller.
    """
    # Check if Redis is enabled
    if not config.redis.get('is_enabled', False):
        return

    redis_client = None
    try:
        redis_client = redis.Redis(
            host=config.redis['host'],
            port=config.redis['port'],
            password=config.redis['password'] if config.redis['password'] else None,
            db=config.redis['db'],
            decode_responses=True,
            socket_connect_timeout=2,
            socket_timeout=2
        )
        redis_client.ping()

        # Clear privilege list cache
        redis_client.delete('privilege:list')
    except Exception:
        # If cache clear fails, ignore and continue
        pass
    finally:
        # A new client is created on every call, so release its connections
        # here instead of leaving them for the garbage collector
        if redis_client is not None:
            try:
                redis_client.close()
            except Exception:
                # Closing is best-effort as well
                pass
37
38
39
class PrivilegeCollection:
    """
    Privilege Collection Resource

    This class handles privilege management operations for the MyEMS system.
    It provides functionality to create and retrieve user privileges
    that define access permissions and roles.
    """

    # Redis key under which the serialized privilege list is cached
    _CACHE_KEY = 'privilege:list'
    # 8 hours in seconds (long-term cache)
    _CACHE_EXPIRE = 28800

    def __init__(self):
        pass

    @staticmethod
    def on_options(req, resp):
        """
        Handle OPTIONS request for CORS preflight

        Args:
            req: Falcon request object
            resp: Falcon response object
        """
        _ = req
        resp.status = falcon.HTTP_200

    @staticmethod
    def _connect_cache():
        """
        Create a Redis client for the privilege cache

        Returns:
            A Redis client that answered a ping, or None when Redis is
            disabled in the configuration or could not be reached
        """
        if not config.redis.get('is_enabled', False):
            return None
        try:
            client = redis.Redis(
                host=config.redis['host'],
                port=config.redis['port'],
                password=config.redis['password'] if config.redis['password'] else None,
                db=config.redis['db'],
                decode_responses=True,
                socket_connect_timeout=2,
                socket_timeout=2
            )
            client.ping()
        except Exception:
            # An unreachable Redis must not be retried later in the same
            # request, otherwise the socket timeout is paid a second time
            return None
        return client

    @staticmethod
    def _fetch_privileges():
        """
        Load all privileges from the database ordered by ID descending

        Returns:
            A list of dicts with the keys "id", "name" and "data"
        """
        query = (" SELECT id, name, data "
                 " FROM tbl_privileges "
                 " ORDER BY id DESC ")
        cnx = mysql.connector.connect(**config.myems_user_db)
        try:
            cursor = cnx.cursor()
            try:
                cursor.execute(query)
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when the query fails
            cnx.close()

        return [{"id": row[0],
                 "name": row[1],
                 "data": row[2]} for row in rows or ()]

    @staticmethod
    def _load_json_body(req):
        """
        Read the request stream and parse it as UTF-8 encoded JSON

        Args:
            req: Falcon request object

        Returns:
            The decoded JSON value

        Raises:
            falcon.HTTPError: HTTP 400 with description API.INVALID_ENCODING,
                API.INVALID_JSON_FORMAT or API.FAILED_TO_READ_REQUEST_STREAM
        """
        try:
            raw_json = req.stream.read().decode('utf-8')
            return json.loads(raw_json)
        except UnicodeDecodeError as ex:
            print("Failed to decode request")
            raise falcon.HTTPError(status=falcon.HTTP_400,
                                   title='API.BAD_REQUEST',
                                   description='API.INVALID_ENCODING') from ex
        except json.JSONDecodeError as ex:
            print("Failed to parse JSON")
            raise falcon.HTTPError(status=falcon.HTTP_400,
                                   title='API.BAD_REQUEST',
                                   description='API.INVALID_JSON_FORMAT') from ex
        except Exception as ex:
            print("Unexpected error reading request stream")
            raise falcon.HTTPError(status=falcon.HTTP_400,
                                   title='API.BAD_REQUEST',
                                   description='API.FAILED_TO_READ_REQUEST_STREAM') from ex

    @staticmethod
    def _validated_fields(new_values):
        """
        Extract the privilege name and data from a decoded request body

        The body is expected to look like {"data": {"name": ..., "data": ...}}.

        Args:
            new_values: Decoded JSON value of the request body

        Returns:
            Tuple (name, data), both stripped of surrounding whitespace

        Raises:
            falcon.HTTPError: HTTP 400 with description
                API.INVALID_PRIVILEGE_NAME or API.INVALID_PRIVILEGE_DATA
        """
        payload = new_values.get('data') if isinstance(new_values, dict) else None
        if not isinstance(payload, dict):
            # A missing or malformed "data" envelope is reported through the
            # field validation below instead of an unhandled KeyError/TypeError
            payload = {}

        # Validate privilege name
        name = payload.get('name')
        if not isinstance(name, str) or len(name.strip()) == 0:
            raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                   description='API.INVALID_PRIVILEGE_NAME')

        # Validate privilege data
        data = payload.get('data')
        if not isinstance(data, str) or len(data.strip()) == 0:
            raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                   description='API.INVALID_PRIVILEGE_DATA')

        return name.strip(), data.strip()

    @staticmethod
    def on_get(req, resp):
        """
        Handle GET requests to retrieve all privileges

        Returns a list of all privileges with their metadata including:
        - Privilege ID and name
        - Privilege data (JSON configuration)

        The serialized list is served from Redis when a cached copy exists;
        otherwise it is read from the database and stored in Redis for
        8 hours. Cache errors are ignored and never fail the request.

        Args:
            req: Falcon request object
            resp: Falcon response object
        """
        admin_control(req)

        # Try to get from Redis cache (only if Redis is enabled and reachable)
        redis_client = PrivilegeCollection._connect_cache()
        if redis_client is not None:
            try:
                cached_result = redis_client.get(PrivilegeCollection._CACHE_KEY)
            except Exception:
                # If the cache read fails, continue to database query
                cached_result = None
            if cached_result:
                resp.text = cached_result
                return

        # Cache miss or Redis error - query database
        result_json = json.dumps(PrivilegeCollection._fetch_privileges())

        # Store result in Redis cache
        if redis_client is not None:
            try:
                redis_client.setex(PrivilegeCollection._CACHE_KEY,
                                   PrivilegeCollection._CACHE_EXPIRE,
                                   result_json)
            except Exception:
                # If cache set fails, ignore and continue
                pass

        resp.text = result_json

    @staticmethod
    @user_logger
    def on_post(req, resp):
        """
        Handle POST requests to create a new privilege

        Creates a new privilege with the specified name and data configuration.
        Calls admin_control(req) before anything else.

        Args:
            req: Falcon request object containing privilege data:
                - name: Privilege name (required)
                - data: Privilege data configuration (required)
            resp: Falcon response object

        Raises:
            falcon.HTTPError: HTTP 400 when the body cannot be read or parsed,
                when name or data is missing, not a string or blank, or when
                the name is already in use
        """
        admin_control(req)
        new_values = PrivilegeCollection._load_json_body(req)
        name, data = PrivilegeCollection._validated_fields(new_values)

        cnx = mysql.connector.connect(**config.myems_user_db)
        try:
            cursor = cnx.cursor()
            try:
                # Check if privilege name already exists
                cursor.execute(" SELECT name "
                               " FROM tbl_privileges "
                               " WHERE name = %s ", (name,))
                if cursor.fetchone() is not None:
                    raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                           description='API.PRIVILEGE_NAME_IS_ALREADY_IN_USE')

                # Insert new privilege into database
                add_row = (" INSERT INTO tbl_privileges "
                           "             (name, data) "
                           " VALUES (%s, %s) ")
                cursor.execute(add_row, (name, data, ))
                new_id = cursor.lastrowid
                cnx.commit()
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when a statement fails
            cnx.close()

        # Clear cache after creating new privilege
        clear_privilege_cache()

        resp.status = falcon.HTTP_201
        resp.location = '/privileges/' + str(new_id)
217
218
219
class PrivilegeItem:
    """
    Privilege Item Resource

    This class handles individual privilege operations including:
    - Updating privilege information
    - Deleting privileges (with relationship checks)
    """

    def __init__(self):
        pass

    @staticmethod
    def on_options(req, resp, id_):
        """
        Handle OPTIONS request for CORS preflight

        Args:
            req: Falcon request object
            resp: Falcon response object
            id_: Privilege ID parameter
        """
        _ = req
        resp.status = falcon.HTTP_200
        _ = id_

    @staticmethod
    def _require_valid_id(id_):
        """
        Reject privilege IDs that are not positive decimal integers

        Args:
            id_: Privilege ID as received from the URL (string)

        Raises:
            falcon.HTTPError: HTTP 400 with description API.INVALID_PRIVILEGE_ID
        """
        # str.isdigit() alone also accepts characters such as superscript
        # digits, for which int() raises ValueError; restrict to ASCII digits
        if not (id_.isascii() and id_.isdigit()) or int(id_) <= 0:
            raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                   description='API.INVALID_PRIVILEGE_ID')

    @staticmethod
    def _load_json_body(req):
        """
        Read the request stream and parse it as UTF-8 encoded JSON

        Args:
            req: Falcon request object

        Returns:
            The decoded JSON value

        Raises:
            falcon.HTTPError: HTTP 400 with description API.INVALID_ENCODING,
                API.INVALID_JSON_FORMAT or API.FAILED_TO_READ_REQUEST_STREAM
        """
        try:
            raw_json = req.stream.read().decode('utf-8')
            return json.loads(raw_json)
        except UnicodeDecodeError as ex:
            print("Failed to decode request")
            raise falcon.HTTPError(status=falcon.HTTP_400,
                                   title='API.BAD_REQUEST',
                                   description='API.INVALID_ENCODING') from ex
        except json.JSONDecodeError as ex:
            print("Failed to parse JSON")
            raise falcon.HTTPError(status=falcon.HTTP_400,
                                   title='API.BAD_REQUEST',
                                   description='API.INVALID_JSON_FORMAT') from ex
        except Exception as ex:
            print("Unexpected error reading request stream")
            raise falcon.HTTPError(status=falcon.HTTP_400,
                                   title='API.BAD_REQUEST',
                                   description='API.FAILED_TO_READ_REQUEST_STREAM') from ex

    @staticmethod
    def _validated_fields(new_values):
        """
        Extract the privilege name and data from a decoded request body

        The body is expected to look like {"data": {"name": ..., "data": ...}}.

        Args:
            new_values: Decoded JSON value of the request body

        Returns:
            Tuple (name, data), both stripped of surrounding whitespace

        Raises:
            falcon.HTTPError: HTTP 400 with description
                API.INVALID_PRIVILEGE_NAME or API.INVALID_PRIVILEGE_DATA
        """
        payload = new_values.get('data') if isinstance(new_values, dict) else None
        if not isinstance(payload, dict):
            # A missing or malformed "data" envelope is reported through the
            # field validation below instead of an unhandled KeyError/TypeError
            payload = {}

        # Validate privilege name
        name = payload.get('name')
        if not isinstance(name, str) or len(name.strip()) == 0:
            raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                   description='API.INVALID_PRIVILEGE_NAME')

        # Validate privilege data
        data = payload.get('data')
        if not isinstance(data, str) or len(data.strip()) == 0:
            raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                   description='API.INVALID_PRIVILEGE_DATA')

        return name.strip(), data.strip()

    @staticmethod
    @user_logger
    def on_delete(req, resp, id_):
        """
        Handle DELETE requests to remove a privilege

        Deletes the specified privilege from the database.
        Checks for existing relationships with users before deletion.
        Calls admin_control(req) before anything else.

        Args:
            req: Falcon request object
            resp: Falcon response object
            id_: Privilege ID to delete

        Raises:
            falcon.HTTPError: HTTP 400 when the ID is invalid or users still
                reference the privilege; HTTP 404 when the privilege does
                not exist
        """
        admin_control(req)
        PrivilegeItem._require_valid_id(id_)

        cnx = mysql.connector.connect(**config.myems_user_db)
        try:
            cursor = cnx.cursor()
            try:
                # Check for relationships with users
                cursor.execute(" SELECT id "
                               " FROM tbl_users "
                               " WHERE privilege_id = %s ", (id_,))
                if cursor.fetchall():
                    raise falcon.HTTPError(status=falcon.HTTP_400,
                                           title='API.BAD_REQUEST',
                                           description='API.THERE_IS_RELATION_WITH_USERS')

                # Check if privilege exists
                cursor.execute(" SELECT name "
                               " FROM tbl_privileges "
                               " WHERE id = %s ", (id_,))
                if cursor.fetchone() is None:
                    raise falcon.HTTPError(status=falcon.HTTP_404, title='API.NOT_FOUND',
                                           description='API.PRIVILEGE_NOT_FOUND')

                # TODO: Delete associated objects before deleting privilege
                cursor.execute(" DELETE FROM tbl_privileges WHERE id = %s ", (id_,))
                cnx.commit()
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when a statement fails
            cnx.close()

        # Clear cache after deleting privilege
        clear_privilege_cache()

        resp.status = falcon.HTTP_204

    @staticmethod
    @user_logger
    def on_put(req, resp, id_):
        """
        Handle PUT requests to update privilege information

        Updates an existing privilege with new name and data configuration.
        Calls admin_control(req) before anything else.

        Args:
            req: Falcon request object containing update data:
                - name: New privilege name (required)
                - data: New privilege data configuration (required)
            resp: Falcon response object
            id_: Privilege ID to update

        Raises:
            falcon.HTTPError: HTTP 400 when the body cannot be read or parsed,
                the ID is invalid, name or data is missing, not a string or
                blank, or the name is used by another privilege; HTTP 404
                when the privilege does not exist
        """
        admin_control(req)
        # The body is parsed before the ID is checked, so body errors take
        # precedence over an invalid ID
        new_values = PrivilegeItem._load_json_body(req)
        PrivilegeItem._require_valid_id(id_)
        name, data = PrivilegeItem._validated_fields(new_values)

        cnx = mysql.connector.connect(**config.myems_user_db)
        try:
            cursor = cnx.cursor()
            try:
                # Check if privilege exists
                cursor.execute(" SELECT name "
                               " FROM tbl_privileges "
                               " WHERE id = %s ", (id_,))
                if cursor.fetchone() is None:
                    raise falcon.HTTPError(status=falcon.HTTP_404, title='API.NOT_FOUND',
                                           description='API.PRIVILEGE_NOT_FOUND')

                # Check if new name conflicts with existing privileges (excluding current)
                cursor.execute(" SELECT name "
                               " FROM tbl_privileges "
                               " WHERE name = %s AND id != %s ", (name, id_))
                if cursor.fetchone() is not None:
                    raise falcon.HTTPError(status=falcon.HTTP_400, title='API.BAD_REQUEST',
                                           description='API.PRIVILEGE_NAME_IS_ALREADY_IN_USE')

                # Update privilege information
                update_row = (" UPDATE tbl_privileges "
                              " SET name = %s, data = %s "
                              " WHERE id = %s ")
                cursor.execute(update_row, (name, data, id_,))
                cnx.commit()
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when a statement fails
            cnx.close()

        # Clear cache after updating privilege
        clear_privilege_cache()

        resp.status = falcon.HTTP_200
395
396