Completed
Pull Request — master (#161)
by Denis
03:18
created

EventGenerator.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 2
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
1
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
10
11
    """
12
    Create an event based on an event type. Per default is BaseEventType used.
13
    Object members are dynamically created from the base event type and send to
14
    client when evebt is triggered (see example code in source)
15
16
    Arguments to constructor are:
17
18
        server: The InternalSession object to use for query and event triggering
19
20
        source: The emiting source for the node, either an objectId, NodeId or a Node
21
22
        etype: The event type, either an objectId, a NodeId or a Node object
23
    """
24
25 1
    def __init__(self, isession, etype=ua.BaseEvent(), source=ua.ObjectIds.Server):
26 1
        self.isession = isession
27 1
        self.event = None
28 1
        node = None
29
30 1
        if isinstance(etype, ua.BaseEvent):
31 1
            self.event = etype
32 1
        elif isinstance(etype, Node):
33 1
            node = etype
34 1
        elif isinstance(etype, ua.NodeId):
35 1
            node = Node(self.isession, etype)
36
        else:
37 1
            node = Node(self.isession, ua.NodeId(etype))
38
39 1
        if node:
40 1
            self.event = get_event_from_node(node)
41
42 1
        if isinstance(source, Node):
43
            pass
44 1
        elif isinstance(source, ua.NodeId):
45
            source = Node(isession, source)
46
        else:
47 1
            source = Node(isession, ua.NodeId(source))
48
49 1
        if self.event.SourceNode.Identifier:
50 1
            source = Node(self.isession, self.event.SourceNode)
51
52 1
        self.event.SourceName = source.get_display_name().Text
53 1
        source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
54
55 1
    def __str__(self):
56
        return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(self.EventType, self.SourceNode, self.Time, self.Message)
57 1
    __repr__ = __str__
58
59 1
    def trigger(self, time=None, message=None):
60
        """
61
        Trigger the event. This will send a notification to all subscribed clients
62
        """
63
        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
64
        if time:
65
            self.event.Time = time
66
        else:
67
            self.event.Time = datetime.utcnow()
68
        self.event.RecieveTime = datetime.utcnow()
69
        #FIXME: LocalTime is wrong but currently know better. For description s. Part 5 page 18
70
        self.event.LocalTime = datetime.utcnow()
71
        if message:
72
            self.Message = ua.LocalizedText(message)
73
        elif not self.event.Message:
74
            self.event.Message = ua.LocalizedText(self.source.get_browse_name().Text)
75
        self.isession.subscription_service.trigger_event(self.event)
76
77
78 1
def get_event_from_node(node):
79 1
    if node.nodeid.Identifier in ua.uaevents_auto.IMPLEMNTED_EVENTS.keys():
80 1
        return ua.uaevents_auto.IMPLEMNTED_EVENTS[node.nodeid.Identifier]()
81
    else:
82 1
        parent_identifier, parent_eventtype = _find_parent_eventtype(node)
83 1
        if not parent_eventtype:
84
            return None
85
86 1
        class CustomEvent(parent_eventtype):
87
88 1
            def __init__(self):
89 1
                super(CustomEvent, self).__init__()
90 1
                curr_node = node
91 1
                while curr_node.nodeid.Identifier != parent_identifier:
92
                    properties = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Forward)
93
                    for prop in properties:
94
                        setattr(self, prop.get_browse_name().Name, prop.get_value())
95
96
                    parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
97
                    if len(parents) != 1: # Something went wrong
98
                        return None
99
                    curr_node = parents[0]
100
101
            #TODO: Extend to show all fields of CustomEvent
102 1
            def __str__(self):
103
                s = 'CustomEvent('
104
                s += 'EventId:{}'.format(self.EventId)
105
                s += ', EventType:{}'.format(self.EventType)
106
                s += ', SourceNode:{}'.format(self.SourceNode)
107
                s += ', SourceName:{}'.format(self.SourceName)
108
                s += ', Time:{}'.format(self.Time)
109
                s += ', RecieveTime:{}'.format(self.RecieveTime)
110
                s += ', LocalTime:{}'.format(self.LocalTime)
111
                s += ', Message:{}'.format(self.Message)
112
                s += ', Severity:{}'.format(self.Severity)
113
                s += ')'
114
115
        #class CustomEvent():
116
117
            #pass
118
    #references = node.get_children_descriptions(refs=ua.ObjectIds.HasProperty)
119
    #for desc in references:
120
        #child = Node(self.isession, desc.NodeId)
121
        #setattr(self.event, desc.BrowseName.Name, child.get_value())
122
123 1
    return CustomEvent()
124
125
126 1
def _find_parent_eventtype(node):
127 1
    parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
128
129 1
    if len(parents) != 1:   # Something went wrong
130
        return None
131 1
    if parents[0].nodeid.Identifier in ua.uaevents_auto.IMPLEMNTED_EVENTS.keys():
132 1
        return node.nodeid.Identifier, ua.uaevents_auto.IMPLEMNTED_EVENTS[parents[0].nodeid.Identifier]
133
    else:
134
        _find_parent_eventtype(parents[0])
135