Completed
Pull Request — master (#170)
by Denis
03:35
created

EventGenerator.trigger()   A

Complexity

Conditions 4

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4.25

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 17
ccs 9
cts 12
cp 0.75
crap 4.25
rs 9.2
c 1
b 0
f 0
1 1
import logging
2 1
from datetime import datetime
3
4 1
from opcua import ua
5 1
from opcua import Node
6 1
import uuid
7
8
9 1
class EventGenerator(object):
    """
    Create an event based on an event type. By default BaseEventType is used.
    The event object is built from the event type and is sent to the
    clients when the event is triggered (see example code in source).

    Arguments to constructor are:

        isession: The InternalSession object to use for query and event triggering

        etype: The event type, either a ua.BaseEvent instance, an ObjectId,
               a NodeId or a Node object

        source: The emitting source for the event, either an ObjectId, a NodeId or a Node
    """

    def __init__(self, isession, etype=None, source=ua.ObjectIds.Server):
        if not etype:
            etype = ua.BaseEvent()

        self.logger = logging.getLogger(__name__)
        self.isession = isession
        self.event = None
        node = None

        # Resolve etype either directly to an event object or to the type node
        # from which the event object is built.
        if isinstance(etype, ua.BaseEvent):
            self.event = etype
        elif isinstance(etype, Node):
            node = etype
        elif isinstance(etype, ua.NodeId):
            node = Node(self.isession, etype)
        else:
            node = Node(self.isession, ua.NodeId(etype))

        if node:
            # NOTE: get_event_from_type_node() returns None when no implemented
            # parent event type is found; the attribute accesses on self.event
            # below fail with AttributeError in that case.
            self.event = get_event_from_type_node(node)

        # Normalize source to a Node object.
        if isinstance(source, Node):
            pass
        elif isinstance(source, ua.NodeId):
            source = Node(isession, source)
        else:
            source = Node(isession, ua.NodeId(source))

        # A SourceNode already set on the event takes precedence over the
        # source argument.
        if self.event.SourceNode:
            if source.nodeid != self.event.SourceNode:
                self.logger.warning("Source NodeId: '%s' and event SourceNode: '%s' are not the same. Using '%s' as SourceNode", str(source.nodeid), str(self.event.SourceNode), str(self.event.SourceNode))
                source = Node(self.isession, self.event.SourceNode)

        self.event.SourceNode = source.nodeid
        self.event.SourceName = source.get_display_name().Text

        # Set the EventNotifier attribute of the source node to 1 and add a
        # GeneratesEvent reference from the source node to the event type.
        source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
        ref = ua.AddReferencesItem()
        ref.IsForward = True
        ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.GeneratesEvent)
        ref.SourceNodeId = source.nodeid
        ref.TargetNodeClass = ua.NodeClass.ObjectType
        ref.TargetNodeId = self.event.EventType
        # NOTE: the value returned by add_references() is currently not checked.
        self.isession.add_references([ref])

    def __str__(self):
        # The event fields live on self.event, not on the generator itself.
        return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(
            self.event.EventType,
            self.event.SourceNode,
            self.event.Time,
            self.event.Message)
    __repr__ = __str__

    def trigger(self, time=None, message=None):
        """
        Trigger the event. This will send a notification to all subscribed clients

        time: value stored as the event Time; the current UTC time is used
              when it is not given
        message: text of the event Message; when it is not given and the event
                 has no Message yet, the browse name of the source node is used
        """
        # Every triggered event gets a fresh id.
        self.event.EventId = ua.Variant(uuid.uuid4().hex, ua.VariantType.ByteString)
        if time:
            self.event.Time = time
        else:
            self.event.Time = datetime.utcnow()
        # NOTE(review): OPC UA names this field "ReceiveTime"; confirm which
        # spelling the event class in use actually defines before renaming.
        self.event.RecieveTime = datetime.utcnow()
        # FIXME: LocalTime is wrong, but we currently do not know better.
        # For a description see Part 5 page 18
        self.event.LocalTime = datetime.utcnow()
        if message:
            self.event.Message = ua.LocalizedText(message)
        elif not self.event.Message:
            self.event.Message = ua.LocalizedText(Node(self.isession, self.event.SourceNode).get_browse_name().Text)
        self.isession.subscription_service.trigger_event(self.event)
