Test Failed
Pull Request — master (#300)
by
unknown
02:17
created

asyncua.common.structures.lazy()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 1
dl 0
loc 10
rs 9.95
c 0
b 0
f 0
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
import uuid
8
import logging
9
# The next two imports are for generated code
10
from datetime import datetime
11
from enum import Enum, IntEnum, EnumMeta
12
13
from asyncua import ua
14
15
from .structures104 import get_default_value, clean_name
16
17
# Use lazy loading of lxml.objectify to speed up server startup and to 
18
# slim down frozen code that doesn't require XML processing. See post on
19
# StackExchange https://stackoverflow.com/questions/42703908/how-do-i-use-importlib-lazyloader
20
import sys
21
import importlib.util
22
23
def lazy(fullname):
    """
    Return the module called ``fullname``, deferring its execution until first use.

    A module that is already present in ``sys.modules`` is returned as is.
    Otherwise the module is created through ``importlib.util.LazyLoader``,
    registered in ``sys.modules`` and returned; its code is only executed
    when one of its attributes is accessed for the first time.

    :param fullname: absolute dotted name of the module, e.g. ``"lxml"``
    :return: the module object (possibly not executed yet)
    :raises ModuleNotFoundError: if no module called ``fullname`` can be found
    :raises ImportError: if the module was found but has no loader to defer
    """
    try:
        return sys.modules[fullname]
    except KeyError:
        pass

    spec = importlib.util.find_spec(fullname)
    if spec is None:
        raise ModuleNotFoundError(f"No module named {fullname!r}", name=fullname)
    if spec.loader is None:
        raise ImportError(f"Module {fullname!r} has no loader, cannot load it lazily", name=fullname)

    # LazyLoader.factory() returns a callable that builds loaders, not a loader;
    # spec.loader is already a loader instance, so wrap it directly.
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    # Register the module before arming it so that later imports of the same
    # name share this object instead of loading a second copy.
    sys.modules[fullname] = module
    loader.exec_module(module)
    return module
33
34
lxml = lazy("lxml")
# The lxml package does not import its submodules on its own, so objectify is
# requested by its full dotted name instead of being read as an attribute of
# the (lazily loaded) package object.
objectify = lazy("lxml.objectify")

_logger = logging.getLogger(__name__)
38
39
40
class EnumType(object):
    """
    Description of one enumeration, able to render itself as ``IntEnum`` source code.
    """

    def __init__(self, name):
        # Name given to the generated class
        self.name = name
        # Members of the enumeration; each one exposes ``Name`` and ``Value``
        self.fields = []
        self.typeid = None

    def __str__(self):
        return f"EnumType(name={self.name}, fields={self.fields})"

    __repr__ = __str__

    def get_code(self):
        """
        Return the Python source of an ``IntEnum`` subclass called ``self.name``
        with one ``Name = Value`` line per entry of ``self.fields``.

        The generated code expects ``IntEnum`` to exist in the namespace it is
        executed in.
        """
        header = """

class {0}(IntEnum):

    '''
    {0} EnumInt autogenerated from xml
    '''

""".format(self.name)

        members = [f"    {member.Name} = {member.Value}\n" for member in self.fields]
        return header + "".join(members)
67
68
69
class EnumeratedValue:
    """
    One member of an enumeration: an identifier usable in generated code and its value.
    """

    def __init__(self, name, value):
        # Remove spaces before testing for "None": names such as " None" or
        # "No ne" collapse to the keyword as well and must be renamed too.
        name = name.replace(" ", "")
        if name == "None":
            # ``None`` cannot be assigned to in the generated class body
            name = "None_"
        self.Name = name
        self.Value = value
76
77
78
class Struct:
    """
    Description of one custom structure, able to render itself as Python source code.
    """

    def __init__(self, name):
        # Name of the generated class, passed through clean_name()
        self.name = clean_name(name)
        # Field descriptions; get_code() reads name, uatype, array and value of each
        self.fields = []
        self.typeid = None

    def __str__(self):
        return f"Struct(name={self.name}, fields={self.fields})"

    __repr__ = __str__

    def get_code(self):
        """
        Return the Python source of a class called ``self.name``.

        The generated class has a ``ua_types`` list of ``(field name, type name)``
        pairs, a ``__str__``/``__repr__`` listing the instance attributes and an
        ``__init__`` assigning every field its default value.
        """
        parts = [f"""

class {self.name}:

    '''
    {self.name} structure autogenerated from xml
    '''

"""]
        parts.append('    ua_types = [\n')
        for field in self.fields:
            uatype = ('ListOf' if field.array else '') + field.uatype
            if uatype == 'ListOfChar':
                # an array of Char is declared as a String
                uatype = 'String'
            parts.append(f"        ('{field.name}', '{uatype}'),\n")
        parts.append("    ]")
        parts.append("""
    def __str__(self):
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
        return self.__class__.__name__ + "(" + ", ".join(vals) + ")"

    __repr__ = __str__

    def __init__(self):
""")
        if not self.fields:
            # a structure without fields still needs a body for __init__
            parts.append("      pass")
        parts.extend(f"        self.{field.name} = {field.value}\n" for field in self.fields)
        return "".join(parts)
121
122
123
class Field(object):
    """
    Description of one field of a structure.
    """

    def __init__(self, name):
        self.name = name
        # Name of the field type, filled in after construction
        self.uatype = None
        # Default value written into the generated __init__
        self.value = None
        # True when the field holds a list of ``uatype`` values
        self.array = False

    def __str__(self):
        return f"Field(name={self.name}, uatype={self.uatype})"

    __repr__ = __str__
134
135
136
class StructGenerator(object):
    """
    Build a model of the enumerations and structures described in an XML type
    dictionary and turn that model into Python source code.
    """

    def __init__(self):
        # EnumType and Struct instances, in the order they were parsed
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse the XML document ``xml`` and add the types it describes to the model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the XML file at ``path`` and add the types it describes to the model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Append one EnumType per ``EnumeratedType`` element and one Struct per
        ``StructuredType`` element found under ``root`` (in any namespace).
        """
        # enum name -> value of the last member read; handed to get_default_value()
        enums = {}
        for child in root.iter("{*}EnumeratedType"):
            intenum = EnumType(child.get("Name"))
            for xmlfield in child.iter("{*}EnumeratedValue"):
                name = xmlfield.get("Name")
                value = xmlfield.get("Value")
                enumvalue = EnumeratedValue(name, value)
                intenum.fields.append(enumvalue)
                enums[child.get("Name")] = value
            self.model.append(intenum)

        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            # set by a "NoOf..." field and consumed by the field that follows it
            next_is_array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # length field: not part of the model, it only marks the next field as a list
                    next_is_array = True
                    continue
                field = Field(clean_name(name))
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # drop the prefix in front of the type name
                    field.uatype = field.uatype.split(":")[1]
                field.uatype = clean_name(field.uatype)
                field.value = get_default_value(field.uatype, enums)
                if next_is_array:
                    field.array = True
                    field.value = []
                    next_is_array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        Write the code of every element of the model to the file ``path``.

        :param path: file to create or overwrite
        :param register: when True, also append the registration calls
            produced by _make_registration()
        """
        # the with block closes the file even when get_code() or a write fails
        with open(path, "w+") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        Return source code calling ``ua.register_extension_object`` once per model element.
        """
        # NOTE(review): an element whose typeid was never set is emitted with the
        # node id string 'None' -- confirm whether such elements should be skipped.
        lines = [
            f"ua.register_extension_object('{struct.name}',"
            f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
            for struct in self.model
        ]
        return "\n\n" + "".join(lines)

    def get_python_classes(self, env=None):
        """
        Execute the generated code in ``env`` and return the resulting namespace.
        """
        return _generate_python_class(self.model, env=env)

    def _make_header(self, _file):
        """
        Write the banner and the imports needed by the generated code to ``_file``.
        """
        _file.write("""
'''
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
'''

from datetime import datetime
import uuid

from asyncua import ua
""")

    def set_typeid(self, name, typeid):
        """
        Store ``typeid`` on the first model element called ``name``; do nothing if there is none.
        """
        for struct in self.model:
            if struct.name == name:
                struct.typeid = typeid
                return
218
219
220
async def load_type_definitions(server, nodes=None):
    """
    Read the XML type dictionaries stored in ``nodes`` and create Python classes
    for the custom structures they describe.

    When ``nodes`` is None, every child of the "0:OPC Binary" node is used
    except the one whose browse name is "Opc.Ua".
    The code is generated and executed on the fly. If the structures are known
    not to change, the generated files can be saved and shipped with your code
    instead.

    Returns the list of StructGenerator objects (one per node) and the dict in
    which the generated code was executed.
    """
    if nodes is None:
        nodes = [
            server.get_node(desc.NodeId)
            for desc in await server.nodes.opc_binary.get_children_descriptions()
            if desc.BrowseName != ua.QualifiedName("Opc.Ua")
        ]

    structs_dict = {}
    generators = []
    for node in nodes:
        xml = await node.read_value()
        generator = StructGenerator()
        generators.append(generator)
        generator.make_model_from_string(xml)
        # Generate the classes and execute their code straight into structs_dict.
        # (When debugging the library it can help to write the code to a
        # "structures_<browse name>.py" file and import that file instead.)
        generator.get_python_classes(structs_dict)

        # Register the classes: every child of the dictionary node is expected
        # to stand for one class.
        for child_desc in await node.get_children_descriptions():
            child = server.get_node(child_desc.NodeId)
            described = await child.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
            if not described:
                # some servers put extra things here
                continue
            name = clean_name(child_desc.BrowseName.Name)
            if name not in structs_dict:
                _logger.warning("%s is found as child of binary definition node but is not found in xml", name)
                continue
            nodeid = described[0].NodeId
            ua.register_extension_object(name, nodeid, structs_dict[name])
            # keep the type id so that a static file with registrations can be written later
            generator.set_typeid(name, nodeid.to_string())

        # expose the generated enums as attributes of the ua module
        for key, val in structs_dict.items():
            if key != "IntEnum" and isinstance(val, EnumMeta):
                setattr(ua, key, val)

    return generators, structs_dict
271
272
273
def _generate_python_class(model, env=None):
274
    """
275
    generate Python code and execute in a new environment
276
    return a dict of structures {name: class}
277
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
278
    not available and debugging is very hard...
279
    """
280
    if env is None:
281
        env = ua.__dict__
282
    #  Add the required libraries to dict
283
    if "ua" not in env:
284
        env['ua'] = ua
285
    if "datetime" not in env:
286
        env['datetime'] = datetime
287
    if "uuid" not in env:
288
        env['uuid'] = uuid
289
    if "enum" not in env:
290
        env['IntEnum'] = IntEnum
291
    # generate classes one by one and add them to dict
292
    for element in model:
293
        code = element.get_code()
294
        exec(code, env)
295
    return env
296
297
298
async def load_enums(server, env=None, force=False):
    """
    Read enumeration data types on server and generate python Enums in ua scope for them

    :param server: object giving access to the address space (``nodes``, ``get_node``)
    :param env: dict in which the generated code is executed; see _generate_python_class()
    :param force: when True, enums whose name already exists in the ua module
        are generated again instead of being skipped
    :return: the namespace returned by _generate_python_class()
    """
    model = []

    for desc in await server.nodes.enum_data_type.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        enum_name = desc.BrowseName.Name
        enum_node = server.get_node(desc.NodeId)
        if not force and hasattr(ua, enum_name):
            _logger.debug("Enum type %s is already in ua namespace, ignoring", enum_name)
            continue
        c = None
        for child_desc in await enum_node.get_children_descriptions(refs=ua.ObjectIds.HasProperty):
            child_node = server.get_node(child_desc.NodeId)
            if child_desc.BrowseName.Name == "EnumStrings":
                c = await _get_enum_strings(enum_name, child_node)
            elif child_desc.BrowseName.Name == "EnumValues":
                # reuse child_node instead of looking the same node up a second time
                c = await _get_enum_values(enum_name, child_node)
            else:
                _logger.warning("Unexpected children of node %s: %s", desc, child_desc)
        # enums exposing neither EnumStrings nor EnumValues are left out
        if c is not None:
            model.append(c)
    return _generate_python_class(model, env=env)
322
323
324
async def _get_enum_values(name, node):
    """
    Build the EnumType called ``name`` from the value read on ``node``.

    Each item of that value supplies the member name (``DisplayName.Text``)
    and the member value (``Value``).
    """
    entries = await node.read_value()
    enum_type = EnumType(name)
    members = []
    for entry in entries:
        members.append(EnumeratedValue(entry.DisplayName.Text, entry.Value))
    enum_type.fields = members
    return enum_type
329
330
331
async def _get_enum_strings(name, node):
    """
    Build the EnumType called ``name`` from the value read on ``node``.

    Each item of that value supplies a member name (``Text``); the member
    value is the position of the item, counted from 0.
    """
    texts = await node.read_value()
    enum_type = EnumType(name)
    members = []
    for position, text in enumerate(texts):
        members.append(EnumeratedValue(text.Text, position))
    enum_type.fields = members
    return enum_type
336