asyncua.common.events.Event.from_event_fields()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
nop 2
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
1
import copy
2
3
from asyncua import ua
4
import asyncua
5
from ..ua.uaerrors import UaError
6
from .ua_utils import get_node_subtypes
7
8
9
class Event:
10
    """
11
    OPC UA Event object.
12
    This is class in inherited by the common event objects such as BaseEvent,
13
    other auto standard events and custom events
14
    Events are used to trigger events on server side and are
15
    sent to clients for every events from server
16
17
    Developper Warning:
18
    On server side the data type of attributes should be known, thus
19
    add properties using the add_property method!!!
20
    """
21
22
    def __init__(self, emitting_node=ua.ObjectIds.Server):
23
        self.server_handle = None
24
        self.select_clauses = None
25
        self.event_fields = None
26
        self.data_types = {}
27
        self.emitting_node = emitting_node
28
        if isinstance(emitting_node, ua.NodeId):
29
            self.emitting_node = emitting_node
30
        else:
31
            self.emitting_node = ua.NodeId(emitting_node)
32
        # save current attributes
33
        self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]
34
35
    def __str__(self):
36
        return "{0}({1})".format(
37
            self.__class__.__name__,
38
            [str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
39
    __repr__ = __str__
40
41
    def add_property(self, name, val, datatype):
42
        """
43
        Add a property to event and tore its data type
44
        """
45
        setattr(self, name, val)
46
        self.data_types[name] = datatype
47
48
    def get_event_props_as_fields_dict(self):
49
        """
50
        convert all properties of the Event class to a dict of variants
51
        """
52
        field_vars = {}
53
        for key, value in vars(self).items():
54
            if not key.startswith("__") and key not in self.internal_properties:
55
                field_vars[key] = ua.Variant(value, self.data_types[key])
56
        return field_vars
57
58
    @staticmethod
59
    def from_field_dict(fields):
60
        """
61
        Create an Event object from a dict of name and variants
62
        """
63
        ev = Event()
64
        for k, v in fields.items():
65
            ev.add_property(k, v.Value, v.VariantType)
66
        return ev
67
68
    def to_event_fields_using_subscription_fields(self, select_clauses):
69
        """
70
        Using a new select_clauses and the original select_clauses
71
        used during subscription, return a field list
72
        """
73
        fields = []
74
        for sattr in select_clauses:
75
            for idx, o_sattr in enumerate(self.select_clauses):
76
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
77
                    fields.append(self.event_fields[idx])
78
                    break
79
        return fields
80
81
    def to_event_fields(self, select_clauses):
82
        """
83
        return a field list using a select clause and the object properties
84
        """
85
        fields = []
86
        for sattr in select_clauses:
87
            if not sattr.BrowsePath:
88
                name = ua.AttributeIds(sattr.AttributeId).name
89
            else:
90
                name = sattr.BrowsePath[0].Name
91
            try:
92
                val = getattr(self, name)
93
            except AttributeError:
94
                field = ua.Variant(None)
95
            else:
96
                field = ua.Variant(copy.deepcopy(val), self.data_types[name])
97
            fields.append(field)
98
        return fields
99
100
    @staticmethod
101
    def from_event_fields(select_clauses, fields):
102
        """
103
        Instantiate an Event object from a select_clauses and fields
104
        """
105
        ev = Event()
106
        ev.select_clauses = select_clauses
107
        ev.event_fields = fields
108
        for idx, sattr in enumerate(select_clauses):
109
            if len(sattr.BrowsePath) == 0:
110
                name = sattr.AttributeId.name
111
            else:
112
                name = sattr.BrowsePath[0].Name
113
            ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
114
        return ev
115
116
117
async def get_filter_from_event_type(eventtypes):
118
    evfilter = ua.EventFilter()
119
    evfilter.SelectClauses = await select_clauses_from_evtype(eventtypes)
120
    evfilter.WhereClause = await where_clause_from_evtype(eventtypes)
121
    return evfilter
122
123
124
async def select_clauses_from_evtype(evtypes):
125
    clauses = []
126
    selected_paths = []
127
    for evtype in evtypes:
128
        event_props_and_vars = await get_event_properties_from_type_node(evtype)
129
        event_props_and_vars.extend(await get_event_variables_from_type_node(evtype))
130
        for node in event_props_and_vars:
131
            browse_name = await node.read_browse_name()
132
            if browse_name not in selected_paths:
133
                op = ua.SimpleAttributeOperand()
134
                op.AttributeId = ua.AttributeIds.Value
135
                op.BrowsePath = [browse_name]
136
                clauses.append(op)
137
                selected_paths.append(browse_name)
138
    return clauses
139
140
141
async def where_clause_from_evtype(evtypes):
142
    cf = ua.ContentFilter()
143
    el = ua.ContentFilterElement()
144
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
145
    # Create a clause where the generate event type property EventType
146
    # must be a subtype of events in evtypes argument
147
148
    # the first operand is the attribute event type
149
    op = ua.SimpleAttributeOperand()
150
    # op.TypeDefinitionId = evtype.nodeid
151
    op.BrowsePath.append(ua.QualifiedName("EventType", 0))
152
    op.AttributeId = ua.AttributeIds.Value
153
    el.FilterOperands.append(op)
154
    # now create a list of all subtypes we want to accept
155
    subtypes = []
156
    for evtype in evtypes:
157
        for st in await get_node_subtypes(evtype):
158
            subtypes.append(st.nodeid)
159
    subtypes = list(set(subtypes))  # remove duplicates
160
    for subtypeid in subtypes:
161
        op = ua.LiteralOperand()
162
        op.Value = ua.Variant(subtypeid)
163
        el.FilterOperands.append(op)
164
    el.FilterOperator = ua.FilterOperator.InList
165
    cf.Elements.append(el)
166
    return cf
167
168
169
async def get_event_properties_from_type_node(node):
170
    properties = []
171
    curr_node = node
172
    while True:
173
        properties.extend(await curr_node.get_properties())
174
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
175
            break
176
        parents = await curr_node.get_referenced_nodes(
177
            refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True
178
        )
179
        if len(parents) != 1:  # Something went wrong
180
            return None
181
        curr_node = parents[0]
182
    return properties
183
184
185
async def get_event_variables_from_type_node(node):
186
    variables = []
187
    curr_node = node
188
    while True:
189
        variables.extend(await curr_node.get_variables())
190
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
191
            break
192
        parents = await curr_node.get_referenced_nodes(
193
            refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True
194
        )
195
        if len(parents) != 1:  # Something went wrong
196
            return None
197
        curr_node = parents[0]
198
    return variables
199
200
201
async def get_event_obj_from_type_node(node):
202
    """
203
    return an Event object from an event type node
204
    """
205
    if node.nodeid.Identifier in asyncua.common.event_objects.IMPLEMENTED_EVENTS.keys():
206
        return asyncua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
207
    else:
208
        parent_identifier, parent_eventtype = await _find_parent_eventtype(node)
209
210
        class CustomEvent(parent_eventtype):
211
212
            def __init__(self):
213
                parent_eventtype.__init__(self)
0 ignored issues
show
introduced by
The variable parent_eventtype does not seem to be defined for all execution paths.
Loading history...
214
                self.EventType = node.nodeid
215
216
            async def init(self):
217
                curr_node = node
218
                while curr_node.nodeid.Identifier != parent_identifier:
0 ignored issues
show
introduced by
The variable parent_identifier does not seem to be defined for all execution paths.
Loading history...
219
                    node_props_and_vars = await curr_node.get_properties()
220
                    node_props_and_vars.extend(await curr_node.get_variables())
221
                    for field in node_props_and_vars:
222
                        name = (await field.read_browse_name()).Name
223
                        val = await field.read_data_value()
224
                        self.add_property(name, val.Value.Value, val.Value.VariantType)
225
                    parents = await curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
226
                                                                   direction=ua.BrowseDirection.Inverse,
227
                                                                   includesubtypes=True)
228
229
                    if len(parents) != 1:  # Something went wrong
230
                        raise UaError("Parent of event type could notbe found")
231
                    curr_node = parents[0]
232
233
                self._freeze = True
234
235
    ce = CustomEvent()
236
    await ce.init()
237
    return ce
238
239
240
async def _find_parent_eventtype(node):
241
    """
242
    """
243
    parents = await node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
244
                                              direction=ua.BrowseDirection.Inverse, includesubtypes=True)
245
246
    if len(parents) != 1:   # Something went wrong
247
        raise UaError("Parent of event type could notbe found")
248
    if parents[0].nodeid.Identifier in asyncua.common.event_objects.IMPLEMENTED_EVENTS.keys():
249
        return parents[0].nodeid.Identifier,\
250
               asyncua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
251
    else:
252
        return await _find_parent_eventtype(parents[0])
253