Issues (20)

asyncua/common/type_dictionary_buider.py (1 issue)

1
from asyncua import ua
2
from enum import Enum
3
import logging
4
5
import xml.etree.ElementTree as Et
6
import re
7
8
9
logger = logging.getLogger(__name__)
# Names of the types treated as OPC UA built-in types: every member name of
# ua.VariantType except 'ExtensionObject'.
_ua_build_in_types = [ua_type for ua_type in ua.VariantType.__members__ if ua_type != 'ExtensionObject']
12
13
14
def _repl_func(m):
    """
    Replacement callback for ``re.sub``: keep group 1 as is, upper-case group 2.

    :param m: match object exposing at least two groups
    :return: ``m.group(1)`` followed by the upper-cased ``m.group(2)``

    taken from
    https://stackoverflow.com/questions/1549641/how-to-capitalize-the-first-letter-of-each-word-in-a-string-python
    """
    return m.group(1) + m.group(2).upper()
20
21
22
def _to_camel_case(name):
    """
    Create a CamelCase python class name from an arbitrary string.

    e.g.                 actionlib/TestAction -> ActionlibTestAction
         turtle_actionlib/ShapeActionFeedback -> TurtleActionlibShapeActionFeedback

    :param name: arbitrary string
    :return: the string with every run of non-alphanumeric characters removed
             and the first character of each resulting word upper-cased
    """
    # every run of non-alphanumeric characters becomes a single word separator
    name = re.sub(r'[^a-zA-Z0-9]+', ' ', name)
    # upper-case the first character of each word (rest of the word is untouched)
    name = re.sub(r'(^|\s)(\S)', _repl_func, name)
    # finally glue the words together
    name = name.replace(' ', '')
    return name
32
33
34
class OPCTypeDictionaryBuilder:
    """
    Build the XML document of an ``opc:TypeDictionary`` with ElementTree.

    Structures are registered with append_struct(), fields are attached to a
    registered structure with add_field(), and get_dict_value() returns the
    serialized document.
    """

    def __init__(self, ns_urn):
        """
        :param ns_urn: name of the name space; written to both the ``xmlns:tns``
                       and the ``TargetNamespace`` attribute of the root element

        Field types listed in the built-in type list are written as opc:xxx,
        all other types as tns:xxx.
        """
        head_attributes = {'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance', 'xmlns:tns': ns_urn,
                           'DefaultByteOrder': 'LittleEndian', 'xmlns:opc': 'http://opcfoundation.org/BinarySchema/',
                           'xmlns:ua': 'http://opcfoundation.org/UA/', 'TargetNamespace': ns_urn}

        self.etree = Et.ElementTree(Et.Element('opc:TypeDictionary', head_attributes))

        name_space = Et.SubElement(self.etree.getroot(), 'opc:Import')
        name_space.attrib['Namespace'] = 'http://opcfoundation.org/UA/'

        # struct name -> its 'opc:StructuredType' element
        self._structs_dict = {}
        self._build_in_list = _ua_build_in_types

    def _process_type(self, data_type):
        """Return data_type prefixed with 'opc:' if it is a built-in type, else with 'tns:'."""
        # The name is used verbatim, no CamelCase conversion is applied.
        prefix = 'opc:' if data_type in self._build_in_list else 'tns:'
        return prefix + data_type

    def _add_field(self, variable_name, data_type, struct_name):
        """
        Append one scalar 'opc:Field' element to an already appended struct.

        :raises KeyError: if struct_name was never passed to append_struct()
        """
        data_type = self._process_type(data_type)
        field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field')
        field.attrib['Name'] = variable_name
        field.attrib['TypeName'] = data_type

    def _add_array_field(self, variable_name, data_type, struct_name):
        """
        Append an array field to an already appended struct.

        Two 'opc:Field' elements are written: an 'opc:Int32' length field named
        'NoOf' + variable_name, then the field itself whose 'LengthField'
        attribute refers to that length field.

        :raises KeyError: if struct_name was never passed to append_struct()
        """
        data_type = self._process_type(data_type)
        array_len = 'NoOf' + variable_name
        struct = self._structs_dict[struct_name]

        length_field = Et.SubElement(struct, 'opc:Field')
        length_field.attrib['Name'] = array_len
        length_field.attrib['TypeName'] = 'opc:Int32'

        field = Et.SubElement(struct, 'opc:Field')
        field.attrib['Name'] = variable_name
        field.attrib['TypeName'] = data_type
        field.attrib['LengthField'] = array_len

    def add_field(self, type_name, variable_name, struct_name, is_array=False):
        """
        Add a field to a struct previously created with append_struct().

        :param type_name: type of the field, either a string or an Enum member
                          (in which case the member's name is used)
        :param variable_name: name of the field
        :param struct_name: name of the struct receiving the field
        :param is_array: if True an additional 'NoOf...' length field is written
        """
        if isinstance(type_name, Enum):
            type_name = type_name.name
        if is_array:
            self._add_array_field(variable_name, type_name, struct_name)
        else:
            self._add_field(variable_name, type_name, struct_name)

    def append_struct(self, name):
        """
        Append an 'opc:StructuredType' element to the dictionary and register it under name.

        :param name: name of the struct, used verbatim (no CamelCase conversion)
        :return: the created element
        """
        appended_struct = Et.SubElement(self.etree.getroot(), 'opc:StructuredType')
        appended_struct.attrib['BaseType'] = 'ua:ExtensionObject'
        appended_struct.attrib['Name'] = name
        self._structs_dict[name] = appended_struct
        return appended_struct

    def get_dict_value(self):
        """Pretty-indent the tree in place and return it serialized as utf-8 encoded bytes."""
        self.indent(self.etree.getroot())
        # For debugging: Et.dump(self.etree.getroot())
        return Et.tostring(self.etree.getroot(), encoding='utf-8')

    def indent(self, elem, level=0):
        """
        Recursively set the whitespace-only text/tail of elem and its descendants
        so that the serialized XML is indented by two spaces per level.

        Text and tails that contain something other than whitespace are left untouched.

        :param elem: element to indent (modified in place)
        :param level: nesting depth of elem
        """
        pad = '\n' + level * '  '
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = pad + '  '
            if not elem.tail or not elem.tail.strip():
                elem.tail = pad
            for child in elem:
                self.indent(child, level + 1)
            # the closing tag of elem follows the last child, so the tail of
            # that child is dedented back to the level of elem
            last_child = elem[-1]
            if not last_child.tail or not last_child.tail.strip():
                last_child.tail = pad
        elif level and (not elem.tail or not elem.tail.strip()):
            elem.tail = pad