94
95
96 1
def get_event_from_type_node(node):
    """
    Return an event object for the event type described by node.

    If the identifier of node is a key of ua.uaevents_auto.IMPLEMENTED_EVENTS,
    an instance of the registered class is returned. Otherwise the closest
    implemented parent event type is looked up with _find_parent_eventtype();
    None is returned when there is none. In the remaining case an instance of
    a subclass of that parent class is returned, populated with the property
    values found on node and on every type between node and the parent.
    """
    type_identifier = node.nodeid.Identifier
    known_events = ua.uaevents_auto.IMPLEMENTED_EVENTS
    if type_identifier in known_events:
        return known_events[type_identifier]()

    base_identifier, base_class = _find_parent_eventtype(node)
    if not base_class:
        return None

    class CustomEvent(base_class):

        def __init__(self):
            super(CustomEvent, self).__init__(extended=True)
            self.EventType = node.nodeid

            # Walk up the HasSubtype chain, copying the property values of
            # every type node below the implemented parent onto this event.
            type_node = node
            while type_node.nodeid.Identifier != base_identifier:
                for prop in type_node.get_properties():
                    setattr(self, prop.get_browse_name().Name, prop.get_value())
                supertypes = type_node.get_referenced_nodes(
                    refs=ua.ObjectIds.HasSubtype,
                    direction=ua.BrowseDirection.Inverse,
                    includesubtypes=False)
                if len(supertypes) != 1:
                    # Something went wrong: stop here, _freeze is not set
                    return
                type_node = supertypes[0]

            self._freeze = True

    return CustomEvent()
122
123
124 1
def get_event_properties_from_type_node(node):
    """
    Collect the properties of an event type node and of all its parent types.

    Starting at node, the inverse HasSubtype references are followed until
    the node whose identifier is ua.ObjectIds.BaseEventType is reached; the
    properties of every visited node (including that last one) are gathered
    in visiting order.

    Returns the list of property nodes, or None when a visited node does not
    have exactly one HasSubtype parent.
    """
    collected = []
    type_node = node
    collected.extend(type_node.get_properties())

    while type_node.nodeid.Identifier != ua.ObjectIds.BaseEventType:
        supertypes = type_node.get_referenced_nodes(
            refs=ua.ObjectIds.HasSubtype,
            direction=ua.BrowseDirection.Inverse,
            includesubtypes=False)
        if len(supertypes) != 1:
            # Something went wrong
            return None
        type_node = supertypes[0]
        collected.extend(type_node.get_properties())

    return collected
140
141
142 1
def _find_parent_eventtype(node):
    """
    Find the closest parent type of node that has an implemented event class.

    The inverse HasSubtype reference of node is followed; if the parent's
    identifier is a key of ua.uaevents_auto.IMPLEMENTED_EVENTS the search
    stops, otherwise it continues recursively from that parent.

    Returns a tuple (identifier, event class), or (None, None) when a node
    on the way does not have exactly one HasSubtype parent.
    """
    supertypes = node.get_referenced_nodes(
        refs=ua.ObjectIds.HasSubtype,
        direction=ua.BrowseDirection.Inverse,
        includesubtypes=False)

    if len(supertypes) != 1:
        # Something went wrong
        return None, None

    supertype = supertypes[0]
    if supertype.nodeid.Identifier not in ua.uaevents_auto.IMPLEMENTED_EVENTS:
        return _find_parent_eventtype(supertype)

    return supertype.nodeid.Identifier, ua.uaevents_auto.IMPLEMENTED_EVENTS[supertype.nodeid.Identifier]
151