Test Failed
Pull Request — master (#204)
by Vinicius
10:17
created

kytos.core.dead_letter.DeadLetter.rest_delete()   B

Complexity

Conditions 8

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 9.4924

Importance

Changes 0
Metric Value
cc 8
eloc 21
nop 1
dl 0
loc 31
rs 7.3333
c 0
b 0
f 0
ccs 15
cts 21
cp 0.7143
crap 9.4924
1
"""Dead letter dict tructure."""
2 2
import json
3 2
from collections import OrderedDict, defaultdict
4 2
from enum import Enum
5 2
from threading import Lock
6 2
from typing import List
7
8 2
from flask import jsonify, request
9
# pylint: disable=no-name-in-module
10 2
from pydantic import BaseModel, ValidationError, constr
11
# pylint: enable=no-name-in-module
12 2
from werkzeug.exceptions import BadRequest, NotFound
13
14
15 2
class KytosQueueBufferNames(str, Enum):
    """Names of the Kytos event buffers a dead letter can be sent back to.

    Because the enum also derives from ``str``, every member compares equal
    to its plain string value (``KytosQueueBufferNames.app == "app"``).
    """

    app = "app"
    msg_in = "msg_in"
19
20
21 2
class DeadLetterDeletePayload(BaseModel):
    """Request body validated for the dead letter DELETE endpoint.

    Attributes:
        event_name: Non-empty name of the event group to operate on.
        ids: Optional KytosEvent ids within that group; defaults to an
            empty list.
    """

    event_name: constr(min_length=1)
    ids: List[str] = []
25
26
27 2
class DeadLetterPatchPayload(DeadLetterDeletePayload):
    """Request body validated for the dead letter PATCH (reinject) endpoint.

    Extends the delete payload with the mandatory target buffer name.

    Attributes:
        kytos_queue_buffer: Which KytosQueueBufferNames member the events
            are reinjected into.
    """

    kytos_queue_buffer: KytosQueueBufferNames
30
31
32 2
class DeadLetter:
    """In-memory store of dead KytosEvents, grouped by event name.

    Events are kept per event name in insertion order, keyed by the string
    form of their id. Once an event name holds ``_max_len_per_event_name``
    events, adding another one evicts the oldest. The class also exposes
    REST handlers to list, delete and reinject the stored events.
    """

    def __init__(self, controller):
        """Init DeadLetter.

        Args:
            controller(kytos.core.controller): A Controller instance.

        """
        self.controller = controller
        self.dict = defaultdict(OrderedDict)  # dict of KytosEvents by name
        self._lock = Lock()
        self._max_len_per_event_name = 50000

    @staticmethod
    def _get_request():
        """Get request context."""
        return request

    def register_endpoints(self):
        """Register core endpoints."""
        api = self.controller.api_server
        api.register_core_endpoint("dead_letter/", self.rest_list,
                                   methods=["GET"])
        api.register_core_endpoint("dead_letter/", self.rest_delete,
                                   methods=["DELETE"])
        api.register_core_endpoint("dead_letter/", self.rest_patch,
                                   methods=["PATCH"])

    def rest_list(self):
        """List dead letter events.

        When the ``event_name`` query argument is given only that event
        name is listed, otherwise every stored event is listed.
        """
        event_name = self._get_request().args.get("event_name")
        response = (
            self.list_events()
            if not event_name
            else self.list_event(event_name)
        )
        return jsonify(response)

    def rest_patch(self):
        """Reinject dead letter events.

        An empty ``ids`` list means every event stored under ``event_name``.

        Raises:
            BadRequest: the request body fails payload validation.
            NotFound: ``event_name`` or any of the given ids is not stored.
        """
        body = self._get_request().json or {}
        try:
            body = DeadLetterPatchPayload(**body)
        except ValidationError as exc:
            raise BadRequest(exc.errors()) from exc

        event_name = body.event_name
        if event_name not in self.dict:
            raise NotFound(f"event_name {event_name} not found")

        events = self.dict[event_name]
        diff_ids = set(body.ids) - set(events)
        if diff_ids:
            raise NotFound(f"KytosEvent ids not found: {diff_ids}")

        # Snapshot the ids: reinject() pops from the very dict whose keys
        # would otherwise be iterated, which raises RuntimeError.
        _ids = body.ids or list(events)
        for _id in _ids:
            self.reinject(event_name, _id, body.kytos_queue_buffer)

        return jsonify()

    def rest_delete(self):
        """Delete dead letter events.

        event_name 'all' means explicitly delete all event names. An empty
        ``ids`` list means delete every event stored under ``event_name``.

        Raises:
            BadRequest: the request body fails payload validation.
            NotFound: ``event_name`` or any of the given ids is not stored.
        """
        body = self._get_request().json or {}
        try:
            body = DeadLetterDeletePayload(**body)
        except ValidationError as exc:
            raise BadRequest(exc.errors()) from exc

        event_name = body.event_name
        if event_name == "all":
            # Iterate over a copy of the names: delete_event_name() removes
            # keys from self.dict, and mutating a dict while iterating its
            # keys view raises RuntimeError.
            for stored_name in list(self.dict):
                self.delete_event_name(stored_name)
            return jsonify()

        if event_name not in self.dict:
            raise NotFound(f"event_name {event_name} not found")

        diff_ids = set(body.ids) - set(self.dict[event_name])
        if diff_ids:
            raise NotFound(f"KytosEvent ids not found: {diff_ids}")

        if not body.ids:
            self.delete_event_name(event_name)
        else:
            for _id in body.ids:
                self.delete_event(event_name, _id)
        return jsonify()

    def list_event(self, event_name: str):
        """List dead letter by event name.

        Returns a mapping ``{event_name: {event_id: event_as_dict}}``, or an
        empty mapping when nothing is stored under ``event_name``.
        """
        response = defaultdict(dict)
        # .get() instead of indexing, so that looking up an unknown name
        # does not create an empty entry in the defaultdict.
        for key, value in self.dict.get(event_name, {}).items():
            response[event_name][key] = json.loads(value.as_json())
        return response

    def list_events(self):
        """List dead letter events.

        Returns a mapping ``{event_name: {event_id: event_as_dict}}``
        covering every stored event.
        """
        response = defaultdict(dict)
        for event_name, _dict in self.dict.items():
            for key, value in _dict.items():
                response[event_name][key] = json.loads(value.as_json())
        return response

    def add_event(self, event):
        """Add a KytosEvent to the dead letter.

        The event is stored under ``event.name`` keyed by ``str(event.id)``.
        When that event name is already at capacity the oldest stored event
        is evicted first.
        """
        events = self.dict[event.name]
        if len(events) >= self._max_len_per_event_name:
            events.popitem(last=False)
        events[str(event.id)] = event

    def delete_event(self, event_name: str, event_id: str):
        """Delete a KytosEvent from the dead letter.

        Returns the removed event, or None when it is not stored.
        """
        # .get() so an unknown event_name is not registered as a side effect.
        return self.dict.get(event_name, {}).pop(event_id, None)

    def delete_event_name(self, event_name: str):
        """Delete an event_name from the dead letter.

        Returns the removed per-name mapping, or None when it is not stored.
        """
        return self.dict.pop(event_name, None)

    def reinject(self, event_name: str, event_id: str, buffer_name: str):
        """Reinject a KytosEvent into a KytosEventBuffer.

        The event is removed from the dead letter, its ``reinjections``
        counter is incremented and it is put on the controller buffer named
        ``buffer_name``. Nothing happens when ``buffer_name`` is not one of
        "app" / "msg_in" or when the event is not stored.
        """
        if buffer_name not in {"app", "msg_in"}:
            return
        with self._lock:
            # .get() so an unknown event_name is not registered as a side
            # effect of the lookup.
            events = self.dict.get(event_name)
            if not events:
                return
            kytos_event = events.pop(event_id, None)
            if kytos_event is None:
                return
            kytos_event.reinjections += 1
        event_buffer = getattr(self.controller.buffers, buffer_name)
        event_buffer.put(kytos_event)
166