114
115
116
def _reference_generator(source_id, target_id, reference_type, is_forward=True):
    """
    Build a ua.AddReferencesItem from source_id to target_id.

    :param source_id: node id stored as SourceNodeId
    :param target_id: node id stored as TargetNodeId
    :param reference_type: node id stored as ReferenceTypeId
    :param is_forward: stored as IsForward
    :return: the filled ua.AddReferencesItem; TargetNodeClass is always ua.NodeClass.DataType
    """
    ref = ua.AddReferencesItem()
    ref.IsForward = is_forward
    ref.ReferenceTypeId = reference_type
    ref.SourceNodeId = source_id
    ref.TargetNodeClass = ua.NodeClass.DataType
    ref.TargetNodeId = target_id
    return ref
124
125
126
class DataTypeDictionaryBuilder:
    """
    Create structure data types on a server together with the type dictionary describing them.

    Usage as far as this class shows: construct, ``await init()``, create types
    with create_data_type(), add their fields, then ``await set_dict_byte_string()``
    to write the generated dictionary to the dictionary node.
    """

    def __init__(self, server, idx, ns_urn, dict_name, dict_node_id=None):
        """
        :param server: server object; must provide get_node() and nodes.opc_binary
        :param idx: namespace index used for the created nodes
        :param ns_urn: namespace URN handed to the OPCTypeDictionaryBuilder
        :param dict_name: name of the dictionary node
        :param dict_node_id: node id of an existing dictionary node; if None the
                             dictionary node is looked up or created by init()
        """
        self._server = server
        # low level server object used for add_nodes() / add_references()
        self._session_server = server.get_node(ua.ObjectIds.RootFolder).server
        self._idx = idx
        self.ns_urn = ns_urn
        self.dict_name = dict_name
        # set by init(); the methods touching the dictionary require init() to have run
        self._type_dictionary = None
        self.dict_id = dict_node_id

    async def init(self):
        """Resolve the dictionary node id if none was given and create an empty type dictionary."""
        if self.dict_id is None:
            self.dict_id = await self._add_dictionary(self.dict_name)
        self._type_dictionary = OPCTypeDictionaryBuilder(self.ns_urn)

    async def _add_dictionary(self, name):
        """
        Return the node id of the dictionary node called name, creating the node if it does not exist.

        If the node already exists a warning is logged and its node id is returned.
        """
        try:
            node = await self._server.nodes.opc_binary.get_child(f"{self._idx}:{name}")
        except ua.uaerrors.BadNoMatch:
            node = ua.AddNodesItem()
            node.RequestedNewNodeId = ua.NodeId(0, self._idx)
            node.BrowseName = ua.QualifiedName(name, self._idx)
            node.NodeClass = ua.NodeClass.Variable
            node.ParentNodeId = ua.NodeId(ua.ObjectIds.OPCBinarySchema_TypeSystem, 0)
            node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0)
            node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDictionaryType, 0)
            attrs = ua.VariableAttributes()
            attrs.DisplayName = ua.LocalizedText(name)
            attrs.DataType = ua.NodeId(ua.ObjectIds.ByteString)
            # Value should be set after all data types created by calling set_dict_byte_string
            attrs.Value = ua.Variant(None, ua.VariantType.Null)
            attrs.ValueRank = -1
            node.NodeAttributes = attrs
            res = await self._session_server.add_nodes([node])
            return res[0].AddedNodeId
        logger.warning("Making %s object for node %s which already exist, its data will be overriden", self, node)
        # FIXME: we have an issue
        return node.nodeid

    async def _link_nodes(self, linked_obj_node_id, data_type_node_id, description_node_id):
        """link the three node by their node ids according to UA standard"""
        refs = [
                # add reverse reference to BaseDataType -> Structure
                _reference_generator(data_type_node_id, ua.NodeId(ua.ObjectIds.Structure, 0),
                                     ua.NodeId(ua.ObjectIds.HasSubtype, 0), False),
                # add reverse reference to created data type
                _reference_generator(linked_obj_node_id, data_type_node_id,
                                     ua.NodeId(ua.ObjectIds.HasEncoding, 0), False),
                # add HasDescription link to dictionary description
                _reference_generator(linked_obj_node_id, description_node_id,
                                     ua.NodeId(ua.ObjectIds.HasDescription, 0)),
                # add reverse HasDescription link
                _reference_generator(description_node_id, linked_obj_node_id,
                                     ua.NodeId(ua.ObjectIds.HasDescription, 0), False),
                # add link to the type definition node
                _reference_generator(linked_obj_node_id, ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0),
                                     ua.NodeId(ua.ObjectIds.HasTypeDefinition, 0)),
                # add has type definition link
                _reference_generator(description_node_id, ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0),
                                     ua.NodeId(ua.ObjectIds.HasTypeDefinition, 0)),
                # add forward link of dict to description item
                _reference_generator(self.dict_id, description_node_id,
                                     ua.NodeId(ua.ObjectIds.HasComponent, 0)),
                # add reverse link to dictionary
                _reference_generator(description_node_id, self.dict_id,
                                     ua.NodeId(ua.ObjectIds.HasComponent, 0), False)]
        await self._session_server.add_references(refs)

    async def _create_data_type(self, type_name, nodeid=None, init=True):
        """
        Create the nodes of a structure data type and register the struct in the type dictionary.

        :param type_name: name of the data type, used verbatim (no CamelCase conversion)
        :param nodeid: node id of an already existing data type node; if None a
                       new data type node is added below Structure
        :param init: if True the description node and the 'Default Binary'
                     encoding object are created and linked as well
        :return: StructNode holding the data type node id and the list of node
                 ids handled here (data type first, then description and
                 encoding object when init is True)
        """
        if nodeid is None:
            # create data type node
            dt_node = ua.AddNodesItem()
            dt_node.RequestedNewNodeId = ua.NodeId(0, self._idx)
            dt_node.BrowseName = ua.QualifiedName(type_name, self._idx)
            dt_node.NodeClass = ua.NodeClass.DataType
            dt_node.ParentNodeId = ua.NodeId(ua.ObjectIds.Structure, 0)
            dt_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype, 0)
            dt_attributes = ua.DataTypeAttributes()
            dt_attributes.DisplayName = ua.LocalizedText(type_name)
            dt_node.NodeAttributes = dt_attributes

            res = await self._session_server.add_nodes([dt_node])
            data_type_node_id = res[0].AddedNodeId
        else:
            data_type_node_id = nodeid

        added = [data_type_node_id]

        if init:
            # create description node
            desc_node = ua.AddNodesItem()
            desc_node.RequestedNewNodeId = ua.NodeId(0, self._idx)
            desc_node.BrowseName = ua.QualifiedName(type_name, self._idx)
            desc_node.NodeClass = ua.NodeClass.Variable
            desc_node.ParentNodeId = self.dict_id
            desc_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0)
            desc_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0)
            desc_attributes = ua.VariableAttributes()
            desc_attributes.DisplayName = ua.LocalizedText(type_name)
            desc_attributes.DataType = ua.NodeId(ua.ObjectIds.String)
            desc_attributes.Value = ua.Variant(type_name, ua.VariantType.String)
            desc_attributes.ValueRank = -1
            desc_node.NodeAttributes = desc_attributes

            res = await self._session_server.add_nodes([desc_node])
            description_node_id = res[0].AddedNodeId
            added.append(description_node_id)

            # create object node which the loaded python class should link to
            obj_node = ua.AddNodesItem()
            obj_node.RequestedNewNodeId = ua.NodeId(0, self._idx)
            obj_node.BrowseName = ua.QualifiedName('Default Binary', 0)
            obj_node.NodeClass = ua.NodeClass.Object
            obj_node.ParentNodeId = data_type_node_id
            obj_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasEncoding, 0)
            obj_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0)
            obj_attributes = ua.ObjectAttributes()
            obj_attributes.DisplayName = ua.LocalizedText('Default Binary')
            obj_attributes.EventNotifier = 0
            obj_node.NodeAttributes = obj_attributes

            res = await self._session_server.add_nodes([obj_node])
            bind_obj_node_id = res[0].AddedNodeId
            added.append(bind_obj_node_id)

            await self._link_nodes(bind_obj_node_id, data_type_node_id, description_node_id)

        # the struct is registered in the dictionary whether or not init is set
        self._type_dictionary.append_struct(type_name)
        return StructNode(self, data_type_node_id, type_name, added)

    async def create_data_type(self, type_name, nodeid=None, init=True):
        """Public entry point, see _create_data_type() for the parameters and the return value."""
        return await self._create_data_type(type_name, nodeid, init)

    def add_field(self, type_name, variable_name, struct_name, is_array=False):
        """Forward the arguments unchanged to the add_field() of the type dictionary."""
        self._type_dictionary.add_field(type_name, variable_name, struct_name, is_array)

    async def set_dict_byte_string(self):
        """Serialize the type dictionary and write it as a ByteString to the dictionary node."""
        dict_node = self._server.get_node(self.dict_id)
        value = self._type_dictionary.get_dict_value()
        await dict_node.write_value(value, ua.VariantType.ByteString)
