Completed
Pull Request — master (#190)
by Olivier
03:44
created

EventResult.to_event_fields()   A

Complexity

Conditions 3

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 3
c 1
b 0
f 1
dl 0
loc 15
rs 9.4285
1
from opcua import ua
2
3
4
class EventResult(object):
5
    """
6
    To be sent to clients for every events from server
7
    """
8
9
    def __init__(self):
10
        self.server_handle = None
11
        self.select_clauses = None
12
        self.event_fields = None
13
        self.data_types = {}
14
        # save current attributes
15
        self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]
16
17
    def __str__(self):
18
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
19
    __repr__ = __str__
20
21
    def get_event_props_as_fields_dict(self):
22
        """
23
        convert all properties of the EventResult class to a dict of variants
24
        """
25
        field_vars = {}
26
        for key, value in vars(self).items():
27
            if not key.startswith("__") and key not in self.internal_properties:
28
                field_vars[key] = ua.Variant(value, self.data_types[key])
29
        return field_vars
30
31
    @staticmethod
32
    def from_field_dict(fields):
33
        """
34
        Create an Event object from a dict of name and variants
35
        """
36
        result = EventResult()
37
        for k, v in fields.items():
38
            setattr(result, k, v.Value)
39
            result.data_types[k] = v.VariantType
40
        return result
41
42
    def to_event_fields_using_subscription_fields(self, select_clauses):
43
        """
44
        Using a new select_clauses and the original select_clauses
45
        used during subscription, return a field list 
46
        """
47
        fields = []
48
        for sattr in select_clauses:
49
            for idx, o_sattr in enumerate(self.select_clauses):
50
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
51
                    fields.append(self.event_fields[idx])
52
                    break
53
        return fields
54
55
    def to_event_fields(self, select_clauses):
56
        """
57
        return a field list using a select clause and the object properties
58
        """
59
        fields = []
60
        for sattr in select_clauses:
61
            if len(sattr.BrowsePath) == 0:
62
                name = sattr.AttributeId.name
63
            else:
64
                name = sattr.BrowsePath[0].Name
65
            print(self)
66
            print(dir(self))
67
            field = getattr(self, name)
68
            fields.append(ua.Variant(field, self.data_types[name]))
69
        return fields
70
71
    @staticmethod
72
    def from_event_fields(select_clauses, fields):
73
        """
74
        Instanciate an Event object from a select_clauses and fields 
75
        """
76
        result = EventResult()
77
        result.select_clauses = select_clauses
78
        result.event_fields = fields
79
        for idx, sattr in enumerate(select_clauses):
80
            if len(sattr.BrowsePath) == 0:
81
                name = sattr.AttributeId.name
82
            else:
83
                name = sattr.BrowsePath[0].Name
84
            setattr(result, name, fields[idx].Value)
85
            result.data_types[name] = fields[idx].VariantType
86
        return result
87
88
89
def get_filter_from_event_type(eventtype):
90
    evfilter = ua.EventFilter()
91
    evfilter.SelectClauses = select_clauses_from_evtype(eventtype)
92
    evfilter.WhereClause = where_clause_from_evtype(eventtype)
93
    return evfilter
94
95
96
def select_clauses_from_evtype(evtype):
97
    clauses = []
98
    for prop in get_event_properties_from_type_node(evtype):
99
        op = ua.SimpleAttributeOperand()
100
        op.TypeDefinitionId = evtype.nodeid
101
        op.AttributeId = ua.AttributeIds.Value
102
        op.BrowsePath = [prop.get_browse_name()]
103
        clauses.append(op)
104
    return clauses
105
106
107
def where_clause_from_evtype(evtype):
108
    cf = ua.ContentFilter()
109
    el = ua.ContentFilterElement()
110
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
111
    op = ua.SimpleAttributeOperand()
112
    op.TypeDefinitionId = evtype.nodeid
113
    op.BrowsePath.append(ua.QualifiedName("EventType", 0))
114
    op.AttributeId = ua.AttributeIds.Value
115
    el.FilterOperands.append(op)
116
    for subtypeid in [st.nodeid for st in get_node_subtypes(evtype)]:
117
        op = ua.LiteralOperand()
118
        op.Value = ua.Variant(subtypeid)
119
        el.FilterOperands.append(op)
120
    el.FilterOperator = ua.FilterOperator.InList
121
122
    cf.Elements.append(el)
123
    return cf
124
125
126
def get_node_subtypes(node, nodes=None):
127
    if nodes is None:
128
        nodes = [node]
129
    for child in node.get_children(refs=ua.ObjectIds.HasSubtype):
130
        nodes.append(child)
131
        get_node_subtypes(child, nodes)
132
    return nodes
133
134
135
def get_event_properties_from_type_node(node):
136
    properties = []
137
    curr_node = node
138
139
    while True:
140
        properties.extend(curr_node.get_properties())
141
142
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
143
            break
144
145
        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
146
        if len(parents) != 1:  # Something went wrong
147
            return None
148
        curr_node = parents[0]
149
150
    return properties
151
152
153