|
1
|
|
|
""" |
|
2
|
|
|
Generate address space code from xml file specification |
|
3
|
|
|
xmlparser.py is a requirement. it is in asyncua folder but to avoid importing all code, developer can link xmlparser.py in current directory |
|
4
|
|
|
""" |
|
5
|
|
|
import sys |
|
6
|
|
|
import logging |
|
7
|
|
|
# sys.path.insert(0, "..") # load local freeopcua implementation |
|
8
|
|
|
from asyncua.common import xmlparser |
|
9
|
|
|
|
|
10
|
|
|
|
|
11
|
|
|
def _to_val(objs, attr, val): |
|
12
|
|
|
from asyncua import ua |
|
13
|
|
|
cls = getattr(ua, objs[0]) |
|
14
|
|
|
for o in objs[1:]: |
|
15
|
|
|
cls = getattr(ua, _get_uatype_name(cls, o)) |
|
16
|
|
|
if cls == ua.NodeId: |
|
17
|
|
|
return "NodeId.from_string('val')" |
|
18
|
|
|
return ua_type_to_python(val, _get_uatype_name(cls, attr)) |
|
19
|
|
|
|
|
20
|
|
|
|
|
21
|
|
|
def _get_uatype_name(cls, attname): |
|
22
|
|
|
for name, uat in cls.ua_types: |
|
23
|
|
|
if name == attname: |
|
24
|
|
|
return uat |
|
25
|
|
|
raise Exception(f"Could not find attribute {attname} in obj {cls}") |
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
|
|
def ua_type_to_python(val, uatype): |
|
29
|
|
|
if uatype == "String": |
|
30
|
|
|
return f"'{val}'" |
|
31
|
|
|
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"): |
|
32
|
|
|
return f"b'{val}'" |
|
33
|
|
|
else: |
|
34
|
|
|
return val |
|
35
|
|
|
|
|
36
|
|
|
|
|
37
|
|
|
def bname_code(string): |
|
38
|
|
|
if ":" in string: |
|
39
|
|
|
idx, name = string.split(":", 1) |
|
40
|
|
|
else: |
|
41
|
|
|
idx = 0 |
|
42
|
|
|
name = string |
|
43
|
|
|
return f"QualifiedName('{name}', {idx})" |
|
44
|
|
|
|
|
45
|
|
|
|
|
46
|
|
|
def nodeid_code(string): |
|
47
|
|
|
l = string.split(";") |
|
48
|
|
|
identifier = None |
|
49
|
|
|
namespace = 0 |
|
50
|
|
|
ntype = None |
|
51
|
|
|
srv = None |
|
52
|
|
|
nsu = None |
|
53
|
|
|
for el in l: |
|
54
|
|
|
if not el: |
|
55
|
|
|
continue |
|
56
|
|
|
k, v = el.split("=", 1) |
|
57
|
|
|
k = k.strip() |
|
58
|
|
|
v = v.strip() |
|
59
|
|
|
if k == "ns": |
|
60
|
|
|
namespace = v |
|
61
|
|
|
elif k == "i": |
|
62
|
|
|
ntype = "NumericNodeId" |
|
63
|
|
|
identifier = v |
|
64
|
|
|
elif k == "s": |
|
65
|
|
|
ntype = "StringNodeId" |
|
66
|
|
|
identifier = f"'{v}'" |
|
67
|
|
|
elif k == "g": |
|
68
|
|
|
ntype = "GuidNodeId" |
|
69
|
|
|
identifier = f"b'{v}'" |
|
70
|
|
|
elif k == "b": |
|
71
|
|
|
ntype = "ByteStringNodeId" |
|
72
|
|
|
identifier = f"b'{v}'" |
|
73
|
|
|
elif k == "srv": |
|
74
|
|
|
srv = v |
|
75
|
|
|
elif k == "nsu": |
|
76
|
|
|
nsu = v |
|
77
|
|
|
if identifier is None: |
|
78
|
|
|
raise Exception("Could not find identifier in string: " + string) |
|
79
|
|
|
return f"{ntype}({identifier}, {namespace})" |
|
80
|
|
|
|
|
81
|
|
|
|
|
82
|
|
|
class CodeGenerator: |
|
83
|
|
|
|
|
84
|
|
|
def __init__(self, input_path, output_path): |
|
85
|
|
|
self.input_path = input_path |
|
86
|
|
|
self.output_path = output_path |
|
87
|
|
|
self.output_file = None |
|
88
|
|
|
self.part = self.input_path.split(".")[-2] |
|
89
|
|
|
self.parser = None |
|
90
|
|
|
|
|
91
|
|
|
async def run(self): |
|
92
|
|
|
sys.stderr.write(f"Generating Python code {self.output_path} for XML file {self.input_path}\n") |
|
93
|
|
|
self.output_file = open(self.output_path, 'w', encoding='utf-8') |
|
94
|
|
|
self.make_header() |
|
95
|
|
|
self.parser = xmlparser.XMLParser() |
|
96
|
|
|
self.parser.parse_sync(self.input_path) |
|
97
|
|
|
for node in self.parser.get_node_datas(): |
|
98
|
|
|
if node.nodetype == 'UAObject': |
|
99
|
|
|
self.make_object_code(node) |
|
100
|
|
|
elif node.nodetype == 'UAObjectType': |
|
101
|
|
|
self.make_object_type_code(node) |
|
102
|
|
|
elif node.nodetype == 'UAVariable': |
|
103
|
|
|
self.make_variable_code(node) |
|
104
|
|
|
elif node.nodetype == 'UAVariableType': |
|
105
|
|
|
self.make_variable_type_code(node) |
|
106
|
|
|
elif node.nodetype == 'UAReferenceType': |
|
107
|
|
|
self.make_reference_code(node) |
|
108
|
|
|
elif node.nodetype == 'UADataType': |
|
109
|
|
|
self.make_datatype_code(node) |
|
110
|
|
|
elif node.nodetype == 'UAMethod': |
|
111
|
|
|
self.make_method_code(node) |
|
112
|
|
|
else: |
|
113
|
|
|
sys.stderr.write(f"Not implemented node type: {node.nodetype}\n") |
|
114
|
|
|
self.output_file.close() |
|
115
|
|
|
|
|
116
|
|
|
def writecode(self, *args): |
|
117
|
|
|
self.output_file.write(f'{" ".join(args)}\n') |
|
118
|
|
|
|
|
119
|
|
|
def make_header(self, ): |
|
120
|
|
|
self.writecode(f''' |
|
121
|
|
|
# -*- coding: utf-8 -*- |
|
122
|
|
|
""" |
|
123
|
|
|
DO NOT EDIT THIS FILE! |
|
124
|
|
|
It is automatically generated from opcfoundation.org schemas. |
|
125
|
|
|
""" |
|
126
|
|
|
|
|
127
|
|
|
from asyncua import ua |
|
128
|
|
|
from asyncua.ua import NodeId, QualifiedName, NumericNodeId, StringNodeId, GuidNodeId |
|
129
|
|
|
from asyncua.ua import NodeClass, LocalizedText |
|
130
|
|
|
|
|
131
|
|
|
|
|
132
|
|
|
def create_standard_address_space_{self.part!s}(server): |
|
133
|
|
|
''') |
|
134
|
|
|
|
|
135
|
|
|
def make_node_code(self, obj, indent): |
|
136
|
|
|
self.writecode(indent, 'node = ua.AddNodesItem()') |
|
137
|
|
|
self.writecode(indent, 'node.RequestedNewNodeId = {}'.format(nodeid_code(obj.nodeid))) |
|
138
|
|
|
self.writecode(indent, 'node.BrowseName = {}'.format(bname_code(obj.browsename))) |
|
139
|
|
|
self.writecode(indent, 'node.NodeClass = NodeClass.{0}'.format(obj.nodetype[2:])) |
|
140
|
|
|
if obj.parent and obj.parentlink: |
|
141
|
|
|
self.writecode(indent, 'node.ParentNodeId = {}'.format(nodeid_code(obj.parent))) |
|
142
|
|
|
self.writecode(indent, 'node.ReferenceTypeId = {0}'.format(self.to_ref_type(obj.parentlink))) |
|
143
|
|
|
if obj.typedef: |
|
144
|
|
|
self.writecode(indent, 'node.TypeDefinition = {}'.format(nodeid_code(obj.typedef))) |
|
145
|
|
|
|
|
146
|
|
|
def to_data_type(self, nodeid): |
|
147
|
|
|
if not nodeid: |
|
148
|
|
|
return "ua.NodeId(ua.ObjectIds.String)" |
|
149
|
|
|
if "=" in nodeid: |
|
150
|
|
|
return nodeid_code(nodeid) |
|
151
|
|
|
else: |
|
152
|
|
|
return f'ua.NodeId(ua.ObjectIds.{nodeid})' |
|
153
|
|
|
|
|
154
|
|
|
def to_ref_type(self, nodeid): |
|
155
|
|
|
if "=" not in nodeid: |
|
156
|
|
|
nodeid = self.parser.get_aliases()[nodeid] |
|
157
|
|
|
return nodeid_code(nodeid) |
|
158
|
|
|
|
|
159
|
|
|
def make_object_code(self, obj): |
|
160
|
|
|
indent = " " |
|
161
|
|
|
self.writecode(indent) |
|
162
|
|
|
self.make_node_code(obj, indent) |
|
163
|
|
|
self.writecode(indent, 'attrs = ua.ObjectAttributes()') |
|
164
|
|
|
if obj.desc: |
|
165
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
166
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
167
|
|
|
self.writecode(indent, 'attrs.EventNotifier = {0}'.format(obj.eventnotifier)) |
|
168
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
169
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
170
|
|
|
self.make_refs_code(obj, indent) |
|
171
|
|
|
|
|
172
|
|
|
def make_object_type_code(self, obj): |
|
173
|
|
|
indent = " " |
|
174
|
|
|
self.writecode(indent) |
|
175
|
|
|
self.make_node_code(obj, indent) |
|
176
|
|
|
self.writecode(indent, 'attrs = ua.ObjectTypeAttributes()') |
|
177
|
|
|
if obj.desc: |
|
178
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
179
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
180
|
|
|
self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract)) |
|
181
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
182
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
183
|
|
|
self.make_refs_code(obj, indent) |
|
184
|
|
|
|
|
185
|
|
|
def make_common_variable_code(self, indent, obj): |
|
186
|
|
|
if obj.desc: |
|
187
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
188
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
189
|
|
|
self.writecode(indent, 'attrs.DataType = {0}'.format(self.to_data_type(obj.datatype))) |
|
190
|
|
|
if obj.value is not None: |
|
191
|
|
|
if obj.valuetype == "ListOfExtensionObject": |
|
192
|
|
|
self.writecode(indent, 'value = []') |
|
193
|
|
|
for ext in obj.value: |
|
194
|
|
|
self.make_ext_obj_code(indent, ext) |
|
195
|
|
|
self.writecode(indent, 'value.append(extobj)') |
|
196
|
|
|
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)') |
|
197
|
|
|
elif obj.valuetype == "ExtensionObject": |
|
198
|
|
|
self.make_ext_obj_code(indent, obj.value) |
|
199
|
|
|
self.writecode(indent, 'value = extobj') |
|
200
|
|
|
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)') |
|
201
|
|
|
elif obj.valuetype == "ListOfLocalizedText": |
|
202
|
|
|
value = ['LocalizedText({0})'.format(repr(text)) for text in obj.value] |
|
203
|
|
|
self.writecode(indent, 'attrs.Value = [{}]'.format(','.join(value))) |
|
204
|
|
|
elif obj.valuetype == "LocalizedText": |
|
205
|
|
|
self.writecode(indent, 'attrs.Value = ua.Variant(LocalizedText("{0}"), ua.VariantType.LocalizedText)'.format(obj.value[1][1])) |
|
206
|
|
|
else: |
|
207
|
|
|
if obj.valuetype.startswith("ListOf"): |
|
208
|
|
|
obj.valuetype = obj.valuetype[6:] |
|
209
|
|
|
self.writecode( |
|
210
|
|
|
indent, |
|
211
|
|
|
f'attrs.Value = ua.Variant({obj.value!r}, ua.VariantType.{obj.valuetype})' |
|
212
|
|
|
) |
|
213
|
|
|
if obj.rank: |
|
214
|
|
|
self.writecode(indent, f'attrs.ValueRank = {obj.rank}') |
|
215
|
|
|
if obj.accesslevel: |
|
216
|
|
|
self.writecode(indent, f'attrs.AccessLevel = {obj.accesslevel}') |
|
217
|
|
|
if obj.useraccesslevel: |
|
218
|
|
|
self.writecode(indent, f'attrs.UserAccessLevel = {obj.useraccesslevel}') |
|
219
|
|
|
if obj.dimensions: |
|
220
|
|
|
self.writecode(indent, f'attrs.ArrayDimensions = {obj.dimensions}') |
|
221
|
|
|
|
|
222
|
|
|
def make_ext_obj_code(self, indent, extobj): |
|
223
|
|
|
self.writecode(indent, f'extobj = ua.{extobj.objname}()') |
|
224
|
|
|
for name, val in extobj.body: |
|
225
|
|
|
for k, v in val: |
|
226
|
|
|
if type(v) is str: |
|
227
|
|
|
val = _to_val([extobj.objname], k, v) |
|
228
|
|
|
self.writecode(indent, f'extobj.{k} = {val}') |
|
229
|
|
|
else: |
|
230
|
|
|
if k == "DataType": #hack for strange nodeid xml format |
|
231
|
|
|
self.writecode(indent, 'extobj.{0} = {1}'.format(k, nodeid_code(v[0][1]))) |
|
232
|
|
|
continue |
|
233
|
|
|
for k2, v2 in v: |
|
234
|
|
|
val2 = _to_val([extobj.objname, k], k2, v2) |
|
235
|
|
|
self.writecode(indent, f'extobj.{k}.{k2} = {val2}') |
|
236
|
|
|
|
|
237
|
|
|
def make_variable_code(self, obj): |
|
238
|
|
|
indent = " " |
|
239
|
|
|
self.writecode(indent) |
|
240
|
|
|
self.make_node_code(obj, indent) |
|
241
|
|
|
self.writecode(indent, 'attrs = ua.VariableAttributes()') |
|
242
|
|
|
if obj.minsample: |
|
243
|
|
|
self.writecode(indent, f'attrs.MinimumSamplingInterval = {obj.minsample}') |
|
244
|
|
|
self.make_common_variable_code(indent, obj) |
|
245
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
246
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
247
|
|
|
self.make_refs_code(obj, indent) |
|
248
|
|
|
|
|
249
|
|
|
def make_variable_type_code(self, obj): |
|
250
|
|
|
indent = " " |
|
251
|
|
|
self.writecode(indent) |
|
252
|
|
|
self.make_node_code(obj, indent) |
|
253
|
|
|
self.writecode(indent, 'attrs = ua.VariableTypeAttributes()') |
|
254
|
|
|
if obj.desc: |
|
255
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
256
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
257
|
|
|
if obj.abstract: |
|
258
|
|
|
self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}') |
|
259
|
|
|
self.make_common_variable_code(indent, obj) |
|
260
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
261
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
262
|
|
|
self.make_refs_code(obj, indent) |
|
263
|
|
|
|
|
264
|
|
|
def make_method_code(self, obj): |
|
265
|
|
|
indent = " " |
|
266
|
|
|
self.writecode(indent) |
|
267
|
|
|
self.make_node_code(obj, indent) |
|
268
|
|
|
self.writecode(indent, 'attrs = ua.MethodAttributes()') |
|
269
|
|
|
if obj.desc: |
|
270
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
271
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
272
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
273
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
274
|
|
|
self.make_refs_code(obj, indent) |
|
275
|
|
|
|
|
276
|
|
|
def make_reference_code(self, obj): |
|
277
|
|
|
indent = " " |
|
278
|
|
|
self.writecode(indent) |
|
279
|
|
|
self.make_node_code(obj, indent) |
|
280
|
|
|
self.writecode(indent, 'attrs = ua.ReferenceTypeAttributes()') |
|
281
|
|
|
if obj.desc: |
|
282
|
|
|
self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
283
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
284
|
|
|
if obj. inversename: |
|
285
|
|
|
self.writecode(indent, 'attrs.InverseName = LocalizedText("{0}")'.format(obj.inversename)) |
|
286
|
|
|
if obj.abstract: |
|
287
|
|
|
self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}') |
|
288
|
|
|
if obj.symmetric: |
|
289
|
|
|
self.writecode(indent, f'attrs.Symmetric = {obj.symmetric}') |
|
290
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
291
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
292
|
|
|
self.make_refs_code(obj, indent) |
|
293
|
|
|
|
|
294
|
|
|
def make_datatype_code(self, obj): |
|
295
|
|
|
indent = " " |
|
296
|
|
|
self.writecode(indent) |
|
297
|
|
|
self.make_node_code(obj, indent) |
|
298
|
|
|
self.writecode(indent, 'attrs = ua.DataTypeAttributes()') |
|
299
|
|
|
if obj.desc: |
|
300
|
|
|
self.writecode(indent, u'attrs.Description = LocalizedText("{0}")'.format(obj.desc)) |
|
301
|
|
|
self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname)) |
|
302
|
|
|
if obj.abstract: |
|
303
|
|
|
self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}') |
|
304
|
|
|
self.writecode(indent, 'node.NodeAttributes = attrs') |
|
305
|
|
|
self.writecode(indent, 'server.add_nodes([node])') |
|
306
|
|
|
self.make_refs_code(obj, indent) |
|
307
|
|
|
|
|
308
|
|
|
def make_refs_code(self, obj, indent): |
|
309
|
|
|
if not obj.refs: |
|
310
|
|
|
return |
|
311
|
|
|
self.writecode(indent, "refs = []") |
|
312
|
|
|
for ref in obj.refs: |
|
313
|
|
|
self.writecode(indent, 'ref = ua.AddReferencesItem()') |
|
314
|
|
|
self.writecode(indent, 'ref.IsForward = {0}'.format(ref.forward)) |
|
315
|
|
|
self.writecode(indent, 'ref.ReferenceTypeId = {0}'.format(self.to_ref_type(ref.reftype))) |
|
316
|
|
|
self.writecode(indent, 'ref.SourceNodeId = {0}'.format(nodeid_code(obj.nodeid))) |
|
317
|
|
|
self.writecode(indent, 'ref.TargetNodeClass = NodeClass.DataType') |
|
318
|
|
|
self.writecode(indent, 'ref.TargetNodeId = {0}'.format(nodeid_code(ref.target))) |
|
319
|
|
|
self.writecode(indent, "refs.append(ref)") |
|
320
|
|
|
self.writecode(indent, 'server.add_references(refs)') |
|
321
|
|
|
|
|
322
|
|
|
|
|
323
|
|
|
def save_aspace_to_disk(): |
|
324
|
|
|
import os.path |
|
325
|
|
|
path = os.path.join('..', 'asyncua', 'binary_address_space.pickle') |
|
326
|
|
|
print('Saving standard address space to:', path) |
|
327
|
|
|
sys.path.append('..') |
|
328
|
|
|
from asyncua.server.standard_address_space import standard_address_space |
|
329
|
|
|
from asyncua.server.address_space import NodeManagementService, AddressSpace |
|
330
|
|
|
a_space = AddressSpace() |
|
331
|
|
|
standard_address_space.fill_address_space(NodeManagementService(a_space)) |
|
332
|
|
|
a_space.dump(path) |
|
333
|
|
|
|
|
334
|
|
|
|
|
335
|
|
|
if __name__ == '__main__': |
|
336
|
|
|
logging.basicConfig(level=logging.WARN) |
|
337
|
|
|
for i in (3, 4, 5, 8, 9, 10, 11, 13): |
|
338
|
|
|
xml_path = f'Opc.Ua.NodeSet2.Part{i}.xml' |
|
339
|
|
|
py_path = f'../asyncua/server/standard_address_space/standard_address_space_part{i}.py' |
|
340
|
|
|
CodeGenerator(xml_path, py_path).run() |
|
341
|
|
|
save_aspace_to_disk() |
|
342
|
|
|
|