270
271
272
class StructNode:
    """Handle on a created structure data type, used to add fields to it."""

    def __init__(self, type_dict, data_type, name, node_ids):
        """
        :param type_dict: owner whose add_field() receives the fields
                          (_create_data_type passes the DataTypeDictionaryBuilder)
        :param data_type: node id of the data type node
        :param name: name of the structure
        :param node_ids: list of node ids belonging to this structure
        """
        self._type_dict = type_dict
        self.data_type = data_type
        self.name = name
        self.node_ids = node_ids

    def add_field(self, type_name, field_name, is_array=False):
        """
        Add a field to this structure.

        NOTE: the parameter names are misleading. The two values are forwarded
        in swapped order (field_name first, type_name second) to the add_field()
        of the owner. With a DataTypeDictionaryBuilder as owner, the value
        passed as ``type_name`` becomes the field's 'Name' attribute and the
        value passed as ``field_name`` becomes its 'TypeName'. Positional
        callers therefore pass (name of the field, type of the field).

        :param type_name: see note above, ends up as the name of the field
        :param field_name: see note above, ends up as the type of the field;
                           may be a StructNode, in which case its name is used
        :param is_array: forwarded unchanged
        """
        # nested structure could directly use simple structure as field
        if isinstance(field_name, StructNode):
            field_name = field_name.name
        self._type_dict.add_field(field_name, type_name, self.name, is_array)
285
286
287
def get_ua_class(ua_class_name):
    """
    Return the attribute of the ``ua`` module called ua_class_name.

    The name is looked up verbatim, no CamelCase conversion is applied.

    :raises AttributeError: if ``ua`` has no attribute with that name
    """
    return getattr(ua, ua_class_name)
290