Completed
Pull Request — master (#200)
by Olivier
03:33
created

Event.from_event_fields()   A

Complexity

Conditions 3

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 15
rs 9.4285
1
from opcua import ua
2
import opcua
3
from opcua.common.uaerrors import UaError
4
5
6
class Event(object):
7
    """
8
    OPC UA Event object.
9
    This is class in inherited by the common event objects such as BaseEvent,
10
    other auto standard events and custom events
11
    Events are used to trigger events on server side and are
12
    sent to clients for every events from server
13
14
    Developper Warning:
15
    On server side the data type of attributes should be known, thus
16
    add properties using the add_property method!!!
17
    """
18
19
    def __init__(self):
20
        self.server_handle = None
21
        self.select_clauses = None
22
        self.event_fields = None
23
        self.data_types = {}
24
        # save current attributes
25
        self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]
26
27
    def __str__(self):
28
        return "{}({})".format(
29
            self.__class__.__name__, 
30
            [str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
31
    __repr__ = __str__
32
33
    def add_property(self, name, val, datatype):
34
        """
35
        Add a property to event and tore its data type
36
        """
37
        setattr(self, name, val)
38
        self.data_types[name] = datatype
39
40
    def get_event_props_as_fields_dict(self):
41
        """
42
        convert all properties of the Event class to a dict of variants
43
        """
44
        field_vars = {}
45
        for key, value in vars(self).items():
46
            if not key.startswith("__") and key not in self.internal_properties:
47
                field_vars[key] = ua.Variant(value, self.data_types[key])
48
        return field_vars
49
50
    @staticmethod
51
    def from_field_dict(fields):
52
        """
53
        Create an Event object from a dict of name and variants
54
        """
55
        ev = Event()
56
        for k, v in fields.items():
57
            ev.add_property(k, v.Value, v.VariantType)
58
        return ev
59
60
    def to_event_fields_using_subscription_fields(self, select_clauses):
61
        """
62
        Using a new select_clauses and the original select_clauses
63
        used during subscription, return a field list 
64
        """
65
        fields = []
66
        for sattr in select_clauses:
67
            for idx, o_sattr in enumerate(self.select_clauses):
68
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
69
                    fields.append(self.event_fields[idx])
70
                    break
71
        return fields
72
73
    def to_event_fields(self, select_clauses):
74
        """
75
        return a field list using a select clause and the object properties
76
        """
77
        fields = []
78
        for sattr in select_clauses:
79
            if len(sattr.BrowsePath) == 0:
80
                name = sattr.AttributeId.name
81
            else:
82
                name = sattr.BrowsePath[0].Name
83
            field = getattr(self, name)
84
            fields.append(ua.Variant(field, self.data_types[name]))
85
        return fields
86
87
    @staticmethod
88
    def from_event_fields(select_clauses, fields):
89
        """
90
        Instanciate an Event object from a select_clauses and fields
91
        """
92
        ev = Event()
93
        ev.select_clauses = select_clauses
94
        ev.event_fields = fields
95
        for idx, sattr in enumerate(select_clauses):
96
            if len(sattr.BrowsePath) == 0:
97
                name = sattr.AttributeId.name
98
            else:
99
                name = sattr.BrowsePath[0].Name
100
            ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
101
        return ev
102
103
104
def get_filter_from_event_type(eventtype):
105
    evfilter = ua.EventFilter()
106
    evfilter.SelectClauses = select_clauses_from_evtype(eventtype)
107
    evfilter.WhereClause = where_clause_from_evtype(eventtype)
108
    return evfilter
109
110
111
def select_clauses_from_evtype(evtype):
112
    clauses = []
113
    for prop in get_event_properties_from_type_node(evtype):
114
        op = ua.SimpleAttributeOperand()
115
        op.TypeDefinitionId = evtype.nodeid
116
        op.AttributeId = ua.AttributeIds.Value
117
        op.BrowsePath = [prop.get_browse_name()]
118
        clauses.append(op)
119
    return clauses
120
121
122
def where_clause_from_evtype(evtype):
123
    cf = ua.ContentFilter()
124
    el = ua.ContentFilterElement()
125
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
126
    op = ua.SimpleAttributeOperand()
127
    op.TypeDefinitionId = evtype.nodeid
128
    op.BrowsePath.append(ua.QualifiedName("EventType", 0))
129
    op.AttributeId = ua.AttributeIds.Value
130
    el.FilterOperands.append(op)
131
    for subtypeid in [st.nodeid for st in get_node_subtypes(evtype)]:
132
        op = ua.LiteralOperand()
133
        op.Value = ua.Variant(subtypeid)
134
        el.FilterOperands.append(op)
135
    el.FilterOperator = ua.FilterOperator.InList
136
137 View Code Duplication
    cf.Elements.append(el)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
138
    return cf
139
140
141
def get_node_subtypes(node, nodes=None):
142
    if nodes is None:
143
        nodes = [node]
144
    for child in node.get_children(refs=ua.ObjectIds.HasSubtype):
145
        nodes.append(child)
146
        get_node_subtypes(child, nodes)
147
    return nodes
148
149
150
def get_event_properties_from_type_node(node):
151
    properties = []
152
    curr_node = node
153
154
    while True:
155
        properties.extend(curr_node.get_properties())
156
157
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
158
            break
159
160
        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
161
        if len(parents) != 1:  # Something went wrong
162
            return None
163
        curr_node = parents[0]
164
165
    return properties
166
167
168
def get_event_obj_from_type_node(node):
169
    """
170
    return an Event object from an event type node
171
    """
172
    if node.nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
173
        return opcua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
174
    else:
175
        parent_identifier, parent_eventtype = _find_parent_eventtype(node)
176
        if not parent_eventtype:
177
            return None
178
179
        class CustomEvent(parent_eventtype):
180
181
            def __init__(self):
182
                parent_eventtype.__init__(self)
183
                self.EventType = node.nodeid
184
                curr_node = node
185
186
                while curr_node.nodeid.Identifier != parent_identifier:
187
                    for prop in curr_node.get_properties():
188
                        name = prop.get_browse_name().Name
189
                        val = prop.get_data_value()
190
                        self.add_property(name, val.Value.Value, val.Value.VariantType)
191
                    parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
192
193
                    if len(parents) != 1:  # Something went wrong
194
                        raise UaError("Parent of event type could notbe found")
195
                    curr_node = parents[0]
196
197
                self._freeze = True
198
199
    return CustomEvent()
200
201
202
def _find_parent_eventtype(node):
203
    """
204
    """
205
    parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
206
207
    if len(parents) != 1:   # Something went wrong
208
        return None, None
209
    if parents[0].nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
210
        return parents[0].nodeid.Identifier, opcua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
211
    else:
212
        return _find_parent_eventtype(parents[0])
213
214