Test Failed
Pull Request — master (#112)
by
unknown
02:26
created

asyncua.common.events.Event.to_event_fields()   B

Complexity

Conditions 7

Size

Total Lines 25
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 19
nop 2
dl 0
loc 25
rs 8
c 0
b 0
f 0
1
import copy
2
3
from asyncua import ua
4
import asyncua
5
from ..ua.uaerrors import UaError
6
from .ua_utils import get_node_subtypes
7
8
9
class Event:
    """
    OPC UA Event object.
    This class is inherited by the common event objects such as BaseEvent,
    other standard events and custom events.
    Events are used to trigger events on server side and are
    sent to clients for every event from the server.

    Developer Warning:
    On server side the data type of attributes should be known, thus
    add properties using the add_property method!!!
    """

    def __init__(self, emitting_node=ua.ObjectIds.Server):
        """
        Create an empty event.

        emitting_node is either a ua.NodeId or any value accepted by the
        ua.NodeId constructor; it is always stored as a ua.NodeId.
        """
        self.server_handle = None
        self.select_clauses = None
        self.event_fields = None
        self.data_types = {}
        if isinstance(emitting_node, ua.NodeId):
            self.emitting_node = emitting_node
        else:
            self.emitting_node = ua.NodeId(emitting_node)
        # Remember the bookkeeping attributes set so far: every attribute added
        # later is treated as an event property/variable.
        self.internal_properties = list(self.__dict__.keys()) + ["internal_properties"]

    def __str__(self):
        return "{0}({1})".format(
            self.__class__.__name__,
            [str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
    __repr__ = __str__

    @staticmethod
    def _clause_attribute_name(sattr):
        """
        Return the attribute name under which the field selected by sattr
        is stored on an Event.

        Without a browse path the name of the selected attribute id is used,
        otherwise the names of the browse path elements joined with '/'.
        """
        if not sattr.BrowsePath:
            # ua.AttributeIds(...) accepts both an enum member and its raw value
            return ua.AttributeIds(sattr.AttributeId).name
        return '/'.join(path.Name for path in sattr.BrowsePath)

    def add_property(self, name, val, datatype):
        """
        Add a property to event and store its data type
        """
        setattr(self, name, val)
        self.data_types[name] = datatype

    def add_variable(self, name, val, datatype):
        """
        Add a variable to event and store its data type
        variables are able to have properties as children
        """
        setattr(self, name, val)
        self.data_types[name] = datatype

    def get_event_props_as_fields_dict(self):
        """
        convert all properties and variables of the Event class to a dict of variants

        Raises KeyError for an attribute that has no entry in data_types,
        i.e. one that was not added through add_property/add_variable.
        """
        field_vars = {}
        for key, value in vars(self).items():
            if not key.startswith("__") and key not in self.internal_properties:
                field_vars[key] = ua.Variant(value, self.data_types[key])
        return field_vars

    @staticmethod
    def from_field_dict(fields):
        """
        Create an Event object from a dict of name and variants
        """
        ev = Event()
        for k, v in fields.items():
            ev.add_property(k, v.Value, v.VariantType)
        return ev

    def to_event_fields_using_subscription_fields(self, select_clauses):
        """
        Using a new select_clauses and the original select_clauses
        used during subscription, return a field list

        Clauses without a match in the original select_clauses are skipped,
        so the result can be shorter than select_clauses.
        """
        fields = []
        for sattr in select_clauses:
            for idx, o_sattr in enumerate(self.select_clauses):
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
                    fields.append(self.event_fields[idx])
                    break
        return fields

    def to_event_fields(self, select_clauses):
        """
        return a field list using a select clause and the object properties

        One variant is returned per clause; a clause naming an attribute that
        is missing or None yields an empty variant.
        """
        fields = []
        for sattr in select_clauses:
            name = self._clause_attribute_name(sattr)
            val = getattr(self, name, None)
            if val is None:
                field = ua.Variant(None)
            else:
                # deep copy so the returned field does not share state with the event
                field = ua.Variant(copy.deepcopy(val), self.data_types[name])
            fields.append(field)
        return fields

    @staticmethod
    def from_event_fields(select_clauses, fields):
        """
        Instantiate an Event object from a select_clauses and fields

        fields[i] is the variant selected by select_clauses[i]; both sequences
        are also stored unchanged on the returned event.
        """
        ev = Event()
        ev.select_clauses = select_clauses
        ev.event_fields = fields
        for idx, sattr in enumerate(select_clauses):
            name = Event._clause_attribute_name(sattr)
            ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
        return ev
134
135
136
async def get_filter_from_event_type(eventtypes):
    """
    Build a ua.EventFilter for the given event types.

    The select clauses come from select_clauses_from_evtype and the where
    clause from where_clause_from_evtype, both called with eventtypes.
    """
    select_part = await select_clauses_from_evtype(eventtypes)
    where_part = await where_clause_from_evtype(eventtypes)
    event_filter = ua.EventFilter()
    event_filter.SelectClauses = select_part
    event_filter.WhereClause = where_part
    return event_filter
141
142
143
async def select_clauses_from_evtype(evtypes):
    """
    Build the list of select clauses (ua.SimpleAttributeOperand) for the
    given event type nodes.

    For every event type, one Value clause is created per property, per
    variable and per property of a variable. A browse path that was already
    selected (by an earlier event type or an earlier step) is not selected
    twice.

    Raises UaError if the type hierarchy of an event type cannot be walked
    (the helper coroutines signal this by returning None).
    """
    clauses = []
    selected_paths = []

    def _select(key, browse_path):
        # Add a Value clause for browse_path unless key was selected before
        if key in selected_paths:
            return
        op = ua.SimpleAttributeOperand()
        op.AttributeId = ua.AttributeIds.Value
        op.BrowsePath = browse_path
        clauses.append(op)
        selected_paths.append(key)

    for evtype in evtypes:
        properties = await get_event_properties_from_type_node(evtype)
        if properties is None:
            raise UaError("Parent of event type could not be found")
        for prop in properties:
            # NOTE(review): this is the only call using read_browse_name, all
            # other calls in this module use get_browse_name - confirm which
            # name the Node API provides.
            browse_name = await prop.read_browse_name()
            _select(browse_name, [browse_name])

        variables = await get_event_variables_from_type_node(evtype)
        if variables is None:
            raise UaError("Parent of event type could not be found")
        # Select all variables first, then their properties, and keep the
        # browse names so they are read from the server only once.
        named_variables = []
        for var in variables:
            browse_name = await var.get_browse_name()
            named_variables.append((var, browse_name))
            _select(browse_name, [browse_name])
        for var, browse_name in named_variables:
            for prop in await var.get_properties():
                browse_path = [browse_name, await prop.get_browse_name()]
                _select(browse_path, browse_path)
    return clauses
181
182
183
async def where_clause_from_evtype(evtypes):
    """
    Build a ua.ContentFilter with a single InList element.

    The first operand of the element is the Value attribute of the
    "EventType" property, the remaining operands are literal node ids of
    every subtype (as returned by get_node_subtypes) of the nodes in evtypes.
    """
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute

    # first operand: the event type attribute of the generated event
    event_type_operand = ua.SimpleAttributeOperand()
    event_type_operand.BrowsePath.append(ua.QualifiedName("EventType", 0))
    event_type_operand.AttributeId = ua.AttributeIds.Value

    element = ua.ContentFilterElement()
    element.FilterOperands.append(event_type_operand)

    # collect the node ids of all subtypes we want to accept
    accepted_ids = []
    for evtype in evtypes:
        accepted_ids.extend(st.nodeid for st in await get_node_subtypes(evtype))

    # going through a set removes duplicates
    for accepted_id in set(accepted_ids):
        literal = ua.LiteralOperand()
        literal.Value = ua.Variant(accepted_id)
        element.FilterOperands.append(literal)
    element.FilterOperator = ua.FilterOperator.InList

    content_filter = ua.ContentFilter()
    content_filter.Elements.append(element)
    return content_filter
209
210
211
async def get_event_variables_from_type_node(node):
    """
    Collect the variables of an event type node and of all its supertypes.

    Starting at node, the inverse HasSubtype reference is followed until a
    node whose identifier is BaseEventType has been processed.
    Returns the list of variables, or None if some node on the way does not
    have exactly one parent.
    """
    current = node
    collected = list(await current.get_variables())
    while current.nodeid.Identifier != ua.ObjectIds.BaseEventType:
        supertypes = await current.get_referenced_nodes(
            refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
        if len(supertypes) != 1:
            # Something went wrong
            return None
        current = supertypes[0]
        collected.extend(await current.get_variables())
    return collected
224
225
226
async def get_event_properties_from_type_node(node):
    """
    Collect the properties of an event type node and of all its supertypes.

    Starting at node, the inverse HasSubtype reference is followed until a
    node whose identifier is BaseEventType has been processed.
    Returns the list of properties, or None if some node on the way does not
    have exactly one parent.
    """
    current = node
    collected = list(await current.get_properties())
    while current.nodeid.Identifier != ua.ObjectIds.BaseEventType:
        supertypes = await current.get_referenced_nodes(
            refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True
        )
        if len(supertypes) != 1:
            # Something went wrong
            return None
        current = supertypes[0]
        collected.extend(await current.get_properties())
    return collected
240
241
242
async def get_event_obj_from_type_node(node):
    """
    return an Event object from an event type node

    If node is an event type of namespace 0 listed in IMPLEMENTED_EVENTS, an
    instance of the matching event class is returned. Otherwise a custom
    event class is derived from the closest implemented parent event type
    and filled with the properties and variables found on node and on every
    type between node and that parent.

    Raises UaError if a type on the way does not have exactly one parent.
    """
    if node.nodeid.NamespaceIndex == 0:
        if node.nodeid.Identifier in asyncua.common.event_objects.IMPLEMENTED_EVENTS.keys():
            return asyncua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()

    # _find_parent_eventtype either returns both values or raises
    parent_identifier, parent_eventtype = await _find_parent_eventtype(node)

    class CustomEvent(parent_eventtype):

        def __init__(self):
            parent_eventtype.__init__(self)
            self.EventType = node.nodeid

        async def init(self):
            """
            Read properties and variables from the server and add them to the event
            """
            curr_node = node
            while curr_node.nodeid.Identifier != parent_identifier:
                for prop in await curr_node.get_properties():
                    name = (await prop.get_browse_name()).Name
                    val = await prop.get_data_value()
                    self.add_property(name, val.Value.Value, val.Value.VariantType)
                for var in await curr_node.get_variables():
                    var_name = (await var.get_browse_name()).Name
                    val = await var.get_data_value()
                    self.add_variable(var_name, val.Value.Value, await var.get_data_type_as_variant_type())
                    for prop in await var.get_properties():
                        prop_name = (await prop.get_browse_name()).Name
                        # Always build the path from the variable name, so each
                        # property is stored as "<variable>/<property>" and the
                        # names of sibling properties do not pile up.
                        prop_path = '%s/%s' % (var_name, prop_name)
                        prop_val = await prop.get_data_value()
                        self.add_property(prop_path, prop_val.Value.Value, prop_val.Value.VariantType)
                parents = await curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
                                                               direction=ua.BrowseDirection.Inverse,
                                                               includesubtypes=True)
                if len(parents) != 1:  # Something went wrong
                    raise UaError("Parent of event type could not be found")
                curr_node = parents[0]
            # NOTE(review): _freeze is set after Event.internal_properties was
            # captured and has no entry in data_types, so Event treats it like
            # a regular event attribute - confirm this is intended.
            self._freeze = True

    ce = CustomEvent()
    await ce.init()
    return ce
286
287
288
async def _find_parent_eventtype(node):
    """
    Find the closest supertype of node that has an implemented event class.

    The inverse HasSubtype reference is followed upwards until a parent in
    namespace 0 whose identifier is a key of IMPLEMENTED_EVENTS is reached.
    Returns a tuple (identifier of that parent, its event class).
    Raises UaError if a node on the way does not have exactly one parent.
    """
    current = node
    while True:
        supertypes = await current.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
                                                        direction=ua.BrowseDirection.Inverse,
                                                        includesubtypes=True)
        if len(supertypes) != 1:
            # Something went wrong
            raise UaError("Parent of event type could not be found")

        candidate = supertypes[0]
        if candidate.nodeid.NamespaceIndex == 0:
            implemented = asyncua.common.event_objects.IMPLEMENTED_EVENTS
            if candidate.nodeid.Identifier in implemented.keys():
                return candidate.nodeid.Identifier, implemented[candidate.nodeid.Identifier]
        # not an implemented event type, continue one level up
        current = candidate
301