Completed
Push — master ( c5c8fe...3620aa )
by Olivier
02:15
created

Event.from_event_fields()   A

Complexity

Conditions 3

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 15
rs 9.4285
1
import copy
2
3
from opcua import ua
4
import opcua
5
from opcua.common.uaerrors import UaError
6
7
8
class Event(object):
    """
    OPC UA Event object.

    This class is inherited by the common event objects such as BaseEvent,
    by the other standard events and by custom events.
    Events are used to trigger events on server side and are
    sent to clients for every event from the server.

    Developer warning:
    On server side the data type of attributes should be known, thus
    add properties using the add_property method!!!
    """

    def __init__(self):
        self.server_handle = None
        self.select_clauses = None
        self.event_fields = None
        self.data_types = {}
        # Remember the names of the bookkeeping attributes defined so far, so
        # that they can be told apart from event properties added later.
        self.internal_properties = list(self.__dict__) + ["internal_properties"]

    def __str__(self):
        shown = [
            str(key) + ":" + str(value)
            for key, value in self.__dict__.items()
            if key not in self.internal_properties
        ]
        return "{}({})".format(self.__class__.__name__, shown)

    __repr__ = __str__

    @staticmethod
    def _select_clause_name(sattr):
        """
        Return the property name addressed by a select clause.

        The name of the first browse path element is used when a browse path
        is present; otherwise the name of the requested attribute id is used.
        The attribute id may be an enum member (anything exposing ``name``)
        or a raw value accepted by ua.AttributeIds.
        """
        if sattr.BrowsePath:
            return sattr.BrowsePath[0].Name
        attribute_id = sattr.AttributeId
        name = getattr(attribute_id, "name", None)
        if name is None:
            # Raw numeric id: resolve it through the enum.
            name = ua.AttributeIds(attribute_id).name
        return name

    def add_property(self, name, val, datatype):
        """
        Add a property to the event and store its data type
        """
        setattr(self, name, val)
        self.data_types[name] = datatype

    def get_event_props_as_fields_dict(self):
        """
        convert all properties of the Event class to a dict of variants

        Only attributes with a registered data type are exported. Internal
        attributes and attributes set directly on the instance without a
        data type (such as the `_freeze` flag of custom event classes) are
        skipped instead of raising KeyError.
        """
        field_vars = {}
        for key, value in vars(self).items():
            if key.startswith("__") or key in self.internal_properties:
                continue
            if key not in self.data_types:
                continue
            field_vars[key] = ua.Variant(value, self.data_types[key])
        return field_vars

    @staticmethod
    def from_field_dict(fields):
        """
        Create an Event object from a dict of name and variants
        """
        ev = Event()
        for name, variant in fields.items():
            ev.add_property(name, variant.Value, variant.VariantType)
        return ev

    def to_event_fields_using_subscription_fields(self, select_clauses):
        """
        Using a new select_clauses and the original select_clauses
        used during subscription, return a field list

        Requested clauses that match none of the original clauses
        contribute no entry to the returned list.
        """
        fields = []
        for sattr in select_clauses:
            for idx, o_sattr in enumerate(self.select_clauses):
                same_path = sattr.BrowsePath == o_sattr.BrowsePath
                if same_path and sattr.AttributeId == o_sattr.AttributeId:
                    fields.append(self.event_fields[idx])
                    break
        return fields

    def to_event_fields(self, select_clauses):
        """
        return a field list using a select clause and the object properties

        One variant is returned per select clause. A clause naming something
        that is not a property with a registered data type (unknown name,
        internal attribute, method) yields an empty variant instead of
        raising KeyError.
        """
        fields = []
        for sattr in select_clauses:
            name = self._select_clause_name(sattr)
            if name in self.data_types and hasattr(self, name):
                # Deep copy so the returned variant does not share mutable
                # state with the event object.
                value = copy.deepcopy(getattr(self, name))
                field = ua.Variant(value, self.data_types[name])
            else:
                field = ua.Variant(None)
            fields.append(field)
        return fields

    @staticmethod
    def from_event_fields(select_clauses, fields):
        """
        Instantiate an Event object from a select_clauses and fields

        The select clauses and fields are kept on the event, and each field
        is added as a property named after its select clause, using the same
        name resolution as to_event_fields. An empty or missing browse path
        falls back to the attribute id name.
        """
        ev = Event()
        ev.select_clauses = select_clauses
        ev.event_fields = fields
        for idx, sattr in enumerate(select_clauses):
            name = Event._select_clause_name(sattr)
            ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
        return ev
109
110
111
def get_filter_from_event_type(eventtype):
    """
    Build a ua.EventFilter for an event type node.

    The select clauses come from select_clauses_from_evtype and the where
    clause from where_clause_from_evtype, both called with `eventtype`.
    """
    event_filter = ua.EventFilter()
    event_filter.SelectClauses = select_clauses_from_evtype(eventtype)
    event_filter.WhereClause = where_clause_from_evtype(eventtype)
    return event_filter
116
117
118
def select_clauses_from_evtype(evtype):
    """
    Return one select clause per property of the event type node.

    Each clause is a ua.SimpleAttributeOperand typed with the nodeid of
    `evtype`, requesting the Value attribute of the property addressed by
    its browse name. Properties are taken from
    get_event_properties_from_type_node.
    """

    def _operand_for(prop):
        # One operand selecting the value of a single property.
        operand = ua.SimpleAttributeOperand()
        operand.TypeDefinitionId = evtype.nodeid
        operand.AttributeId = ua.AttributeIds.Value
        operand.BrowsePath = [prop.get_browse_name()]
        return operand

    return [_operand_for(prop) for prop in get_event_properties_from_type_node(evtype)]
