Passed
Push — master ( 9e5bc7...0261d3 )
by Olivier
02:54
created

StructGenerator._make_model()   D

Complexity

Conditions 12

Size

Total Lines 37
Code Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 35
nop 2
dl 0
loc 37
rs 4.8
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.structures.StructGenerator._make_model() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
import uuid
8
import logging
9
# The next two imports are for generated code
10
from datetime import datetime
11
from enum import Enum, IntEnum, EnumMeta
12
from xml.etree import ElementTree as ET
13
from asyncua import ua
14
15
from .structures104 import get_default_value, clean_name
16
17
_logger = logging.getLogger(__name__)
18
19
20
class EnumType(object):
21
    def __init__(self, name):
22
        self.name = name
23
        self.fields = []
24
        self.typeid = None
25
26
    def __str__(self):
27
        return f"EnumType({self.name, self.fields})"
28
    __repr__ = __str__
29
30
    def get_code(self):
31
        code = """
32
33
class {0}(IntEnum):
34
35
    '''
36
    {0} EnumInt autogenerated from xml
37
    '''
38
39
""".format(self.name)
40
41
        for EnumeratedValue in self.fields:
42
            name = EnumeratedValue.Name
43
            value = EnumeratedValue.Value
44
            code += f"    {name} = {value}\n"
45
46
        return code
47
48
49
class EnumeratedValue:
50
    def __init__(self, name, value):
51
        if name == "None":
52
            name = "None_"
53
        name = name.replace(" ", "")
54
        self.Name = name
55
        self.Value = value
56
57
58
class Struct:
59
    def __init__(self, name):
60
        self.name = clean_name(name)
61
        self.fields = []
62
        self.typeid = None
63
64
    def __str__(self):
65
        return f"Struct(name={self.name}, fields={self.fields}"
66
67
    __repr__ = __str__
68
69
    def get_code(self):
70
        code = f"""
71
72
class {self.name}:
73
74
    '''
75
    {self.name} structure autogenerated from xml
76
    '''
77
78
"""
79
        code += '    ua_types = [\n'
80
        for field in self.fields:
81
            prefix = 'ListOf' if field.array else ''
82
            uatype = prefix + field.uatype
83
            if uatype == 'ListOfChar':
84
                uatype = 'String'
85
            code += f"        ('{field.name}', '{uatype}'),\n"
86
        code += "    ]"
87
        code += """
88
    def __str__(self):
89
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
90
        return self.__class__.__name__ + "(" + ", ".join(vals) + ")"
91
92
    __repr__ = __str__
93
94
    def __init__(self):
95
"""
96
        if not self.fields:
97
            code += "      pass"
98
        for field in self.fields:
99
            code += f"        self.{field.name} = {field.value}\n"
100
        return code
101
102
103
class Field(object):
104
    def __init__(self, name):
105
        self.name = name
106
        self.uatype = None
107
        self.value = None
108
        self.array = False
109
110
    def __str__(self):
111
        return f"Field(name={self.name}, uatype={self.uatype})"
112
113
    __repr__ = __str__
114
115
116
class StructGenerator(object):
117
    def __init__(self):
118
        self.model = []
119
120
    def make_model_from_string(self, xml):
121
        obj = ET.fromstring(xml)
122
        self._make_model(obj)
123
124
    def make_model_from_file(self, path):
125
        obj = ET.parse(path)
126
        root = obj.getroot()
127
        self._make_model(root)
128
129
    def _make_model(self, root):
130
        enums = {}
131
        for child in root:
132
            if child.tag.endswith("EnumeratedType"):
133
                intenum = EnumType(child.get("Name"))
134
                for xmlfield in child:
135
                    if xmlfield.tag.endswith("EnumeratedValue"):
136
                        name = xmlfield.get("Name")
137
                        value = xmlfield.get("Value")
138
                        enumvalue = EnumeratedValue(name, value)
139
                        intenum.fields.append(enumvalue)
140
                        enums[child.get("Name")] = value
141
                self.model.append(intenum)
142
143
        for child in root:
144
            if child.tag.endswith("StructuredType"):
145
                struct = Struct(child.get("Name"))
146
                array = False
147
                # these lines can be reduced in >= Python3.8 with root.iterfind("{*}Field") and similar
148
                for xmlfield in child:
149
                    if xmlfield.tag.endswith("Field"):
150
                        name = xmlfield.get("Name")
151
                        if name.startswith("NoOf"):
152
                            array = True
153
                            continue
154
                        field = Field(clean_name(name))
155
                        field.uatype = xmlfield.get("TypeName")
156
                        if ":" in field.uatype:
157
                            field.uatype = field.uatype.split(":")[1]
158
                        field.uatype = clean_name(field.uatype)
159
                        field.value = get_default_value(field.uatype, enums)
160
                        if array:
161
                            field.array = True
162
                            field.value = []
163
                            array = False
164
                        struct.fields.append(field)
165
                self.model.append(struct)
166
167
    def save_to_file(self, path, register=False):
168
        _file = open(path, "w+")
169
        self._make_header(_file)
170
        for struct in self.model:
171
            _file.write(struct.get_code())
172
        if register:
173
            _file.write(self._make_registration())
