Test Failed
Pull Request — master (#226)
by Vinicius
07:46
created

kytos.core.events.KytosEvent.__lt__()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 2
1
"""Module with Kytos Events."""
2 1
import json
3 1
from datetime import datetime
4 1
from uuid import uuid4
5
6 1
from kytos.core.helpers import now
7
8
9 1
class KytosEvent:
10
    """Base Event class.
11
12
    The event data will be passed in the `content` attribute, which should be a
13
    dictionary.
14
    """
15
16 1
    def __init__(self, name=None, content=None, trace_parent=None, priority=0):
17
        """Create an event to be published.
18
19
        Args:
20
            name (string): The name of the event. You should prepend it with
21
                           the name of the napp.
22
            content (dict): Dictionary with any extra data for the event.
23
            trace_parent (object): APM TraceParent for distributed tracing,
24
                                   if you have APM enabled, @listen_to will
25
                                   set the root parent, and then you have to
26
                                   pass the trace_parent to subsequent
27
                                   correlated KytosEvent(s).
28
            priority (int): Priority of this event if a PriorityQueue is being
29 1
                            used, the lower the number the higher the priority.
30 1
        """
31 1
        self.name = name
32 1
        self.content = content if content is not None else {}
33
        self.timestamp = now()
34
        self.reinjections = 0
35 1
        self.priority = priority
36 1
37
        # pylint: disable=invalid-name
38 1
        self.id = uuid4()
39 1
        self.trace_parent = trace_parent
40
41 1
    def __str__(self):
42 1
        return self.name
43
44 1
    def __repr__(self):
45
        return f"KytosEvent({self.name!r}, {self.content!r}, {self.priority})"
46 1
47
    def __lt__(self, other):
48
        """Less than operator."""
49 1
        if self.priority != other.priority:
50
            return self.priority < other.priority
51 1
        return self.timestamp < other.timestamp
52 1
53
    def as_dict(self):
54 1
        """Return KytosEvent as a dict."""
55 1
        return {'id': str(self.id), 'name': self.name, 'content': self.content,
56 1
                'timestamp': self.timestamp, 'reinjections': self.reinjections}
57
58
    def as_json(self):
59
        """Return KytosEvent as json."""
60
        as_dict = self.as_dict()
61 1
        timestamp = datetime.strftime(as_dict['timestamp'],
62 1
                                      '%Y-%m-%dT%H:%M:%S')
63
        as_dict['timestamp'] = timestamp
64 1
        try:
65
            return json.dumps(as_dict)
66 1
        except TypeError:
67
            as_dict['content'] = str(as_dict['content'])
68
            return json.dumps(as_dict)
69
70
    @property
71
    def destination(self):
72 1
        """Return the destination of KytosEvent."""
73
        return self.content.get('destination')
74 1
75 1
    def set_destination(self, destination):
76
        """Update the destination of KytosEvent.
77 1
78
        Args:
79 1
            destination (string): destination of KytosEvent.
80
        """
81
        self.content['destination'] = destination
82
83
    @property
84
    def source(self):
85 1
        """Return the source of KytosEvent."""
86
        return self.content.get('source')
87 1
88 1
    def set_source(self, source):
89
        """Update the source of KytosEvent.
90
91
        Args:
92
            source (string): source of KytosEvent.
93
        """
94
        self.content['source'] = source
95
96
    @property
97
    def message(self):
98 1
        """Return the message carried by the event if it exists.
99 1
100 1
        If there is any OpenFlow message on the event it'll be stored on
101 1
        the 'message' key of the 'content' attribute.
102
103
        Returns:
104
            A python-openflow message instance if it exists, None otherwise.
105
106
        """
107
        try:
108
            return self.content['message']
109
        except KeyError:
110
            return None
111