1 | from asyncua import ua |
||
2 | from enum import Enum |
||
3 | import logging |
||
4 | |||
5 | import xml.etree.ElementTree as Et |
||
6 | import re |
||
7 | |||
8 | |||
9 | logger = logging.getLogger(__name__) |
||
10 | # Indicates which type should be OPC build in types |
||
11 | _ua_build_in_types = [ua_type for ua_type in ua.VariantType.__members__ if ua_type != 'ExtensionObject'] |
||
12 | |||
13 | |||
14 | def _repl_func(m): |
||
15 | """ |
||
16 | taken from |
||
17 | https://stackoverflow.com/questions/1549641/how-to-capitalize-the-first-letter-of-each-word-in-a-string-python |
||
18 | """ |
||
19 | return m.group(1) + m.group(2).upper() |
||
20 | |||
21 | |||
22 | def _to_camel_case(name): |
||
23 | """ |
||
24 | Create python class name from an arbitrary string to CamelCase string |
||
25 | e.g. actionlib/TestAction -> ActionlibTestAction |
||
26 | turtle_actionlib/ShapeActionFeedback -> TurtleActionlibShapeActionFeedback |
||
27 | """ |
||
28 | name = re.sub(r'[^a-zA-Z0-9]+', ' ', name) |
||
29 | name = re.sub(r'(^|\s)(\S)', _repl_func, name) |
||
30 | name = name.replace(' ', '') |
||
31 | return name |
||
32 | |||
33 | |||
34 | class OPCTypeDictionaryBuilder: |
||
35 | |||
36 | def __init__(self, ns_urn): |
||
37 | """ |
||
38 | :param ns_urn: name of the name space |
||
39 | types in dict is created as opc:xxx, otherwise as tns:xxx |
||
40 | """ |
||
41 | head_attributes = {'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance', 'xmlns:tns': ns_urn, |
||
42 | 'DefaultByteOrder': 'LittleEndian', 'xmlns:opc': 'http://opcfoundation.org/BinarySchema/', |
||
43 | 'xmlns:ua': 'http://opcfoundation.org/UA/', 'TargetNamespace': ns_urn} |
||
44 | |||
45 | self.etree = Et.ElementTree(Et.Element('opc:TypeDictionary', head_attributes)) |
||
46 | |||
47 | name_space = Et.SubElement(self.etree.getroot(), 'opc:Import') |
||
48 | name_space.attrib['Namespace'] = 'http://opcfoundation.org/UA/' |
||
49 | |||
50 | self._structs_dict = {} |
||
51 | self._build_in_list = _ua_build_in_types |
||
52 | |||
53 | def _process_type(self, data_type): |
||
54 | if data_type in self._build_in_list: |
||
55 | data_type = 'opc:' + data_type |
||
56 | else: |
||
57 | # data_type = 'tns:' + _to_camel_case(data_type) |
||
58 | data_type = 'tns:' + data_type |
||
59 | return data_type |
||
60 | |||
61 | def _add_field(self, variable_name, data_type, struct_name): |
||
62 | data_type = self._process_type(data_type) |
||
63 | field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field') |
||
64 | field.attrib['Name'] = variable_name |
||
65 | field.attrib['TypeName'] = data_type |
||
66 | |||
67 | def _add_array_field(self, variable_name, data_type, struct_name): |
||
68 | data_type = self._process_type(data_type) |
||
69 | array_len = 'NoOf' + variable_name |
||
70 | field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field') |
||
71 | field.attrib['Name'] = array_len |
||
72 | field.attrib['TypeName'] = 'opc:Int32' |
||
73 | field = Et.SubElement(self._structs_dict[struct_name], 'opc:Field') |
||
74 | field.attrib['Name'] = variable_name |
||
75 | field.attrib['TypeName'] = data_type |
||
76 | field.attrib['LengthField'] = array_len |
||
77 | |||
78 | def add_field(self, type_name, variable_name, struct_name, is_array=False): |
||
79 | if isinstance(type_name, Enum): |
||
80 | type_name = type_name.name |
||
81 | if is_array: |
||
82 | self._add_array_field(variable_name, type_name, struct_name) |
||
83 | else: |
||
84 | self._add_field(variable_name, type_name, struct_name) |
||
85 | |||
86 | def append_struct(self, name): |
||
87 | appended_struct = Et.SubElement(self.etree.getroot(), 'opc:StructuredType') |
||
88 | appended_struct.attrib['BaseType'] = 'ua:ExtensionObject' |
||
89 | # appended_struct.attrib['Name'] = _to_camel_case(name) |
||
90 | appended_struct.attrib['Name'] = name |
||
91 | self._structs_dict[name] = appended_struct |
||
92 | return appended_struct |
||
93 | |||
94 | def get_dict_value(self): |
||
95 | self.indent(self.etree.getroot()) |
||
96 | # For debugging |
||
97 | # Et.dump(self.etree.getroot()) |
||
98 | return Et.tostring(self.etree.getroot(), encoding='utf-8') |
||
99 | |||
100 | View Code Duplication | def indent(self, elem, level=0): |
|
0 ignored issues
–
show
Duplication
introduced
by
Loading history...
|
|||
101 | i = '\n' + level * ' ' |
||
102 | if len(elem): |
||
103 | if not elem.text or not elem.text.strip(): |
||
104 | elem.text = i + ' ' |
||
105 | if not elem.tail or not elem.tail.strip(): |
||
106 | elem.tail = i |
||
107 | for elem in elem: |
||
108 | self.indent(elem, level + 1) |
||
109 | if not elem.tail or not elem.tail.strip(): |
||
110 | elem.tail = i |
||
111 | else: |
||
112 | if level and (not elem.tail or not elem.tail.strip()): |
||
113 | elem.tail = i |
||
114 | |||
115 | |||
116 | def _reference_generator(source_id, target_id, reference_type, is_forward=True): |
||
117 | ref = ua.AddReferencesItem() |
||
118 | ref.IsForward = is_forward |
||
119 | ref.ReferenceTypeId = reference_type |
||
120 | ref.SourceNodeId = source_id |
||
121 | ref.TargetNodeClass = ua.NodeClass.DataType |
||
122 | ref.TargetNodeId = target_id |
||
123 | return ref |
||
124 | |||
125 | |||
126 | class DataTypeDictionaryBuilder: |
||
127 | |||
128 | def __init__(self, server, idx, ns_urn, dict_name, dict_node_id=None): |
||
129 | self._server = server |
||
130 | self._session_server = server.get_node(ua.ObjectIds.RootFolder).server |
||
131 | self._idx = idx |
||
132 | self.ns_urn = ns_urn |
||
133 | self.dict_name = dict_name |
||
134 | self._type_dictionary = None |
||
135 | self.dict_id = dict_node_id |
||
136 | |||
137 | async def init(self): |
||
138 | if self.dict_id is None: |
||
139 | self.dict_id = await self._add_dictionary(self.dict_name) |
||
140 | self._type_dictionary = OPCTypeDictionaryBuilder(self.ns_urn) |
||
141 | |||
142 | async def _add_dictionary(self, name): |
||
143 | try: |
||
144 | node = await self._server.nodes.opc_binary.get_child(f"{self._idx}:{name}") |
||
145 | except ua.uaerrors.BadNoMatch: |
||
146 | node = ua.AddNodesItem() |
||
147 | node.RequestedNewNodeId = ua.NodeId(0, self._idx) |
||
148 | node.BrowseName = ua.QualifiedName(name, self._idx) |
||
149 | node.NodeClass = ua.NodeClass.Variable |
||
150 | node.ParentNodeId = ua.NodeId(ua.ObjectIds.OPCBinarySchema_TypeSystem, 0) |
||
151 | node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0) |
||
152 | node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDictionaryType, 0) |
||
153 | attrs = ua.VariableAttributes() |
||
154 | attrs.DisplayName = ua.LocalizedText(name) |
||
155 | attrs.DataType = ua.NodeId(ua.ObjectIds.ByteString) |
||
156 | # Value should be set after all data types created by calling set_dict_byte_string |
||
157 | attrs.Value = ua.Variant(None, ua.VariantType.Null) |
||
158 | attrs.ValueRank = -1 |
||
159 | node.NodeAttributes = attrs |
||
160 | res = await self._session_server.add_nodes([node]) |
||
161 | return res[0].AddedNodeId |
||
162 | logger.warning("Making %s object for node %s which already exist, its data will be overriden", self, node) |
||
163 | # FIXME: we have an issue |
||
164 | return node.nodeid |
||
165 | |||
166 | async def _link_nodes(self, linked_obj_node_id, data_type_node_id, description_node_id): |
||
167 | """link the three node by their node ids according to UA standard""" |
||
168 | refs = [ |
||
169 | # add reverse reference to BaseDataType -> Structure |
||
170 | _reference_generator(data_type_node_id, ua.NodeId(ua.ObjectIds.Structure, 0), |
||
171 | ua.NodeId(ua.ObjectIds.HasSubtype, 0), False), |
||
172 | # add reverse reference to created data type |
||
173 | _reference_generator(linked_obj_node_id, data_type_node_id, |
||
174 | ua.NodeId(ua.ObjectIds.HasEncoding, 0), False), |
||
175 | # add HasDescription link to dictionary description |
||
176 | _reference_generator(linked_obj_node_id, description_node_id, |
||
177 | ua.NodeId(ua.ObjectIds.HasDescription, 0)), |
||
178 | # add reverse HasDescription link |
||
179 | _reference_generator(description_node_id, linked_obj_node_id, |
||
180 | ua.NodeId(ua.ObjectIds.HasDescription, 0), False), |
||
181 | # add link to the type definition node |
||
182 | _reference_generator(linked_obj_node_id, ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0), |
||
183 | ua.NodeId(ua.ObjectIds.HasTypeDefinition, 0)), |
||
184 | # add has type definition link |
||
185 | _reference_generator(description_node_id, ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0), |
||
186 | ua.NodeId(ua.ObjectIds.HasTypeDefinition, 0)), |
||
187 | # add forward link of dict to description item |
||
188 | _reference_generator(self.dict_id, description_node_id, |
||
189 | ua.NodeId(ua.ObjectIds.HasComponent, 0)), |
||
190 | # add reverse link to dictionary |
||
191 | _reference_generator(description_node_id, self.dict_id, |
||
192 | ua.NodeId(ua.ObjectIds.HasComponent, 0), False)] |
||
193 | await self._session_server.add_references(refs) |
||
194 | |||
195 | async def _create_data_type(self, type_name, nodeid=None, init=True): |
||
196 | # name = _to_camel_case(type_name) |
||
197 | name = type_name |
||
198 | |||
199 | if nodeid is None: |
||
200 | # create data type node |
||
201 | dt_node = ua.AddNodesItem() |
||
202 | dt_node.RequestedNewNodeId = ua.NodeId(0, self._idx) |
||
203 | dt_node.BrowseName = ua.QualifiedName(name, self._idx) |
||
204 | dt_node.NodeClass = ua.NodeClass.DataType |
||
205 | dt_node.ParentNodeId = ua.NodeId(ua.ObjectIds.Structure, 0) |
||
206 | dt_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype, 0) |
||
207 | dt_attributes = ua.DataTypeAttributes() |
||
208 | dt_attributes.DisplayName = ua.LocalizedText(type_name) |
||
209 | dt_node.NodeAttributes = dt_attributes |
||
210 | |||
211 | res = await self._session_server.add_nodes([dt_node]) |
||
212 | data_type_node_id = res[0].AddedNodeId |
||
213 | else: |
||
214 | data_type_node_id = nodeid |
||
215 | |||
216 | added = [data_type_node_id] |
||
217 | |||
218 | if init: |
||
219 | # create description node |
||
220 | desc_node = ua.AddNodesItem() |
||
221 | desc_node.RequestedNewNodeId = ua.NodeId(0, self._idx) |
||
222 | desc_node.BrowseName = ua.QualifiedName(name, self._idx) |
||
223 | desc_node.NodeClass = ua.NodeClass.Variable |
||
224 | desc_node.ParentNodeId = self.dict_id |
||
225 | desc_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0) |
||
226 | desc_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0) |
||
227 | desc_attributes = ua.VariableAttributes() |
||
228 | desc_attributes.DisplayName = ua.LocalizedText(type_name) |
||
229 | desc_attributes.DataType = ua.NodeId(ua.ObjectIds.String) |
||
230 | desc_attributes.Value = ua.Variant(name, ua.VariantType.String) |
||
231 | desc_attributes.ValueRank = -1 |
||
232 | desc_node.NodeAttributes = desc_attributes |
||
233 | |||
234 | res = await self._session_server.add_nodes([desc_node]) |
||
235 | description_node_id = res[0].AddedNodeId |
||
236 | added.append(description_node_id) |
||
237 | |||
238 | # create object node which the loaded python class should link to |
||
239 | obj_node = ua.AddNodesItem() |
||
240 | obj_node.RequestedNewNodeId = ua.NodeId(0, self._idx) |
||
241 | obj_node.BrowseName = ua.QualifiedName('Default Binary', 0) |
||
242 | obj_node.NodeClass = ua.NodeClass.Object |
||
243 | obj_node.ParentNodeId = data_type_node_id |
||
244 | obj_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasEncoding, 0) |
||
245 | obj_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0) |
||
246 | obj_attributes = ua.ObjectAttributes() |
||
247 | obj_attributes.DisplayName = ua.LocalizedText('Default Binary') |
||
248 | obj_attributes.EventNotifier = 0 |
||
249 | obj_node.NodeAttributes = obj_attributes |
||
250 | |||
251 | res = await self._session_server.add_nodes([obj_node]) |
||
252 | bind_obj_node_id = res[0].AddedNodeId |
||
253 | added.append(bind_obj_node_id) |
||
254 | |||
255 | await self._link_nodes(bind_obj_node_id, data_type_node_id, description_node_id) |
||
256 | |||
257 | self._type_dictionary.append_struct(type_name) |
||
258 | return StructNode(self, data_type_node_id, type_name, added) |
||
259 | |||
260 | async def create_data_type(self, type_name, nodeid=None, init=True): |
||
261 | return await self._create_data_type(type_name, nodeid, init) |
||
262 | |||
263 | def add_field(self, type_name, variable_name, struct_name, is_array=False): |
||
264 | self._type_dictionary.add_field(type_name, variable_name, struct_name, is_array) |
||
265 | |||
266 | async def set_dict_byte_string(self): |
||
267 | dict_node = self._server.get_node(self.dict_id) |
||
268 | value = self._type_dictionary.get_dict_value() |
||
269 | await dict_node.write_value(value, ua.VariantType.ByteString) |
||
270 | |||
271 | |||
272 | class StructNode: |
||
273 | |||
274 | def __init__(self, type_dict, data_type, name, node_ids): |
||
275 | self._type_dict = type_dict |
||
276 | self.data_type = data_type |
||
277 | self.name = name |
||
278 | self.node_ids = node_ids |
||
279 | |||
280 | def add_field(self, type_name, field_name, is_array=False): |
||
281 | # nested structure could directly use simple structure as field |
||
282 | if isinstance(field_name, StructNode): |
||
283 | field_name = field_name.name |
||
284 | self._type_dict.add_field(field_name, type_name, self.name, is_array) |
||
285 | |||
286 | |||
287 | def get_ua_class(ua_class_name): |
||
288 | #return getattr(ua, _to_camel_case(ua_class_name)) |
||
289 | return getattr(ua, ua_class_name) |
||
290 |