Test Failed
Pull Request — master (#375)
by Vinicius
05:01
created

kytos.core.dead_letter.DeadLetter.rest_delete()   B

Complexity

Conditions 8

Size

Total Lines 33
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 23
nop 2
dl 0
loc 33
rs 7.3333
c 0
b 0
f 0
1
"""Dead letter dict tructure."""
2
import json
3
from collections import OrderedDict, defaultdict
4
from enum import Enum
5
from typing import List
6
7
# pylint: disable=no-name-in-module,invalid-name
8
from pydantic import BaseModel, ValidationError, constr
9
10
# pylint: enable=no-name-in-module
11
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
12
                                 aget_json_or_400)
13
14
15
class KytosQueueBufferNames(str, Enum):
16
    """KytosQueueBufferNames."""
17
18
    app = "app"
19
    msg_in = "msg_in"
20
21
22
class DeadLetterDeletePayload(BaseModel):
23
    """DeadLetterDeletePayload."""
24
25
    event_name: constr(min_length=1)
26
    ids: List[str] = []
27
28
29
class DeadLetterPatchPayload(DeadLetterDeletePayload):
30
    """DeadLetterPatchPayload."""
31
32
    kytos_queue_buffer: KytosQueueBufferNames
33
34
35
class DeadLetter:
36
    """DeadLetter."""
37
38
    def __init__(self, controller):
39
        """Init DeadLetter.
40
41
        Args:
42
            controller(kytos.core.controller): A Controller instance.
43
44
        """
45
        self.controller = controller
46
        self.dict = defaultdict(OrderedDict)  # dict of KytosEvents by name
47
        self._max_len_per_event_name = 50000
48
49
    def register_endpoints(self):
50
        """Register core endpoints."""
51
        api = self.controller.api_server
52
        api.register_core_endpoint("dead_letter/", self.rest_list,
53
                                   methods=["GET"])
54
        api.register_core_endpoint("dead_letter/", self.rest_delete,
55
                                   methods=["DELETE"])
56
        api.register_core_endpoint("dead_letter/", self.rest_patch,
57
                                   methods=["PATCH"])
58
59
    async def rest_list(self, request: Request) -> JSONResponse:
60
        """List dead letter events."""
61
        event_name = request.query_params.get("event_name")
62
        response = (
63
            self.list_events()
64
            if not event_name
65
            else self.list_event(event_name)
66
        )
67
        return JSONResponse(response)
68
69
    async def rest_patch(self, request: Request) -> JSONResponse:
70
        """Reinject dead letter events."""
71
        body = await request.json()
72
        try:
73
            body = DeadLetterPatchPayload(**body)
74
        except ValidationError as exc:
75
            return JSONResponse(exc.errors(), status_code=400)
76
        except TypeError:
77
            raise HTTPException(400, detail=f"Invalid type value {body}")
78
79
        event_name = body.event_name
80
        if event_name not in self.dict:
81
            raise HTTPException(404,
82
                                detail=f"event_name {event_name} not found")
83
84
        diff_ids = set(body.ids) - set(self.dict[event_name].keys())
85
        if diff_ids:
86
            raise HTTPException(404,
87
                                detail=f"KytosEvent ids not found: {diff_ids}")
88
89
        _ids = body.ids or self.dict[event_name].keys()
90
        for _id in _ids:
91
            self.reinject(event_name, _id, body.kytos_queue_buffer)
92
93
        return JSONResponse({})
94
95
    async def rest_delete(self, request: Request) -> JSONResponse:
96
        """Delete dead letter events.
97
98
        event_name 'all' means explicitly delete all event names
99
100
        """
101
        body = await aget_json_or_400(request)
102
        try:
103
            body = DeadLetterDeletePayload(**body)
104
        except ValidationError as exc:
105
            return JSONResponse(exc.errors(), status_code=400)
106
107
        event_name = body.event_name
108
        if event_name == "all":
109
            for event_name in self.dict.keys():
110
                self.delete_event_name(event_name)
111
            return JSONResponse({})
112
113
        if event_name not in self.dict:
114
            raise HTTPException(404,
115
                                detail=f"event_name {event_name} not found")
116
117
        diff_ids = set(body.ids) - set(self.dict[event_name].keys())
118
        if diff_ids:
119
            raise HTTPException(404,
120
                                detail=f"KytosEvent ids not found: {diff_ids}")
121
122
        if not body.ids:
123
            self.delete_event_name(event_name)
124
        else:
125
            for _id in body.ids:
126
                self.delete_event(event_name, _id)
127
        return JSONResponse({})
128
129
    def list_event(self, event_name: str):
130
        """List dead letter by event name."""
131
        response = defaultdict(dict)
132
        for key, value in self.dict[event_name].items():
133
            response[event_name][key] = json.loads(value.as_json())
134
        return response
135
136
    def list_events(self) -> dict:
137
        """List dead letter events."""
138
        response = defaultdict(dict)
139
        for event_name, _dict in self.dict.items():
140
            for key, value in _dict.items():
141
                response[event_name][key] = json.loads(value.as_json())
142
        return response
143
144
    def add_event(self, event):
145
        """Add a KytoEvent to the dead letter."""
146
        if len(self.dict[event.name]) >= self._max_len_per_event_name:
147
            self.dict[event.name].popitem(last=False)
148
        self.dict[event.name][str(event.id)] = event
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable str does not seem to be defined.
Loading history...
149
150
    def delete_event(self, event_name: str, event_id: str):
151
        """Delete a KytoEvent from the dead letter."""
152
        return self.dict[event_name].pop(event_id, None)
153
154
    def delete_event_name(self, event_name: str):
155
        """Delete a event_name from the dead letter."""
156
        return self.dict.pop(event_name, None)
157
158
    def reinject(self, event_name: str, event_id: str, buffer_name: str):
159
        """Reinject an KytosEvent into a KytosEventBuffer."""
160
        if buffer_name not in {"app", "msg_in"}:
161
            return
162
        if event_name not in self.dict:
163
            return
164
        kytos_event = self.dict[event_name].pop(event_id, None)
165
        if not kytos_event:
166
            return
167
        kytos_event.reinjections += 1
168
        event_buffer = getattr(self.controller.buffers, buffer_name)
169
        event_buffer.put(kytos_event)
170