Test Failed
Pull Request — master (#300)
by
unknown
02:14
created

asyncua.common.structures.lazy()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 1
dl 0
loc 10
rs 9.95
c 0
b 0
f 0
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
import uuid
8
import logging
9
# The next two imports are for generated code
10
from datetime import datetime
11
from enum import Enum, IntEnum, EnumMeta
12
13
from asyncua import ua
14
15
from .structures104 import get_default_value, clean_name
16
17
# Use lazy loading of lxml.objectify to speed up server startup and to 
18
# slim down frozen code that doesn't require XML processing. See post on
19
# StackExchange https://stackoverflow.com/questions/42703908/how-do-i-use-importlib-lazyloader
20
import sys
21
import importlib.util
22
23
def lazy(fullname):
24
    try:
25
        return sys.modules[fullname]
26
    except KeyError:
27
        spec = importlib.util.find_spec(fullname)
28
        module = importlib.util.module_from_spec(spec)
29
        loader = importlib.util.LazyLoader.factory(spec.loader)
30
        # Make module with proper locking and get it inserted into sys.modules.
31
        loader.exec_module(module)
32
        return module
33
34
objectify = lazy("lxml.objectify")
35
36
_logger = logging.getLogger(__name__)
37
38
39
class EnumType(object):
40
    def __init__(self, name):
41
        self.name = name
42
        self.fields = []
43
        self.typeid = None
44
45
    def __str__(self):
46
        return f"EnumType({self.name, self.fields})"
47
    __repr__ = __str__
48
49
    def get_code(self):
50
        code = """
51
52
class {0}(IntEnum):
53
54
    '''
55
    {0} EnumInt autogenerated from xml
56
    '''
57
58
""".format(self.name)
59
60
        for EnumeratedValue in self.fields:
61
            name = EnumeratedValue.Name
62
            value = EnumeratedValue.Value
63
            code += f"    {name} = {value}\n"
64
65
        return code
66
67
68
class EnumeratedValue:
69
    def __init__(self, name, value):
70
        if name == "None":
71
            name = "None_"
72
        name = name.replace(" ", "")
73
        self.Name = name
74
        self.Value = value
75
76
77
class Struct:
78
    def __init__(self, name):
79
        self.name = clean_name(name)
80
        self.fields = []
81
        self.typeid = None
82
83
    def __str__(self):
84
        return f"Struct(name={self.name}, fields={self.fields}"
85
86
    __repr__ = __str__
87
88
    def get_code(self):
89
        code = f"""
90
91
class {self.name}:
92
93
    '''
94
    {self.name} structure autogenerated from xml
95
    '''
96
97
"""
98
        code += '    ua_types = [\n'
99
        for field in self.fields:
100
            prefix = 'ListOf' if field.array else ''
101
            uatype = prefix + field.uatype
102
            if uatype == 'ListOfChar':
103
                uatype = 'String'
104
            code += f"        ('{field.name}', '{uatype}'),\n"
105
        code += "    ]"
106
        code += """
107
    def __str__(self):
108
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
109
        return self.__class__.__name__ + "(" + ", ".join(vals) + ")"
110
111
    __repr__ = __str__
112
113
    def __init__(self):
114
"""
115
        if not self.fields:
116
            code += "      pass"
117
        for field in self.fields:
118
            code += f"        self.{field.name} = {field.value}\n"
119
        return code
120
121
122
class Field(object):
123
    def __init__(self, name):
124
        self.name = name
125
        self.uatype = None
126
        self.value = None
127
        self.array = False
128
129
    def __str__(self):
130
        return f"Field(name={self.name}, uatype={self.uatype}"
131
132
    __repr__ = __str__
133
134
135
class StructGenerator(object):
136
    def __init__(self):
137
        self.model = []
138
139
    def make_model_from_string(self, xml):
140
        obj = objectify.fromstring(xml)
141
        self._make_model(obj)
142
143
    def make_model_from_file(self, path):
144
        obj = objectify.parse(path)
145
        root = obj.getroot()
146
        self._make_model(root)
147
148
    def _make_model(self, root):
149
        enums = {}
150
        for child in root.iter("{*}EnumeratedType"):
151
            intenum = EnumType(child.get("Name"))
152
            for xmlfield in child.iter("{*}EnumeratedValue"):
153
                name = xmlfield.get("Name")
154
                value = xmlfield.get("Value")
155
                enumvalue = EnumeratedValue(name, value)
156
                intenum.fields.append(enumvalue)
157
                enums[child.get("Name")] = value
158
            self.model.append(intenum)
159
160
        for child in root.iter("{*}StructuredType"):
161
            struct = Struct(child.get("Name"))
162
            array = False
163
            for xmlfield in child.iter("{*}Field"):
164
                name = xmlfield.get("Name")
165
                if name.startswith("NoOf"):
166
                    array = True
167
                    continue
168
                field = Field(clean_name(name))
169
                field.uatype = xmlfield.get("TypeName")
170
                if ":" in field.uatype:
171
                    field.uatype = field.uatype.split(":")[1]
172
                field.uatype = clean_name(field.uatype)
173
                field.value = get_default_value(field.uatype, enums)
174
                if array:
175
                    field.array = True
176
                    field.value = []
177
                    array = False
178
                struct.fields.append(field)
179
            self.model.append(struct)
180
181
    def save_to_file(self, path, register=False):
182
        _file = open(path, "w+")
183
        self._make_header(_file)
184
        for struct in self.model:
185
            _file.write(struct.get_code())
186
        if register:
187
            _file.write(self._make_registration())
188
        _file.close()
189
190
    def _make_registration(self):
191
        code = "\n\n"
192
        for struct in self.model:
193
            code += f"ua.register_extension_object('{struct.name}'," \
194
                    f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
195
        return code
196
197
    def get_python_classes(self, env=None):
198
        return _generate_python_class(self.model, env=env)
199
200
    def _make_header(self, _file):
201
        _file.write("""
202
'''
203
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
204
'''
205
206
from datetime import datetime
207
import uuid
208
209
from asyncua import ua
210
""")
211
212
    def set_typeid(self, name, typeid):
213
        for struct in self.model:
214
            if struct.name == name:
215
                struct.typeid = typeid
216
                return
217
218
219
async def load_type_definitions(server, nodes=None):
220
    """
221
    Download xml from given variable node defining custom structures.
222
    If no node is given, attemps to import variables from all nodes under
223
    "0:OPC Binary"
224
    the code is generated and imported on the fly. If you know the structures
225
    are not going to be modified it might be interresting to copy the generated files
226
    and include them in you code
227
    """
228
    if nodes is None:
229
        nodes = []
230
        for desc in await server.nodes.opc_binary.get_children_descriptions():
231
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
232
                nodes.append(server.get_node(desc.NodeId))
233
234
    structs_dict = {}
235
    generators = []
236
    for node in nodes:
237
        xml = await node.read_value()
238
        generator = StructGenerator()
239
        generators.append(generator)
240
        generator.make_model_from_string(xml)
241
        # generate and execute new code on the fly
242
        generator.get_python_classes(structs_dict)
243
        # same but using a file that is imported. This can be usefull for debugging library
244
        # name = node.read_browse_name().Name
245
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
246
        # name = clean_name(name)
247
        # name = "structures_" + node.read_browse_name().Name
248
        # generator.save_and_import(name + ".py", append_to=structs_dict)
249
250
        # register classes
251
        # every children of our node should represent a class
252
        for ndesc in await node.get_children_descriptions():
253
            ndesc_node = server.get_node(ndesc.NodeId)
254
            ref_desc_list = await ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
255
            if ref_desc_list:  # some server put extra things here
256
                name = clean_name(ndesc.BrowseName.Name)
257
                if name not in structs_dict:
258
                    _logger.warning("%s is found as child of binary definition node but is not found in xml", name)
259
                    continue
260
                nodeid = ref_desc_list[0].NodeId
261
                ua.register_extension_object(name, nodeid, structs_dict[name])
262
                # save the typeid if user want to create static file for type definitnion
263
                generator.set_typeid(name, nodeid.to_string())
264
265
        for key, val in structs_dict.items():
266
            if isinstance(val, EnumMeta) and key != "IntEnum":
267
                setattr(ua, key, val)
268
269
    return generators, structs_dict
270
271
272
def _generate_python_class(model, env=None):
273
    """
274
    generate Python code and execute in a new environment
275
    return a dict of structures {name: class}
276
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
277
    not available and debugging is very hard...
278
    """
279
    if env is None:
280
        env = ua.__dict__
281
    #  Add the required libraries to dict
282
    if "ua" not in env:
283
        env['ua'] = ua
284
    if "datetime" not in env:
285
        env['datetime'] = datetime
286
    if "uuid" not in env:
287
        env['uuid'] = uuid
288
    if "enum" not in env:
289
        env['IntEnum'] = IntEnum
290
    # generate classes one by one and add them to dict
291
    for element in model:
292
        code = element.get_code()
293
        exec(code, env)
294
    return env
295
296
297
async def load_enums(server, env=None, force=False):
298
    """
299
    Read enumeration data types on server and generate python Enums in ua scope for them
300
    """
301
    model = []
302
303
    for desc in await server.nodes.enum_data_type.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
304
        enum_name = desc.BrowseName.Name
305
        enum_node = server.get_node(desc.NodeId)
306
        if not force and hasattr(ua, enum_name):
307
            _logger.debug("Enum type %s is already in ua namespace, ignoring", enum_name)
308
            continue
309
        c = None
310
        for child_desc in await enum_node.get_children_descriptions(refs=ua.ObjectIds.HasProperty):
311
            child_node = server.get_node(child_desc.NodeId)
312
            if child_desc.BrowseName.Name == "EnumStrings":
313
                c = await _get_enum_strings(enum_name, child_node)
314
            elif child_desc.BrowseName.Name == "EnumValues":
315
                c = await _get_enum_values(enum_name, server.get_node(child_desc.NodeId))
316
            else:
317
                _logger.warning("Unexpected children of node %s: %s", desc, child_desc)
318
        if c is not None:
319
            model.append(c)
320
    return _generate_python_class(model, env=env)
321
322
323
async def _get_enum_values(name, node):
324
    val = await node.read_value()
325
    c = EnumType(name)
326
    c.fields = [EnumeratedValue(enumval.DisplayName.Text, enumval.Value) for enumval in val]
327
    return c
328
329
330
async def _get_enum_strings(name, node):
331
    val = await node.read_value()
332
    c = EnumType(name)
333
    c.fields = [EnumeratedValue(st.Text, idx) for idx, st in enumerate(val)]
334
    return c
335