Passed
Push — dev ( 12545c...fbdf70 )
by Olivier
10:24
created

opcua.common.Event._set_members_from_node()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2
Metric Value
cc 2
dl 0
loc 5
ccs 5
cts 5
cp 1
crap 2
rs 9.4285
1
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class Event(object):
10
11
    """
12
    Create an event based on an event type. Per default is BaseEventType used.
13
    Object members are dynamically created from the base event type and send to
14
    client when evebt is triggered (see example code in source)
15
16
    Arguments to constructor are:
17
18
        server: The InternalSession object to use for query and event triggering
19
20
        source: The emiting source for the node, either an objectId, NodeId or a Node
21
22
        etype: The event type, either an objectId, a NodeId or a Node object
23
    """
24
25 1
    def __init__(self, isession, etype=ua.ObjectIds.BaseEventType, source=ua.ObjectIds.Server):
26 1
        self.isession = isession
27
28 1
        if isinstance(etype, Node):
29
            self.node = etype
30 1
        elif isinstance(etype, ua.NodeId):
31
            self.node = Node(self.isession, etype)
32
        else:
33 1
            self.node = Node(self.isession, ua.NodeId(etype))
34
35 1
        self._set_members_from_node(self.node)
36 1
        if isinstance(source, Node):
37
            self.SourceNode = source.NodeId
38 1
        elif isinstance(etype, ua.NodeId):
39
            self.SourceNode = source.NodeId
40
        else:
41 1
            self.SourceNode = ua.NodeId(source)
42
43
        # set some default values for attributes from BaseEventType, thus that all event must have
44 1
        self.EventId = uuid.uuid4().bytes
45 1
        self.EventType = self.node.nodeid
46 1
        self.LocaleTime = datetime.utcnow()
47 1
        self.ReceiveTime = datetime.utcnow()
48 1
        self.Time = datetime.utcnow()
49 1
        self.Message = ua.LocalizedText()
50 1
        self.Severity = ua.Variant(1, ua.VariantType.UInt16)
51 1
        self.SourceName = "Server"
52
53
        # og set some node attributed we also are expected to have
54 1
        self.BrowseName = self.node.get_browse_name()
55 1
        self.DisplayName = self.node.get_display_name()
56 1
        self.NodeId = self.node.nodeid
57 1
        self.NodeClass = self.node.get_node_class()
58 1
        self.Description = self.node.get_description()
59
60 1
    def __str__(self):
61
        return "Event(Type:{}, Source:{}, Time:{}, Message: {})".format(self.EventType, self.SourceNode, self.Time, self.Message)
62 1
    __repr__ = __str__
63
64 1
    def trigger(self):
65
        """
66
        Trigger the event. This will send a notification to all subscribed clients
67
        """
68 1
        self.isession.subscription_service.trigger_event(self)
69
70 1
    def _set_members_from_node(self, node):
71 1
        references = node.get_children_descriptions(refs=ua.ObjectIds.HasProperty)
72 1
        for desc in references:
73 1
            node = Node(self.isession, desc.NodeId)
74
            setattr(self, desc.BrowseName.Name, node.get_value())
75