Completed
Push — master ( f41327...987601 )
by Olivier
04:37 queued 01:54
created

generate_address_space.CodeGenerator.make_common_variable_code()   D

Complexity

Conditions 13

Size

Total Lines 36
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 13
eloc 34
nop 3
dl 0
loc 36
rs 4.2
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like generate_address_space.CodeGenerator.make_common_variable_code() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Generate address space code from xml file specification
3
xmlparser.py is a requirement. it is in asyncua folder but to avoid importing all code, developer can link xmlparser.py in current directory
4
"""
5
import sys
6
import logging
7
# sys.path.insert(0, "..")  # load local freeopcua implementation
8
from asyncua.common import xmlparser
9
10
11
def _to_val(objs, attr, val):
12
    from asyncua import ua
13
    cls = getattr(ua, objs[0])
14
    for o in objs[1:]:
15
        cls = getattr(ua, _get_uatype_name(cls, o))
16
    if cls == ua.NodeId:
17
        return "NodeId.from_string('val')"
18
    return ua_type_to_python(val, _get_uatype_name(cls, attr))
19
20
21
def _get_uatype_name(cls, attname):
22
    for name, uat in cls.ua_types:
23
        if name == attname:
24
            return uat
25
    raise Exception(f"Could not find attribute {attname} in obj {cls}")
26
27
28
def ua_type_to_python(val, uatype):
29
    if uatype == "String":
30
        return f"'{val}'"
31
    elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
32
        return f"b'{val}'"
33
    else:
34
        return val
35
36
37
def bname_code(string):
38
    if ":" in string:
39
        idx, name = string.split(":", 1)
40
    else:
41
        idx = 0
42
        name = string
43
    return f"QualifiedName('{name}', {idx})"
44
45
46
def nodeid_code(string):
47
    l = string.split(";")
48
    identifier = None
49
    namespace = 0
50
    ntype = None
51
    srv = None
52
    nsu = None
53
    for el in l:
54
        if not el:
55
            continue
56
        k, v = el.split("=", 1)
57
        k = k.strip()
58
        v = v.strip()
59
        if k == "ns":
60
            namespace = v
61
        elif k == "i":
62
            ntype = "NumericNodeId"
63
            identifier = v
64
        elif k == "s":
65
            ntype = "StringNodeId"
66
            identifier = f"'{v}'"
67
        elif k == "g":
68
            ntype = "GuidNodeId"
69
            identifier = f"b'{v}'"
70
        elif k == "b":
71
            ntype = "ByteStringNodeId"
72
            identifier = f"b'{v}'"
73
        elif k == "srv":
74
            srv = v
75
        elif k == "nsu":
76
            nsu = v
77
    if identifier is None:
78
        raise Exception("Could not find identifier in string: " + string)
79
    return f"{ntype}({identifier}, {namespace})"
80
81
82
class CodeGenerator:
83
84
    def __init__(self, input_path, output_path):
85
        self.input_path = input_path
86
        self.output_path = output_path
87
        self.output_file = None
88
        self.part = self.input_path.split(".")[-2]
89
        self.parser = None
90
91
    async def run(self):
92
        sys.stderr.write(f"Generating Python code {self.output_path} for XML file {self.input_path}\n")
93
        self.output_file = open(self.output_path, 'w', encoding='utf-8')
94
        self.make_header()
95
        self.parser = xmlparser.XMLParser()
96
        self.parser.parse_sync(self.input_path)
97
        for node in self.parser.get_node_datas():
98
            if node.nodetype == 'UAObject':
99
                self.make_object_code(node)
100
            elif node.nodetype == 'UAObjectType':
101
                self.make_object_type_code(node)
102
            elif node.nodetype == 'UAVariable':
103
                self.make_variable_code(node)
104
            elif node.nodetype == 'UAVariableType':
105
                self.make_variable_type_code(node)
106
            elif node.nodetype == 'UAReferenceType':
107
                self.make_reference_code(node)
108
            elif node.nodetype == 'UADataType':
109
                self.make_datatype_code(node)
110
            elif node.nodetype == 'UAMethod':
111
                self.make_method_code(node)
112
            else:
113
                sys.stderr.write(f"Not implemented node type: {node.nodetype}\n")
114
        self.output_file.close()
115
116
    def writecode(self, *args):
117
        self.output_file.write(f'{" ".join(args)}\n')
118
119
    def make_header(self, ):
120
        self.writecode(f'''
121
# -*- coding: utf-8 -*-
122
"""
123
DO NOT EDIT THIS FILE!
124
It is automatically generated from opcfoundation.org schemas.
125
"""
126
127
from asyncua import ua
128
from asyncua.ua import NodeId, QualifiedName, NumericNodeId, StringNodeId, GuidNodeId
129
from asyncua.ua import NodeClass, LocalizedText
130
131
132
def create_standard_address_space_{self.part!s}(server):
133
  ''')
134
135
    def make_node_code(self, obj, indent):
136
        self.writecode(indent, 'node = ua.AddNodesItem()')
137
        self.writecode(indent, 'node.RequestedNewNodeId = {}'.format(nodeid_code(obj.nodeid)))
138
        self.writecode(indent, 'node.BrowseName = {}'.format(bname_code(obj.browsename)))
139
        self.writecode(indent, 'node.NodeClass = NodeClass.{0}'.format(obj.nodetype[2:]))
140
        if obj.parent and obj.parentlink:
141
            self.writecode(indent, 'node.ParentNodeId = {}'.format(nodeid_code(obj.parent)))
142
            self.writecode(indent, 'node.ReferenceTypeId = {0}'.format(self.to_ref_type(obj.parentlink)))
143
        if obj.typedef:
144
            self.writecode(indent, 'node.TypeDefinition = {}'.format(nodeid_code(obj.typedef)))
145
146
    def to_data_type(self, nodeid):
147
        if not nodeid:
148
            return "ua.NodeId(ua.ObjectIds.String)"
149
        if "=" in nodeid:
150
            return nodeid_code(nodeid)
151
        else:
152
            return f'ua.NodeId(ua.ObjectIds.{nodeid})'
153
154
    def to_ref_type(self, nodeid):
155
        if "=" not in nodeid:
156
            nodeid = self.parser.get_aliases()[nodeid]
157
        return nodeid_code(nodeid)
158
159
    def make_object_code(self, obj):
160
        indent = "   "
161
        self.writecode(indent)
162
        self.make_node_code(obj, indent)
163
        self.writecode(indent, 'attrs = ua.ObjectAttributes()')
164
        if obj.desc:
165
            self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
166
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
167
        self.writecode(indent, 'attrs.EventNotifier = {0}'.format(obj.eventnotifier))
168
        self.writecode(indent, 'node.NodeAttributes = attrs')
169
        self.writecode(indent, 'server.add_nodes([node])')
170
        self.make_refs_code(obj, indent)
171
172
    def make_object_type_code(self, obj):
173
        indent = "   "
174
        self.writecode(indent)
175
        self.make_node_code(obj, indent)
176
        self.writecode(indent, 'attrs = ua.ObjectTypeAttributes()')
177
        if obj.desc:
178
            self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
179
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
180
        self.writecode(indent, 'attrs.IsAbstract = {0}'.format(obj.abstract))
181
        self.writecode(indent, 'node.NodeAttributes = attrs')
182
        self.writecode(indent, 'server.add_nodes([node])')
183
        self.make_refs_code(obj, indent)
184
185
    def make_common_variable_code(self, indent, obj):
186
        if obj.desc:
187
            self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
188
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
189
        self.writecode(indent, 'attrs.DataType = {0}'.format(self.to_data_type(obj.datatype)))
190
        if obj.value is not None:
191
            if obj.valuetype == "ListOfExtensionObject":
192
                self.writecode(indent, 'value = []')
193
                for ext in obj.value:
194
                    self.make_ext_obj_code(indent, ext)
195
                    self.writecode(indent, 'value.append(extobj)')
196
                self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
197
            elif obj.valuetype == "ExtensionObject":
198
                self.make_ext_obj_code(indent, obj.value)
199
                self.writecode(indent, 'value = extobj')
200
                self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
201
            elif obj.valuetype == "ListOfLocalizedText":
202
                value = ['LocalizedText({0})'.format(repr(text)) for text in obj.value]
203
                self.writecode(indent, 'attrs.Value = [{}]'.format(','.join(value)))
204
            elif obj.valuetype == "LocalizedText":
205
                self.writecode(indent, 'attrs.Value = ua.Variant(LocalizedText("{0}"), ua.VariantType.LocalizedText)'.format(obj.value[1][1]))
206
            else:
207
                if obj.valuetype.startswith("ListOf"):
208
                    obj.valuetype = obj.valuetype[6:]
209
                self.writecode(
210
                    indent,
211
                    f'attrs.Value = ua.Variant({obj.value!r}, ua.VariantType.{obj.valuetype})'
212
                )
213
        if obj.rank:
214
            self.writecode(indent, f'attrs.ValueRank = {obj.rank}')
215
        if obj.accesslevel:
216
            self.writecode(indent, f'attrs.AccessLevel = {obj.accesslevel}')
217
        if obj.useraccesslevel:
218
            self.writecode(indent, f'attrs.UserAccessLevel = {obj.useraccesslevel}')
219
        if obj.dimensions:
220
            self.writecode(indent, f'attrs.ArrayDimensions = {obj.dimensions}')
221
222
    def make_ext_obj_code(self, indent, extobj):
223
        self.writecode(indent, f'extobj = ua.{extobj.objname}()')
224
        for name, val in extobj.body:
225
            for k, v in val:
226
                if type(v) is str:
227
                    val = _to_val([extobj.objname], k, v)
228
                    self.writecode(indent, f'extobj.{k} = {val}')
229
                else:
230
                    if k == "DataType":  #hack for strange nodeid xml format
231
                        self.writecode(indent, 'extobj.{0} = {1}'.format(k, nodeid_code(v[0][1])))
232
                        continue
233
                    for k2, v2 in v:
234
                        val2 = _to_val([extobj.objname, k], k2, v2)
235
                        self.writecode(indent, f'extobj.{k}.{k2} = {val2}')
236
237
    def make_variable_code(self, obj):
238
        indent = "   "
239
        self.writecode(indent)
240
        self.make_node_code(obj, indent)
241
        self.writecode(indent, 'attrs = ua.VariableAttributes()')
242
        if obj.minsample:
243
            self.writecode(indent, f'attrs.MinimumSamplingInterval = {obj.minsample}')
244
        self.make_common_variable_code(indent, obj)
245
        self.writecode(indent, 'node.NodeAttributes = attrs')
246
        self.writecode(indent, 'server.add_nodes([node])')
247
        self.make_refs_code(obj, indent)
248
249
    def make_variable_type_code(self, obj):
250
        indent = "   "
251
        self.writecode(indent)
252
        self.make_node_code(obj, indent)
253
        self.writecode(indent, 'attrs = ua.VariableTypeAttributes()')
254
        if obj.desc:
255
            self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
256
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
257
        if obj.abstract:
258
            self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}')
259
        self.make_common_variable_code(indent, obj)
260
        self.writecode(indent, 'node.NodeAttributes = attrs')
261
        self.writecode(indent, 'server.add_nodes([node])')
262
        self.make_refs_code(obj, indent)
263
264
    def make_method_code(self, obj):
265
        indent = "   "
266
        self.writecode(indent)
267
        self.make_node_code(obj, indent)
268
        self.writecode(indent, 'attrs = ua.MethodAttributes()')
269
        if obj.desc:
270
            self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
271
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
272
        self.writecode(indent, 'node.NodeAttributes = attrs')
273
        self.writecode(indent, 'server.add_nodes([node])')
274
        self.make_refs_code(obj, indent)
275
276
    def make_reference_code(self, obj):
277
        indent = "   "
278
        self.writecode(indent)
279
        self.make_node_code(obj, indent)
280
        self.writecode(indent, 'attrs = ua.ReferenceTypeAttributes()')
281
        if obj.desc:
282
            self.writecode(indent, 'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
283
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
284
        if obj. inversename:
285
            self.writecode(indent, 'attrs.InverseName = LocalizedText("{0}")'.format(obj.inversename))
286
        if obj.abstract:
287
            self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}')
288
        if obj.symmetric:
289
            self.writecode(indent, f'attrs.Symmetric = {obj.symmetric}')
290
        self.writecode(indent, 'node.NodeAttributes = attrs')
291
        self.writecode(indent, 'server.add_nodes([node])')
292
        self.make_refs_code(obj, indent)
293
294
    def make_datatype_code(self, obj):
295
        indent = "   "
296
        self.writecode(indent)
297
        self.make_node_code(obj, indent)
298
        self.writecode(indent, 'attrs = ua.DataTypeAttributes()')
299
        if obj.desc:
300
            self.writecode(indent, u'attrs.Description = LocalizedText("{0}")'.format(obj.desc))
301
        self.writecode(indent, 'attrs.DisplayName = LocalizedText("{0}")'.format(obj.displayname))
302
        if obj.abstract:
303
            self.writecode(indent, f'attrs.IsAbstract = {obj.abstract}')
304
        self.writecode(indent, 'node.NodeAttributes = attrs')
305
        self.writecode(indent, 'server.add_nodes([node])')
306
        self.make_refs_code(obj, indent)
307
308
    def make_refs_code(self, obj, indent):
309
        if not obj.refs:
310
            return
311
        self.writecode(indent, "refs = []")
312
        for ref in obj.refs:
313
            self.writecode(indent, 'ref = ua.AddReferencesItem()')
314
            self.writecode(indent, 'ref.IsForward = {0}'.format(ref.forward))
315
            self.writecode(indent, 'ref.ReferenceTypeId = {0}'.format(self.to_ref_type(ref.reftype)))
316
            self.writecode(indent, 'ref.SourceNodeId = {0}'.format(nodeid_code(obj.nodeid)))
317
            self.writecode(indent, 'ref.TargetNodeClass = NodeClass.DataType')
318
            self.writecode(indent, 'ref.TargetNodeId = {0}'.format(nodeid_code(ref.target)))
319
            self.writecode(indent, "refs.append(ref)")
320
        self.writecode(indent, 'server.add_references(refs)')
321
322
323
def save_aspace_to_disk():
324
    import os.path
325
    path = os.path.join('..', 'asyncua', 'binary_address_space.pickle')
326
    print('Saving standard address space to:', path)
327
    sys.path.append('..')
328
    from asyncua.server.standard_address_space import standard_address_space
329
    from asyncua.server.address_space import NodeManagementService, AddressSpace
330
    a_space = AddressSpace()
331
    standard_address_space.fill_address_space(NodeManagementService(a_space))
332
    a_space.dump(path)
333
334
335
if __name__ == '__main__':
336
    logging.basicConfig(level=logging.WARN)
337
    for i in (3, 4, 5, 8, 9, 10, 11, 13):
338
        xml_path = f'Opc.Ua.NodeSet2.Part{i}.xml'
339
        py_path = f'../asyncua/server/standard_address_space/standard_address_space_part{i}.py'
340
        CodeGenerator(xml_path, py_path).run()
341
    save_aspace_to_disk()
342