174
        _file.close()
175
176
    def _make_registration(self):
177
        code = "\n\n"
178
        for struct in self.model:
179
            code += f"ua.register_extension_object('{struct.name}'," \
180
                    f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
181
        return code
182
183
    def get_python_classes(self, env=None):
184
        return _generate_python_class(self.model, env=env)
185
186
    def _make_header(self, _file):
187
        _file.write("""
188
'''
189
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
190
'''
191
192
from datetime import datetime
193
import uuid
194
195
from asyncua import ua
196
""")
197
198
    def set_typeid(self, name, typeid):
199
        for struct in self.model:
200
            if struct.name == name:
201
                struct.typeid = typeid
202
                return
203
204
205
async def load_type_definitions(server, nodes=None):
206
    """
207
    Download xml from given variable node defining custom structures.
208
    If no node is given, attemps to import variables from all nodes under
209
    "0:OPC Binary"
210
    the code is generated and imported on the fly. If you know the structures
211
    are not going to be modified it might be interresting to copy the generated files
212
    and include them in you code
213
    """
214
    if nodes is None:
215
        nodes = []
216
        for desc in await server.nodes.opc_binary.get_children_descriptions():
217
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
218
                nodes.append(server.get_node(desc.NodeId))
219
220
    structs_dict = {}
221
    generators = []
222
    for node in nodes:
223
        xml = await node.read_value()
224
        generator = StructGenerator()
225
        generators.append(generator)
226
        generator.make_model_from_string(xml)
227
        # generate and execute new code on the fly
228
        generator.get_python_classes(structs_dict)
229
        # same but using a file that is imported. This can be usefull for debugging library
230
        # name = node.read_browse_name().Name
231
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
232
        # name = clean_name(name)
233
        # name = "structures_" + node.read_browse_name().Name
234
        # generator.save_and_import(name + ".py", append_to=structs_dict)
235
236
        # register classes
237
        # every children of our node should represent a class
238
        for ndesc in await node.get_children_descriptions():
239
            ndesc_node = server.get_node(ndesc.NodeId)
240
            ref_desc_list = await ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
241
            if ref_desc_list:  # some server put extra things here
242
                name = clean_name(ndesc.BrowseName.Name)
243
                if name not in structs_dict:
244
                    _logger.warning("%s is found as child of binary definition node but is not found in xml", name)
245
                    continue
246
                nodeid = ref_desc_list[0].NodeId
247
                ua.register_extension_object(name, nodeid, structs_dict[name])
248
                # save the typeid if user want to create static file for type definitnion
249
                generator.set_typeid(name, nodeid.to_string())
250
251
        for key, val in structs_dict.items():
252
            if isinstance(val, EnumMeta) and key != "IntEnum":
253
                setattr(ua, key, val)
254
255
    return generators, structs_dict
256
257
258
def _generate_python_class(model, env=None):
259
    """
260
    generate Python code and execute in a new environment
261
    return a dict of structures {name: class}
262
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
263
    not available and debugging is very hard...
264
    """
265
    if env is None:
266
        env = ua.__dict__
267
    #  Add the required libraries to dict
268
    if "ua" not in env:
269
        env['ua'] = ua
270
    if "datetime" not in env:
271
        env['datetime'] = datetime
272
    if "uuid" not in env:
273
        env['uuid'] = uuid
274
    if "enum" not in env:
275
        env['IntEnum'] = IntEnum
276
    # generate classes one by one and add them to dict
277
    for element in model:
278
        code = element.get_code()
279
        exec(code, env)
280
    return env
281
282
283
async def load_enums(server, env=None, force=False):
284
    """
285
    Read enumeration data types on server and generate python Enums in ua scope for them
286
    """
287
    model = []
288
289
    for desc in await server.nodes.enum_data_type.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
290
        enum_name = desc.BrowseName.Name
291
        enum_node = server.get_node(desc.NodeId)
292
        if not force and hasattr(ua, enum_name):
293
            _logger.debug("Enum type %s is already in ua namespace, ignoring", enum_name)
294
            continue
295
        c = None
296
        for child_desc in await enum_node.get_children_descriptions(refs=ua.ObjectIds.HasProperty):
297
            child_node = server.get_node(child_desc.NodeId)
298
            if child_desc.BrowseName.Name == "EnumStrings":
299
                c = await _get_enum_strings(enum_name, child_node)
300
            elif child_desc.BrowseName.Name == "EnumValues":
301
                c = await _get_enum_values(enum_name, server.get_node(child_desc.NodeId))
302
            else:
303
                _logger.warning("Unexpected children of node %s: %s", desc, child_desc)
304
        if c is not None:
305
            model.append(c)
306
    return _generate_python_class(model, env=env)
307
308
309
async def _get_enum_values(name, node):
310
    val = await node.read_value()
311
    c = EnumType(name)
312
    c.fields = [EnumeratedValue(enumval.DisplayName.Text, enumval.Value) for enumval in val]
313
    return c
314
315
316
async def _get_enum_strings(name, node):
317
    val = await node.read_value()
318
    c = EnumType(name)
319
    c.fields = [EnumeratedValue(st.Text, idx) for idx, st in enumerate(val)]
320
    return c
321