Passed
Pull Request — master (#238)
by Olivier
02:25
created

asyncua.common.structures.get_default_value()   C

Complexity

Conditions 10

Size

Total Lines 21
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 19
nop 2
dl 0
loc 21
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.structures.get_default_value() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
import uuid
8
import logging
9
# The next two imports are for generated code
10
from datetime import datetime
11
from enum import Enum, IntEnum, EnumMeta
12
from lxml import objectify
13
14
from asyncua import ua
15
16
from .structures104 import get_default_value, clean_name
17
18
_logger = logging.getLogger(__name__)
19
20
21
class EnumType(object):
22
    def __init__(self, name):
23
        self.name = name
24
        self.fields = []
25
        self.typeid = None
26
27
    def get_code(self):
28
        code = """
29
30
class {0}(IntEnum):
31
32
    '''
33
    {0} EnumInt autogenerated from xml
34
    '''
35
36
""".format(self.name)
37
38
        for EnumeratedValue in self.fields:
39
            name = EnumeratedValue.Name
40
            value = EnumeratedValue.Value
41
            code += f"    {name} = {value}\n"
42
43
        return code
44
45
46
class EnumeratedValue(object):
47
    def __init__(self, name, value):
48
        if name == "None":
49
            name = "None_"
50
        name = name.replace(" ", "")
51
        self.Name = name
52
        self.Value = value
53
54
55
class Struct(object):
56
    def __init__(self, name):
57
        self.name = clean_name(name)
58
        self.fields = []
59
        self.typeid = None
60
61
    def __str__(self):
62
        return f"Struct(name={self.name}, fields={self.fields}"
63
64
    __repr__ = __str__
65
66
    def get_code(self):
67
        code = f"""
68
69
class {self.name}(object):
70
71
    '''
72
    {self.name} structure autogenerated from xml
73
    '''
74
75
"""
76
        code += '    ua_types = [\n'
77
        for field in self.fields:
78
            prefix = 'ListOf' if field.array else ''
79
            uatype = prefix + field.uatype
80
            if uatype == 'ListOfChar':
81
                uatype = 'String'
82
            code += f"        ('{field.name}', '{uatype}'),\n"
83
        code += "    ]"
84
        code += """
85
    def __str__(self):
86
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
87
        return self.__class__.__name__ + "(" + ", ".join(vals) + ")"
88
89
    __repr__ = __str__
90
91
    def __init__(self):
92
"""
93
        if not self.fields:
94
            code += "      pass"
95
        for field in self.fields:
96
            code += f"        self.{field.name} = {field.value}\n"
97
        return code
98
99
100
class Field(object):
101
    def __init__(self, name):
102
        self.name = name
103
        self.uatype = None
104
        self.value = None
105
        self.array = False
106
107
    def __str__(self):
108
        return f"Field(name={self.name}, uatype={self.uatype}"
109
110
    __repr__ = __str__
111
112
113
class StructGenerator(object):
114
    def __init__(self):
115
        self.model = []
116
117
    def make_model_from_string(self, xml):
118
        obj = objectify.fromstring(xml)
119
        self._make_model(obj)
120
121
    def make_model_from_file(self, path):
122
        obj = objectify.parse(path)
123
        root = obj.getroot()
124
        self._make_model(root)
125
126
    def _make_model(self, root):
127
        enums = {}
128
        for child in root.iter("{*}EnumeratedType"):
129
            intenum = EnumType(child.get("Name"))
130
            for xmlfield in child.iter("{*}EnumeratedValue"):
131
                name = xmlfield.get("Name")
132
                value = xmlfield.get("Value")
133
                enumvalue = EnumeratedValue(name, value)
134
                intenum.fields.append(enumvalue)
135
                enums[child.get("Name")] = value
136
            self.model.append(intenum)
137
138
        for child in root.iter("{*}StructuredType"):
139
            struct = Struct(child.get("Name"))
140
            array = False
141
            for xmlfield in child.iter("{*}Field"):
142
                name = xmlfield.get("Name")
143
                if name.startswith("NoOf"):
144
                    array = True
145
                    continue
146
                field = Field(clean_name(name))
147
                field.uatype = xmlfield.get("TypeName")
148
                if ":" in field.uatype:
149
                    field.uatype = field.uatype.split(":")[1]
150
                field.uatype = clean_name(field.uatype)
151
                field.value = get_default_value(field.uatype, enums)
152
                if array:
153
                    field.array = True
154
                    field.value = []
155
                    array = False
156
                struct.fields.append(field)
157
            self.model.append(struct)
158
159
    def save_to_file(self, path, register=False):
160
        _file = open(path, "w+")
161
        self._make_header(_file)
162
        for struct in self.model:
163
            _file.write(struct.get_code())
164
        if register:
165
            _file.write(self._make_registration())
166
        _file.close()
167
168
    def _make_registration(self):
169
        code = "\n\n"
170
        for struct in self.model:
171
            code += f"ua.register_extension_object('{struct.name}'," \
172
                    f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
173
        return code
174
175
    def get_python_classes(self, env=None):
176
        return _generate_python_class(self.model, env=env)
177
178
    def _make_header(self, _file):
179
        _file.write("""
180
'''
181
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
182
'''
183
184
from datetime import datetime
185
import uuid
186
187
from asyncua import ua
188
""")
189
190
    def set_typeid(self, name, typeid):
191
        for struct in self.model:
192
            if struct.name == name:
193
                struct.typeid = typeid
194
                return
195
196
197
async def load_type_definitions(server, nodes=None):
198
    """
199
    Download xml from given variable node defining custom structures.
200
    If no node is given, attemps to import variables from all nodes under
201
    "0:OPC Binary"
202
    the code is generated and imported on the fly. If you know the structures
203
    are not going to be modified it might be interresting to copy the generated files
204
    and include them in you code
205
    """
206
    if nodes is None:
207
        nodes = []
208
        for desc in await server.nodes.opc_binary.get_children_descriptions():
209
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
210
                nodes.append(server.get_node(desc.NodeId))
211
212
    structs_dict = {}
213
    generators = []
214
    for node in nodes:
215
        xml = await node.read_value()
216
        generator = StructGenerator()
217
        generators.append(generator)
218
        generator.make_model_from_string(xml)
219
        # generate and execute new code on the fly
220
        generator.get_python_classes(structs_dict)
221
        # same but using a file that is imported. This can be usefull for debugging library
222
        # name = node.read_browse_name().Name
223
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
224
        # name = clean_name(name)
225
        # name = "structures_" + node.read_browse_name().Name
226
        # generator.save_and_import(name + ".py", append_to=structs_dict)
227
228
        # register classes
229
        # every children of our node should represent a class
230
        for ndesc in await node.get_children_descriptions():
231
            ndesc_node = server.get_node(ndesc.NodeId)
232
            ref_desc_list = await ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
233
            if ref_desc_list:  # some server put extra things here
234
                name = clean_name(ndesc.BrowseName.Name)
235
                if name not in structs_dict:
236
                    _logger.warning("%s is found as child of binary definition node but is not found in xml", name)
237
                    continue
238
                nodeid = ref_desc_list[0].NodeId
239
                ua.register_extension_object(name, nodeid, structs_dict[name])
240
                # save the typeid if user want to create static file for type definitnion
241
                generator.set_typeid(name, nodeid.to_string())
242
243
        for key, val in structs_dict.items():
244
            if isinstance(val, EnumMeta) and key != "IntEnum":
245
                setattr(ua, key, val)
246
247
    return generators, structs_dict
248
249
250
def _generate_python_class(model, env=None):
251
    """
252
    generate Python code and execute in a new environment
253
    return a dict of structures {name: class}
254
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
255
    not available and debugging is very hard...
256
    """
257
    if env is None:
258
        env = {}
259
    #  Add the required libraries to dict
260
    if "ua" not in env:
261
        env['ua'] = ua
262
    if "datetime" not in env:
263
        env['datetime'] = datetime
264
    if "uuid" not in env:
265
        env['uuid'] = uuid
266
    if "enum" not in env:
267
        env['IntEnum'] = IntEnum
268
    # generate classes one by one and add them to dict
269
    for element in model:
270
        code = element.get_code()
271
        exec(code, env)
272
    return env
273
274
275
async def load_enums(server, env=None):
276
    """
277
    Read enumeration data types on server and generate python Enums in ua scope for them
278
    """
279
    model = []
280
    nodes = await server.nodes.enum_data_type.get_children()
281
    if env is None:
282
        env = ua.__dict__
283
    for node in nodes:
284
        name = (await node.read_browse_name()).Name
285
        try:
286
            c = await _get_enum_strings(name, node)
287
        except ua.UaError as ex:
288
            try:
289
                c = await _get_enum_values(name, node)
290
            except ua.UaError as ex:
291
                _logger.warning(f"Node {name}, {node} under DataTypes/Enumeration," f" does not seem to have a child called EnumString or EumValue: {ex}")
292
                continue
293
        if not hasattr(ua, c.name):
294
            _logger.warning("Adding enum %s to ua namespace", c)
295
            model.append(c)
296
    return _generate_python_class(model, env=env)
297
298
299
async def _get_enum_values(name, node):
300
    def_node = await node.get_child("0:EnumValues")
301
    val = await def_node.read_value()
302
    c = EnumType(name)
303
    c.fields = [EnumeratedValue(enumval.DisplayName.Text, enumval.Value) for enumval in val]
304
    return c
305
306
307
async def _get_enum_strings(name, node):
308
    def_node = await node.get_child("0:EnumStrings")
309
    val = await def_node.read_value()
310
    c = EnumType(name)
311
    c.fields = [EnumeratedValue(st.Text, idx) for idx, st in enumerate(val)]
312
    return c
313