Completed
Pull Request — master (#494)
by Olivier
03:37
created

CustomEvent   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 19
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 19
ccs 0
cts 15
cp 0
rs 10
wmc 4
1 1
import copy
2
3 1
from opcua import ua
4 1
import opcua
5 1
from opcua.ua.uaerrors import UaError
6 1
from opcua.common import ua_utils
7
8
9 1
class Event(object):
10
    """
11
    OPC UA Event object.
12
    This is class in inherited by the common event objects such as BaseEvent,
13
    other auto standard events and custom events
14
    Events are used to trigger events on server side and are
15
    sent to clients for every events from server
16
17
    Developper Warning:
18
    On server side the data type of attributes should be known, thus
19
    add properties using the add_property method!!!
20
    """
21
22 1
    def __init__(self):
23
        self.server_handle = None
24
        self.select_clauses = None
25
        self.event_fields = None
26
        self.data_types = {}
27
        # save current attributes
28
        self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]
29
30 1
    def __str__(self):
31
        return "{0}({1})".format(
32
            self.__class__.__name__, 
33
            [str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
34 1
    __repr__ = __str__
35
36 1
    def add_property(self, name, val, datatype):
37
        """
38
        Add a property to event and tore its data type
39
        """
40
        setattr(self, name, val)
41
        self.data_types[name] = datatype
42
43 1
    def get_event_props_as_fields_dict(self):
44
        """
45
        convert all properties of the Event class to a dict of variants
46
        """
47
        field_vars = {}
48
        for key, value in vars(self).items():
49
            if not key.startswith("__") and key not in self.internal_properties:
50
                field_vars[key] = ua.Variant(value, self.data_types[key])
51
        return field_vars
52
53 1
    @staticmethod
54
    def from_field_dict(fields):
55
        """
56
        Create an Event object from a dict of name and variants
57
        """
58
        ev = Event()
59
        for k, v in fields.items():
60
            ev.add_property(k, v.Value, v.VariantType)
61
        return ev
62
63 1
    def to_event_fields_using_subscription_fields(self, select_clauses):
64
        """
65
        Using a new select_clauses and the original select_clauses
66
        used during subscription, return a field list 
67
        """
68
        fields = []
69
        for sattr in select_clauses:
70
            for idx, o_sattr in enumerate(self.select_clauses):
71
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
72
                    fields.append(self.event_fields[idx])
73
                    break
74
        return fields
75
76 1
    def to_event_fields(self, select_clauses):
77
        """
78
        return a field list using a select clause and the object properties
79
        """
80
        fields = []
81
        for sattr in select_clauses:
82
            if not sattr.BrowsePath:
83
                name = ua.AttributeIds(sattr.AttributeId).name
84
            else:
85
                name = sattr.BrowsePath[0].Name
86
            try:
87
                val = getattr(self, name)
88
            except AttributeError:
89
                field = ua.Variant(None)
90
            else:
91
                field = ua.Variant(copy.deepcopy(val), self.data_types[name])
92
            fields.append(field)
93
        return fields
94
95 1
    @staticmethod
96
    def from_event_fields(select_clauses, fields):
97
        """
98
        Instantiate an Event object from a select_clauses and fields
99
        """
100
        ev = Event()
101
        ev.select_clauses = select_clauses
102
        ev.event_fields = fields
103
        for idx, sattr in enumerate(select_clauses):
104
            if len(sattr.BrowsePath) == 0:
105
                name = sattr.AttributeId.name
106
            else:
107
                name = sattr.BrowsePath[0].Name
108
            ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
109
        return ev
110
111
112 1
def get_filter_from_event_type(eventtypes):
113
    evfilter = ua.EventFilter()
114
    evfilter.SelectClauses = select_clauses_from_evtype(eventtypes)
115
    evfilter.WhereClause = where_clause_from_evtype(eventtypes)
116
    return evfilter
117
118
119 1
def select_clauses_from_evtype(evtypes):
120
    clauses = []
121
122
    selected_paths = []
123
    for evtype in evtypes:
124
        for prop in get_event_properties_from_type_node(evtype):
125
            if prop.get_browse_name() not in selected_paths:
126
                op = ua.SimpleAttributeOperand()
127
                op.AttributeId = ua.AttributeIds.Value
128
                op.BrowsePath = [prop.get_browse_name()]
129
                clauses.append(op)
130
                selected_paths.append(prop.get_browse_name())
131
    return clauses
132
133
134 1
def where_clause_from_evtype(evtypes):
135
    cf = ua.ContentFilter()
136
    el = ua.ContentFilterElement()
137 View Code Duplication
    
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
138
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
139
    # Create a clause where the generate event type property EventType
140
    # must be a subtype of events in evtypes argument
141
142
    # the first operand is the attribute event type
143
    op = ua.SimpleAttributeOperand()
144
    # op.TypeDefinitionId = evtype.nodeid
145
    op.BrowsePath.append(ua.QualifiedName("EventType", 0))
146
    op.AttributeId = ua.AttributeIds.Value
147
    el.FilterOperands.append(op)
148
149
    # now create a list of all subtypes we want to accept
150
    subtypes = []
151
    for evtype in evtypes:
152
        subtypes += [st.nodeid for st in ua_utils.get_node_subtypes(evtype)]
153
    subtypes = list(set(subtypes))  # remove duplicates
154
    for subtypeid in subtypes:
155
        op = ua.LiteralOperand()
156
        op.Value = ua.Variant(subtypeid)
157
        el.FilterOperands.append(op)
158
159
    el.FilterOperator = ua.FilterOperator.InList
160
    cf.Elements.append(el)
161
162
    return cf
163
164
165 1
def get_event_properties_from_type_node(node):
166
    properties = []
167
    curr_node = node
168
169
    while True:
170
        properties.extend(curr_node.get_properties())
171
172
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
173
            break
174
175
        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
176
        if len(parents) != 1:  # Something went wrong
177
            return None
178
        curr_node = parents[0]
179
180
    return properties
181
182
183 1
def get_event_obj_from_type_node(node):
184
    """
185
    return an Event object from an event type node
186
    """
187
    if node.nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
188
        return opcua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
189
    else:
190
        parent_identifier, parent_eventtype = _find_parent_eventtype(node)
191
192
        class CustomEvent(parent_eventtype):
193
194
            def __init__(self):
195
                parent_eventtype.__init__(self)
196
                self.EventType = node.nodeid
197
                curr_node = node
198
199
                while curr_node.nodeid.Identifier != parent_identifier:
200
                    for prop in curr_node.get_properties():
201
                        name = prop.get_browse_name().Name
202
                        val = prop.get_data_value()
203
                        self.add_property(name, val.Value.Value, val.Value.VariantType)
204
                    parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
205
206
                    if len(parents) != 1:  # Something went wrong
207
                        raise UaError("Parent of event type could notbe found")
208
                    curr_node = parents[0]
209
210
                self._freeze = True
211
212
    return CustomEvent()
213
214
215 1
def _find_parent_eventtype(node):
216
    """
217
    """
218
    parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
219
220
    if len(parents) != 1:   # Something went wrong
221
        raise UaError("Parent of event type could notbe found")
222
    if parents[0].nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
223
        return parents[0].nodeid.Identifier, opcua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
224
    else:
225
        return _find_parent_eventtype(parents[0])
226
227