Passed
Push — master ( b7641a...50d42e )
by Vinicius
15:54 queued 08:37
created

DeadLetter.delete_event_name()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""Dead letter dict tructure."""
2 2
import json
3 2
from collections import OrderedDict, defaultdict
4 2
from enum import Enum
5 2
from threading import Lock
6 2
from typing import List
7
8 2
from flask import jsonify, request
9
# pylint: disable=no-name-in-module
10 2
from pydantic import BaseModel, ValidationError, constr
11
# pylint: enable=no-name-in-module
12 2
from werkzeug.exceptions import BadRequest, NotFound
13
14
15 2
class KytosQueueBufferNames(str, Enum):
16
    """KytosQueueBufferNames."""
17
18 2
    app = "app"
19 2
    msg_in = "msg_in"
20
21
22 2
class DeadLetterDeletePayload(BaseModel):
23
    """DeadLetterDeletePayload."""
24
25 2
    event_name: constr(min_length=1)
26 2
    ids: List[str] = []
27
28
29 2
class DeadLetterPatchPayload(DeadLetterDeletePayload):
30
    """DeadLetterPatchPayload."""
31
32 2
    kytos_queue_buffer: KytosQueueBufferNames
33
34
35 2
class DeadLetter:
36
    """DeadLetter."""
37
38 2
    def __init__(self, controller):
39
        """Init DeadLetter.
40
41
        Args:
42
            controller(kytos.core.controller): A Controller instance.
43
44
        """
45 2
        self.controller = controller
46 2
        self.dict = defaultdict(OrderedDict)  # dict of KytosEvents by name
47 2
        self._lock = Lock()
48 2
        self._max_len_per_event_name = 50000
49
50 2
    @staticmethod
51
    def _get_request():
52
        """Get request context."""
53
        return request
54
55 2
    def register_endpoints(self):
56
        """Register core endpoints."""
57 2
        api = self.controller.api_server
58 2
        api.register_core_endpoint("dead_letter/", self.rest_list,
59
                                   methods=["GET"])
60 2
        api.register_core_endpoint("dead_letter/", self.rest_delete,
61
                                   methods=["DELETE"])
62 2
        api.register_core_endpoint("dead_letter/", self.rest_patch,
63
                                   methods=["PATCH"])
64
65 2
    def rest_list(self):
66
        """List dead letter events."""
67 2
        event_name = self._get_request().args.get("event_name")
68 2
        response = (
69
            self.list_events()
70
            if not event_name
71
            else self.list_event(event_name)
72
        )
73 2
        return jsonify(response)
74
75 2
    def rest_patch(self):
76
        """Reinject dead letter events."""
77 2
        body = self._get_request().json or {}
78 2
        try:
79 2
            body = DeadLetterPatchPayload(**body)
80
        except ValidationError as exc:
81
            raise BadRequest(exc.errors())
82
83 2
        event_name = body.event_name
84 2
        if event_name not in self.dict:
85 2
            raise NotFound(f"event_name {event_name} not found")
86
87 2
        diff_ids = set(body.ids) - set(self.dict[event_name].keys())
88 2
        if diff_ids:
89 2
            raise NotFound(f"KytosEvent ids not found: {diff_ids}")
90
91 2
        _ids = body.ids or self.dict[event_name].keys()
92 2
        for _id in _ids:
93 2
            self.reinject(event_name, _id, body.kytos_queue_buffer)
94
95 2
        return jsonify()
96
97 2
    def rest_delete(self):
98
        """Delete dead letter events.
99
100
        event_name 'all' means explicitly delete all event names
101
102
        """
103 2
        body = self._get_request().json or {}
104 2
        try:
105 2
            body = DeadLetterDeletePayload(**body)
106
        except ValidationError as exc:
107
            raise BadRequest(exc.errors())
108
109 2
        event_name = body.event_name
110 2
        if event_name == "all":
111
            for event_name in self.dict.keys():
112
                self.delete_event_name(event_name)
113
            return jsonify()
114
115 2
        if event_name not in self.dict:
116 2
            raise NotFound(f"event_name {event_name} not found")
117
118 2
        diff_ids = set(body.ids) - set(self.dict[event_name].keys())
119 2
        if diff_ids:
120 2
            raise NotFound(f"KytosEvent ids not found: {diff_ids}")
121
122 2
        if not body.ids:
123
            self.delete_event_name(event_name)
124
        else:
125 2
            for _id in body.ids:
126 2
                self.delete_event(event_name, _id)
127 2
        return jsonify()
128
129 2
    def list_event(self, event_name: str):
130
        """List dead letter by event name."""
131 2
        response = defaultdict(dict)
132 2
        for key, value in self.dict[event_name].items():
133
            response[event_name][key] = json.loads(value.as_json())
134 2
        return response
135
136 2
    def list_events(self):
137
        """List dead letter events."""
138
        response = defaultdict(dict)
139
        for event_name, _dict in self.dict.items():
140
            for key, value in _dict.items():
141
                response[event_name][key] = json.loads(value.as_json())
142
        return response
143
144 2
    def add_event(self, event):
145
        """Add a KytoEvent to the dead letter."""
146 2
        if len(self.dict[event.name]) >= self._max_len_per_event_name:
147
            self.dict[event.name].popitem(last=False)
148 2
        self.dict[event.name][str(event.id)] = event
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable str does not seem to be defined.
Loading history...
149
150 2
    def delete_event(self, event_name: str, event_id: str):
151
        """Delete a KytoEvent from the dead letter."""
152 2
        return self.dict[event_name].pop(event_id, None)
153
154 2
    def delete_event_name(self, event_name: str):
155
        """Delete a event_name from the dead letter."""
156 2
        return self.dict.pop(event_name, None)
157
158 2
    def reinject(self, event_name: str, event_id: str, buffer_name: str):
159
        """Reinject an KytosEvent into a KytosEventBuffer."""
160 2
        if buffer_name not in {"app", "msg_in"}:
161
            return
162 2
        with self._lock:
163 2
            kytos_event = self.dict[event_name].pop(event_id, None)
164 2
            if not kytos_event:
165
                return
166 2
            kytos_event.reinjections += 1
167 2
        event_buffer = getattr(self.controller.buffers, buffer_name)
168
        event_buffer.put(kytos_event)
169