Code Duplication    Length = 29-30 lines in 3 locations

kytos/core/auth.py 3 locations

@@ 258-287 (lines=30) @@
255
                break
256
        return response["answer"], response["code"]
257
258
    @authenticated
259
    def _delete_user(self, uid):
260
        """Delete a user using Storehouse."""
261
        response = {}
262
263
        def _delete_user_callback(_event, box, error):
264
            nonlocal response
265
            if error or not box:
266
                response = {
267
                    "answer": "User has not been deleted",
268
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
269
                }
270
            else:
271
                response = {
272
                    "answer": "User successfully deleted",
273
                    "code": HTTPStatus.OK.value,
274
                }
275
276
        content = {
277
            "box_id": uid,
278
            "namespace": self.namespace,
279
            "callback": _delete_user_callback,
280
        }
281
        event = KytosEvent(name="kytos.storehouse.delete", content=content)
282
        self.controller.buffers.app.put(event)
283
        while True:
284
            time.sleep(0.1)
285
            if response:
286
                break
287
        return response["answer"], response["code"]
288
289
    @authenticated
290
    def _update_user(self, uid):
@@ 189-217 (lines=29) @@
186
            del answer['data']['password']
187
        return answer, code
188
189
    @authenticated
190
    def _list_users(self):
191
        """List all users using Storehouse."""
192
        response = {}
193
194
        def _list_users_callback(_event, boxes, error):
195
            nonlocal response
196
            if error:
197
                response = {
198
                    "answer": "Users cannot be listed",
199
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
200
                }
201
            else:
202
                response = {
203
                    "answer": {"users": boxes},
204
                    "code": HTTPStatus.OK.value,
205
                }
206
207
        content = {
208
            "namespace": self.namespace,
209
            "callback": _list_users_callback,
210
        }
211
        event = KytosEvent(name="kytos.storehouse.list", content=content)
212
        self.controller.buffers.app.put(event)
213
        while True:
214
            time.sleep(0.1)
215
            if response:
216
                break
217
        return response["answer"], response["code"]
218
219
    @authenticated
220
    def _create_user(self):
@@ 151-179 (lines=29) @@
148
            result = "Incorrect username or password"
149
            return result, HTTPStatus.UNAUTHORIZED.value
150
151
    def _find_user(self, uid):
152
        """Find a specific user using Storehouse."""
153
        response = {}
154
155
        def _find_user_callback(_event, box, error):
156
            nonlocal response
157
            if error or not box:
158
                response = {
159
                    "answer": "User data cannot be shown",
160
                    "code": HTTPStatus.INTERNAL_SERVER_ERROR.value,
161
                }
162
            else:
163
                response = {
164
                    "answer": {"data": box.data},
165
                    "code": HTTPStatus.OK.value,
166
                }
167
168
        content = {
169
            "box_id": uid,
170
            "namespace": self.namespace,
171
            "callback": _find_user_callback,
172
        }
173
        event = KytosEvent(name="kytos.storehouse.retrieve", content=content)
174
        self.controller.buffers.app.put(event)
175
        while True:
176
            time.sleep(0.1)
177
            if response:
178
                break
179
        return response["answer"], response["code"]
180
181
    @authenticated
182
    def _list_user(self, uid):