Passed
Push — master ( b7641a...50d42e )
by Vinicius
15:54 queued 08:37
created

kytos.core.events.KytosEvent.set_source()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""Module with Kytos Events."""
2 2
import json
3 2
from datetime import datetime
4 2
from uuid import uuid4
5
6 2
from kytos.core.helpers import now
7
8
9 2
class KytosEvent:
    """Base Event class.

    The event data will be passed in the `content` attribute, which should be a
    dictionary.
    """

    def __init__(self, name=None, content=None):
        """Create an event to be published.

        Args:
            name (string): The name of the event. You should prepend it with
                           the name of the napp.
            content (dict): Dictionary with any extra data for the event.
                            A new empty dict is used when it is None.
        """
        self.name = name
        # A fresh dict per instance avoids sharing one mutable default.
        self.content = content if content is not None else {}
        self.timestamp = now()
        self.reinjections = 0

        # pylint: disable=invalid-name
        self.id = uuid4()
        # pylint: enable=invalid-name

    def __str__(self):
        """Return the event name as a string.

        The name is coerced with ``str`` because it defaults to None and
        ``__str__`` must return a string; returning None would make
        ``str(event)`` raise TypeError for unnamed events.
        """
        return str(self.name)

    def __repr__(self):
        """Return a representation showing the event name and content."""
        return f"KytosEvent({self.name!r}, {self.content!r})"

    def as_dict(self):
        """Return KytosEvent as a dict.

        The id is converted to a string; `content` is the same object held
        by the event (not a copy) and `timestamp` is left unconverted.
        """
        return {'id': str(self.id), 'name': self.name, 'content': self.content,
                'timestamp': self.timestamp, 'reinjections': self.reinjections}

    def as_json(self):
        """Return KytosEvent as json.

        The timestamp is formatted as '%Y-%m-%dT%H:%M:%S'. If the content is
        not JSON serializable, its string representation is used instead.
        """
        as_dict = self.as_dict()
        timestamp = datetime.strftime(as_dict['timestamp'],
                                      '%Y-%m-%dT%H:%M:%S')
        as_dict['timestamp'] = timestamp
        try:
            return json.dumps(as_dict)
        except TypeError:
            # Best-effort fallback: serialize the content as plain text
            # rather than failing to serialize the whole event.
            as_dict['content'] = str(as_dict['content'])
            return json.dumps(as_dict)

    @property
    def destination(self):
        """Return the destination of KytosEvent, or None if it is not set."""
        return self.content.get('destination')

    def set_destination(self, destination):
        """Update the destination of KytosEvent.

        Args:
            destination (string): destination of KytosEvent.
        """
        self.content['destination'] = destination

    @property
    def source(self):
        """Return the source of KytosEvent, or None if it is not set."""
        return self.content.get('source')

    def set_source(self, source):
        """Update the source of KytosEvent.

        Args:
            source (string): source of KytosEvent.
        """
        self.content['source'] = source

    @property
    def message(self):
        """Return the message carried by the event if it exists.

        If there is any OpenFlow message on the event it'll be stored on
        the 'message' key of the 'content' attribute.

        Returns:
            A python-openflow message instance if it exists, None otherwise.

        """
        # Same lookup style as the `destination` and `source` properties.
        return self.content.get('message')
97