127
128
129
def where_clause_from_evtype(evtype):
    """
    Return a ua.ContentFilter matching the event type and its subtypes.

    The filter holds a single InList element. Its first operand addresses
    the "EventType" property, and the remaining operands are literal
    nodeids of `evtype` and of every node returned by get_node_subtypes.
    """
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
    type_operand = ua.SimpleAttributeOperand()
    type_operand.TypeDefinitionId = evtype.nodeid
    type_operand.BrowsePath.append(ua.QualifiedName("EventType", 0))
    type_operand.AttributeId = ua.AttributeIds.Value

    element = ua.ContentFilterElement()
    element.FilterOperator = ua.FilterOperator.InList
    element.FilterOperands.append(type_operand)
    for subtype in get_node_subtypes(evtype):
        literal = ua.LiteralOperand()
        literal.Value = ua.Variant(subtype.nodeid)
        element.FilterOperands.append(literal)

    content_filter = ua.ContentFilter()
    content_filter.Elements.append(element)
    return content_filter
146
147
148
def get_node_subtypes(node, nodes=None):
    """
    Collect a node and, recursively, all nodes below it via HasSubtype.

    When `nodes` is None a new list starting with `node` is created.
    Otherwise the given list is extended in place with the subtypes only.
    Subtypes are appended depth first, in the order get_children returns
    them. The list is returned.
    """
    collected = [node] if nodes is None else nodes
    for subtype in node.get_children(refs=ua.ObjectIds.HasSubtype):
        collected.append(subtype)
        get_node_subtypes(subtype, collected)
    return collected
155
156
157
def get_event_properties_from_type_node(node):
    """
    Return the properties of an event type node and of all its ancestors.

    Starting at `node`, the inverse HasSubtype reference is followed until
    the node whose identifier is ua.ObjectIds.BaseEventType is reached, and
    the properties of every visited node are gathered in visiting order.
    Returns None when a visited node does not have exactly one parent.
    """
    collected = []
    current = node
    collected.extend(current.get_properties())

    while current.nodeid.Identifier != ua.ObjectIds.BaseEventType:
        parents = current.get_referenced_nodes(
            refs=ua.ObjectIds.HasSubtype,
            direction=ua.BrowseDirection.Inverse,
            includesubtypes=False,
        )
        if len(parents) != 1:  # Something went wrong
            return None
        current = parents[0]
        collected.extend(current.get_properties())

    return collected
173
174
175
def get_event_obj_from_type_node(node):
    """
    return an Event object from an event type node

    If the identifier of the node is a key of IMPLEMENTED_EVENTS, the
    matching class is instantiated. Otherwise the closest implemented
    ancestor is looked up with _find_parent_eventtype; None is returned if
    there is none. In the remaining case a CustomEvent class deriving from
    that ancestor is built and instantiated. Its constructor adds the
    properties of every type node between `node` and the ancestor and
    raises UaError if one of those nodes does not have exactly one parent.
    """
    implemented = opcua.common.event_objects.IMPLEMENTED_EVENTS
    type_id = node.nodeid.Identifier
    if type_id in implemented:
        return implemented[type_id]()

    parent_identifier, parent_eventtype = _find_parent_eventtype(node)
    if not parent_eventtype:
        return None

    class CustomEvent(parent_eventtype):

        def __init__(self):
            parent_eventtype.__init__(self)
            self.EventType = node.nodeid

            # Walk up the type hierarchy until the implemented ancestor,
            # registering the properties declared on each level.
            current = node
            while current.nodeid.Identifier != parent_identifier:
                for prop in current.get_properties():
                    data_value = prop.get_data_value()
                    self.add_property(
                        prop.get_browse_name().Name,
                        data_value.Value.Value,
                        data_value.Value.VariantType,
                    )
                parents = current.get_referenced_nodes(
                    refs=ua.ObjectIds.HasSubtype,
                    direction=ua.BrowseDirection.Inverse,
                    includesubtypes=False,
                )

                if len(parents) != 1:  # Something went wrong
                    raise UaError("Parent of event type could notbe found")
                current = parents[0]

            self._freeze = True

    return CustomEvent()
207
208
209
def _find_parent_eventtype(node):
    """
    Find the closest ancestor of an event type node that is implemented.

    Follows the inverse HasSubtype reference upwards, one parent at a time,
    until a node whose identifier is a key of IMPLEMENTED_EVENTS is found.
    Returns a tuple (identifier, event class), or (None, None) when a node
    on the way does not have exactly one parent.
    """
    parents = node.get_referenced_nodes(
        refs=ua.ObjectIds.HasSubtype,
        direction=ua.BrowseDirection.Inverse,
        includesubtypes=False,
    )
    if len(parents) != 1:   # Something went wrong
        return None, None

    parent = parents[0]
    implemented = opcua.common.event_objects.IMPLEMENTED_EVENTS
    parent_id = parent.nodeid.Identifier
    if parent_id not in implemented:
        return _find_parent_eventtype(parent)
    return parent_id, implemented[parent_id]
220
221