1
|
|
|
""" |
2
|
|
|
Generate address space code from xml file specification |
3
|
|
|
xmlparser.py is a requirement. it is in asyncua folder but to avoid importing all code, developer can link xmlparser.py in current directory |
4
|
|
|
""" |
5
|
|
|
import sys |
6
|
|
|
import logging |
7
|
|
|
# sys.path.insert(0, "..") # load local freeopcua implementation |
8
|
|
|
from asyncua.common import xmlparser |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
def _to_val(objs, attr, val): |
12
|
|
|
from asyncua import ua |
13
|
|
|
cls = getattr(ua, objs[0]) |
14
|
|
|
for o in objs[1:]: |
15
|
|
|
cls = getattr(ua, _get_uatype_name(cls, o)) |
16
|
|
|
if cls == ua.NodeId: |
17
|
|
|
return "NodeId.from_string('val')" |
18
|
|
|
return ua_type_to_python(val, _get_uatype_name(cls, attr)) |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
def _get_uatype_name(cls, attname): |
22
|
|
|
for name, uat in cls.ua_types: |
23
|
|
|
if name == attname: |
24
|
|
|
return uat |
25
|
|
|
raise Exception(f"Could not find attribute {attname} in obj {cls}") |
26
|
|
|
|
27
|
|
|
|
28
|
|
|
def ua_type_to_python(val, uatype): |
29
|
|
|
if uatype == "String": |
30
|
|
|
return f"'{val}'" |
31
|
|
|
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"): |
32
|
|
|
return f"b'{val}'" |
33
|
|
|
else: |
34
|
|
|
return val |
35
|
|
|
|
36
|
|
|
|
37
|
|
|
def bname_code(string): |
38
|
|
|
if ":" in string: |
39
|
|
|
idx, name = string.split(":", 1) |
40
|
|
|
else: |
41
|
|
|
idx = 0 |
42
|
|
|
name = string |
43
|
|
|
return f"QualifiedName('{name}', {idx})" |
44
|
|
|
|
45
|
|
|
|
46
|
|
|
def nodeid_code(string): |
47
|
|
|
l = string.split(";") |
48
|
|
|
identifier = None |
49
|
|
|
namespace = 0 |
50
|
|
|
ntype = None |
51
|
|
|
srv = None |
52
|
|
|
nsu = None |
53
|
|
|
for el in l: |
54
|
|
|
if not el: |
55
|
|
|
continue |
56
|
|
|
k, v = el.split("=", 1) |
57
|
|
|
k = k.strip() |
58
|
|
|
v = v.strip() |
59
|
|
|
if k == "ns": |
60
|
|
|
namespace = v |
61
|
|
|
elif k == "i": |
62
|
|
|
ntype = "NumericNodeId" |
63
|
|
|
identifier = v |
64
|
|
|
elif k == "s": |
65
|
|
|
ntype = "StringNodeId" |
66
|
|
|
identifier = f"'{v}'" |
67
|
|
|
elif k == "g": |
68
|
|
|
ntype = "GuidNodeId" |
69
|
|
|
identifier = f"b'{v}'" |
70
|
|
|
elif k == "b": |
71
|
|
|
ntype = "ByteStringNodeId" |
72
|
|
|
identifier = f"b'{v}'" |
73
|
|
|
elif k == "srv": |
74
|
|
|
srv = v |
75
|
|
|
elif k == "nsu": |
76
|
|
|
nsu = v |
77
|
|
|
if identifier is None: |
78
|
|
|
raise Exception("Could not find identifier in string: " + string) |
79
|
|
|
return f"{ntype}({identifier}, {namespace})" |
80
|
|
|
|
81
|
|
|
|
82
|
|
|
class CodeGenerator: |
83
|
|
|
|
84
|
|
|
def __init__(self, input_path, output_path): |
85
|
|
|
self.input_path = input_path |
86
|
|
|
self.output_path = output_path |
87
|
|
|
self.output_file = None |
88
|
|
|
self.part = self.input_path.split(".")[-2] |
89
|
|
|
self.parser = None |
90
|
|
|
|
91
|
|
|
async def run(self): |
92
|
|
|
sys.stderr.write(f"Generating Python code {self.output_path} for XML file {self.input_path}\n") |
93
|
|
|
self.output_file = open(self.output_path, 'w', encoding='utf-8') |
94
|
|
|
self.make_header() |
95
|
|
|
self.parser = xmlparser.XMLParser() |
96
|
|
|
self.parser.parse_sync(self.input_path) |
97
|
|
|
for node in self.parser.get_node_datas(): |
98
|
|
|
if node.nodetype == 'UAObject': |
99
|
|
|
self.make_object_code(node) |
100
|
|
|
elif node.nodetype == 'UAObjectType': |
101
|
|
|
self.make_object_type_code(node) |
102
|
|
|
elif node.nodetype == 'UAVariable': |
103
|
|
|
self.make_variable_code(node) |
104
|
|
|
elif node.nodetype == 'UAVariableType': |
105
|
|
|
self.make_variable_type_code(node) |
106
|
|
|
elif node.nodetype == 'UAReferenceType': |
107
|
|
|
self.make_reference_code(node) |
108
|
|
|
elif node.nodetype == 'UADataType': |
109
|
|
|
self.make_datatype_code(node) |
110
|
|
|
elif node.nodetype == 'UAMethod': |
111
|
|
|
self.make_method_code(node) |
112
|
|
|
else: |
113
|
|
|
sys.stderr.write(f"Not implemented node type: {node.nodetype}\n") |
114
|
|
|
self.output_file.close() |
115
|
|
|
|
116
|
|
|
def writecode(self, *args): |
117
|
|
|
self.output_file.write(f'{" ".join(args)}\n') |
118
|
|
|
|
119
|
|
|
def make_header(self, ): |
120
|
|
|
self.writecode(f''' |
121
|
|
|
# -*- coding: utf-8 -*- |
122
|
|
|
""" |
123
|
|
|
DO NOT EDIT THIS FILE! |
124
|
|
|
It is automatically generated from opcfoundation.org schemas. |
125
|
|
|
""" |
126
|
|
|
|
127
|
|
|
from asyncua import ua |
128
|
|
|
from asyncua.ua import NodeId, QualifiedName, NumericNodeId, StringNodeId, GuidNodeId |
129
|
|
|
from asyncua.ua import NodeClass, LocalizedText |
130
|
|
|
|
131
|
|
|
|
132
|
|
|
def create_standard_address_space_{self.part!s}(server): |
133
|
|
|
''') |
134
|
|
|
|
135
|
|
|
def make_node_code(self, obj, indent): |
136
|
|
|
self.writecode(indent, 'node = ua.AddNodesItem()') |
137
|
|
|
self.writecode(indent, 'node.RequestedNewNodeId = {}'.format(nodeid_code(obj.nodeid))) |
138
|
|
|
self.writecode(indent, 'node.BrowseName = {}'.format(bname_code(obj.browsename))) |
139
|
|
|
self.writecode(indent, 'node.NodeClass = NodeClass.{0}'.format(obj.nodetype[2:])) |
140
|
|
|
if obj.parent and obj.parentlink: |
141
|
|
|
self.writecode(indent, 'node.ParentNodeId = {}'.format(nodeid_code(obj.parent))) |
142
|
|
|
self.writecode(indent, 'node.ReferenceTypeId = {0}'.format(self.to_ref_type(obj.parentlink))) |
143
|
|
|
if obj.typedef: |
144
|
|
|
self.writecode(indent, 'node.TypeDefinition = {}'.format(nodeid_code(obj.typedef))) |
145
|
|
|
|
146
|
|
|
def to_data_type(self, nodeid): |
147
|
|
|
if not nodeid: |
148
|
|
|
return "ua.NodeId(ua.ObjectIds.String)" |
149
|
|
|
if "=" in nodeid: |
150
|
|
|
return nodeid_code(nodeid) |
151
|
|
|
else: |
152
|
|
|
return f'ua.NodeId(ua.ObjectIds.{nodeid})' |
153
|
|
|
|
154
|
|
|
def to_ref_type(self, nodeid): |
155
|
|
|
if "=" not in nodeid: |
156
|
|
|
nodeid = self.parser.get_aliases()[nodeid] |
157
|
|
|
return nodeid_code(nodeid) |
158
|
|
|
|
159
|
|
|
def make_object_code(self, obj): |
160
|
|
|
indent = " " |
161
|
|
|
self.writecode(indent) |
162
|
|
|
self.make_node_code(obj, indent) |
163
|
|
|
self.writecode(indent, 'attrs = ua.ObjectAttributes()') |
164
|
|
|
if obj.desc: |
165
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
166
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
167
|
|
|
self.writecode(indent, 'attrs.EventNotifier = {0}'.format(obj.eventnotifier)) |
168
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
169
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
170
|
|
|
self.make_refs_code(obj, indent) |
171
|
|
|
|
172
|
|
|
def make_object_type_code(self, obj): |
173
|
|
|
indent = " " |
174
|
|
|
self.writecode(indent) |
175
|
|
|
self.make_node_code(obj, indent) |
176
|
|
|
self.writecode(indent, 'attrs = ua.ObjectTypeAttributes()') |
177
|
|
|
if obj.desc: |
178
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
179
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
180
|
|
|
self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract)) |
181
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
182
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
183
|
|
|
self.make_refs_code(obj, indent) |
184
|
|
|
|
185
|
|
|
def make_common_variable_code(self, indent, obj): |
186
|
|
|
if obj.desc: |
187
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
188
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
189
|
|
|
self.writecode(indent, 'attrs.DataType = {0}'.format(self.to_data_type(obj.datatype))) |
190
|
|
|
if obj.value is not None: |
191
|
|
|
if obj.valuetype == "ListOfExtensionObject": |
192
|
|
|
self.writecode(indent, 'value = []') |
193
|
|
|
for ext in obj.value: |
194
|
|
|
self.make_ext_obj_code(indent, ext) |
195
|
|
|
self.writecode(indent, 'value.append(extobj)') |
196
|
|
|
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)') |
197
|
|
|
elif obj.valuetype == "ExtensionObject": |
198
|
|
|
self.make_ext_obj_code(indent, obj.value) |
199
|
|
|
self.writecode(indent, 'value = extobj') |
200
|
|
|
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)') |
201
|
|
|
elif obj.valuetype == "ListOfLocalizedText": |
202
|
|
|
value = ['LocalizedText({0})'.format(repr(text)) for text in obj.value] |
203
|
|
|
self.writecode(indent, 'attrs.Value = [{}]'.format(','.join(value))) |
204
|
|
|
elif obj.valuetype == "LocalizedText": |
205
|
|
|
self.writecode(indent, 'attrs.Value = ua.Variant(LocalizedText("{0}"), ua.VariantType.LocalizedText)'.format(obj.value[1][1])) |
206
|
|
|
else: |
207
|
|
|
if obj.valuetype.startswith("ListOf"): |
208
|
|
|
obj.valuetype = obj.valuetype[6:] |
209
|
|
|
self.writecode( |
210
|
|
|
indent, |
211
|
|
|
f'attrs.Value = ua.Variant({obj.value!r}, ua.VariantType.{obj.valuetype})' |
212
|
|
|
) |
213
|
|
|
if obj.rank: |
214
|
|
|
self.writecode(indent, f'attrs.ValueRank = {obj.rank}') |
215
|
|
|
if obj.accesslevel: |
216
|
|
|
self.writecode(indent, f'attrs.AccessLevel = {obj.accesslevel}') |
217
|
|
|
if obj.useraccesslevel: |
218
|
|
|
self.writecode(indent, f'attrs.UserAccessLevel = {obj.useraccesslevel}') |
219
|
|
|
if obj.dimensions: |
220
|
|
|
self.writecode(indent, f'attrs.ArrayDimensions = {obj.dimensions}') |
221
|
|
|
|
222
|
|
|
def make_ext_obj_code(self, indent, extobj): |
223
|
|
|
self.writecode(indent, f'extobj = ua.{extobj.objname}()') |
224
|
|
|
for name, val in extobj.body: |
225
|
|
|
for k, v in val: |
226
|
|
|
if type(v) is str: |
227
|
|
|
val = _to_val([extobj.objname], k, v) |
228
|
|
|
self.writecode(indent, f'extobj.{k} = {val}') |
229
|
|
|
else: |
230
|
|
|
if k == "DataType": #hack for strange nodeid xml format |
231
|
|
|
self.writecode(indent, 'extobj.{0} = {1}'.format(k, nodeid_code(v[0][1]))) |
232
|
|
|
continue |
233
|
|
|
for k2, v2 in v: |
234
|
|
|
val2 = _to_val([extobj.objname, k], k2, v2) |
235
|
|
|
self.writecode(indent, f'extobj.{k}.{k2} = {val2}') |
236
|
|
|
|
237
|
|
|
def make_variable_code(self, obj): |
238
|
|
|
indent = " " |
239
|
|
|
self.writecode(indent) |
240
|
|
|
self.make_node_code(obj, indent) |
241
|
|
|
self.writecode(indent, 'attrs = ua.VariableAttributes()') |
242
|
|
|
if obj.minsample: |
243
|
|
|
self.writecode(indent, f'attrs.MinimumSamplingInterval = {obj.minsample}') |
244
|
|
|
self.make_common_variable_code(indent, obj) |
245
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
246
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
247
|
|
|
self.make_refs_code(obj, indent) |
248
|
|
|
|
249
|
|
|
def make_variable_type_code(self, obj): |
250
|
|
|
indent = " " |
251
|
|
|
self.writecode(indent) |
252
|
|
|
self.make_node_code(obj, indent) |
253
|
|
|
self.writecode(indent, 'attrs = ua.VariableTypeAttributes()') |
254
|
|
|
if obj.desc: |
255
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
256
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
257
|
|
|
if obj.abstract: |
258
|
|
|
self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}') |
259
|
|
|
self.make_common_variable_code(indent, obj) |
260
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
261
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
262
|
|
|
self.make_refs_code(obj, indent) |
263
|
|
|
|
264
|
|
|
def make_method_code(self, obj): |
265
|
|
|
indent = " " |
266
|
|
|
self.writecode(indent) |
267
|
|
|
self.make_node_code(obj, indent) |
268
|
|
|
self.writecode(indent, 'attrs = ua.MethodAttributes()') |
269
|
|
|
if obj.desc: |
270
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
271
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
272
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
273
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
274
|
|
|
self.make_refs_code(obj, indent) |
275
|
|
|
|
276
|
|
|
def make_reference_code(self, obj): |
277
|
|
|
indent = " " |
278
|
|
|
self.writecode(indent) |
279
|
|
|
self.make_node_code(obj, indent) |
280
|
|
|
self.writecode(indent, 'attrs = ua.ReferenceTypeAttributes()') |
281
|
|
|
if obj.desc: |
282
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
283
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
284
|
|
|
if obj. inversename: |
285
|
|
|
self.writecode(indent, 'attrs.InverseName = LocalizedText("{0}")'.format(obj.inversename)) |
286
|
|
|
if obj.abstract: |
287
|
|
|
self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}') |
288
|
|
|
if obj.symmetric: |
289
|
|
|
self.writecode(indent, f'attrs.Symmetric = {obj.symmetric}') |
290
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
291
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
292
|
|
|
self.make_refs_code(obj, indent) |
293
|
|
|
|
294
|
|
|
def make_datatype_code(self, obj): |
295
|
|
|
indent = " " |
296
|
|
|
self.writecode(indent) |
297
|
|
|
self.make_node_code(obj, indent) |
298
|
|
|
self.writecode(indent, 'attrs = ua.DataTypeAttributes()') |
299
|
|
|
if obj.desc: |
300
|
|
|
self.writecode(indent, u'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
301
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
302
|
|
|
if obj.abstract: |
303
|
|
|
self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}') |
304
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
305
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
306
|
|
|
self.make_refs_code(obj, indent) |
307
|
|
|
|
308
|
|
|
def make_refs_code(self, obj, indent): |
309
|
|
|
if not obj.refs: |
310
|
|
|
return |
311
|
|
|
self.writecode(indent, "refs = []") |
312
|
|
|
for ref in obj.refs: |
313
|
|
|
self.writecode(indent, 'ref = ua.AddReferencesItem()') |
314
|
|
|
self.writecode(indent, 'ref.IsForward = {0}'.format(ref.forward)) |
315
|
|
|
self.writecode(indent, 'ref.ReferenceTypeId = {0}'.format(self.to_ref_type(ref.reftype))) |
316
|
|
|
self.writecode(indent, 'ref.SourceNodeId = {0}'.format(nodeid_code(obj.nodeid))) |
317
|
|
|
self.writecode(indent, 'ref.TargetNodeClass = NodeClass.DataType') |
318
|
|
|
self.writecode(indent, 'ref.TargetNodeId = {0}'.format(nodeid_code(ref.target))) |
319
|
|
|
self.writecode(indent, "refs.append(ref)") |
320
|
|
|
self.writecode(indent, 'server.add_references(refs)') |
321
|
|
|
|
322
|
|
|
|
323
|
|
|
def save_aspace_to_disk(): |
324
|
|
|
import os.path |
325
|
|
|
path = os.path.join('..', 'asyncua', 'binary_address_space.pickle') |
326
|
|
|
print('Saving standard address space to:', path) |
327
|
|
|
sys.path.append('..') |
328
|
|
|
from asyncua.server.standard_address_space import standard_address_space |
329
|
|
|
from asyncua.server.address_space import NodeManagementService, AddressSpace |
330
|
|
|
a_space = AddressSpace() |
331
|
|
|
standard_address_space.fill_address_space(NodeManagementService(a_space)) |
332
|
|
|
a_space.dump(path) |
333
|
|
|
|
334
|
|
|
|
335
|
|
|
if __name__ == '__main__': |
336
|
|
|
logging.basicConfig(level=logging.WARN) |
337
|
|
|
for i in (3, 4, 5, 8, 9, 10, 11, 13): |
338
|
|
|
xml_path = f'Opc.Ua.NodeSet2.Part{i}.xml' |
339
|
|
|
py_path = f'../asyncua/server/standard_address_space/standard_address_space_part{i}.py' |
340
|
|
|
CodeGenerator(xml_path, py_path).run() |
341
|
|
|
save_aspace_to_disk() |
342
|
|
|
|