Completed
Pull Request — master (#300)
by
unknown
02:23
created

asyncua.common.structures.lazy()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 1
dl 0
loc 10
rs 9.95
c 0
b 0
f 0
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
import uuid
8
import logging
9
# The next two imports are for generated code
10
from datetime import datetime
11
from enum import Enum, IntEnum, EnumMeta
12
13
from asyncua import ua
14
15
from .structures104 import get_default_value, clean_name
16
17
# Use lazy loading of lxml.objectify to speed up server startup and to 
18
# slim down frozen code that doesn't require XML processing
19
import importlib.util
20
21
def lazy(fullname):
22
  try:
23
    return sys.modules[fullname]
24
  except KeyError:
25
    spec = importlib.util.find_spec(fullname)
26
    module = importlib.util.module_from_spec(spec)
27
    loader = importlib.util.LazyLoader(spec.loader)
28
    # Make module with proper locking and get it inserted into sys.modules.
29
    loader.exec_module(module)
30
    return module
31
32
objectify = lazy("lxml.objectify")
33
34
_logger = logging.getLogger(__name__)
35
36
37
class EnumType(object):
38
    def __init__(self, name):
39
        self.name = name
40
        self.fields = []
41
        self.typeid = None
42
43
    def __str__(self):
44
        return f"EnumType({self.name, self.fields})"
45
    __repr__ = __str__
46
47
    def get_code(self):
48
        code = """
49
50
class {0}(IntEnum):
51
52
    '''
53
    {0} EnumInt autogenerated from xml
54
    '''
55
56
""".format(self.name)
57
58
        for EnumeratedValue in self.fields:
59
            name = EnumeratedValue.Name
60
            value = EnumeratedValue.Value
61
            code += f"    {name} = {value}\n"
62
63
        return code
64
65
66
class EnumeratedValue:
67
    def __init__(self, name, value):
68
        if name == "None":
69
            name = "None_"
70
        name = name.replace(" ", "")
71
        self.Name = name
72
        self.Value = value
73
74
75
class Struct:
76
    def __init__(self, name):
77
        self.name = clean_name(name)
78
        self.fields = []
79
        self.typeid = None
80
81
    def __str__(self):
82
        return f"Struct(name={self.name}, fields={self.fields}"
83
84
    __repr__ = __str__
85
86
    def get_code(self):
87
        code = f"""
88
89
class {self.name}:
90
91
    '''
92
    {self.name} structure autogenerated from xml
93
    '''
94
95
"""
96
        code += '    ua_types = [\n'
97
        for field in self.fields:
98
            prefix = 'ListOf' if field.array else ''
99
            uatype = prefix + field.uatype
100
            if uatype == 'ListOfChar':
101
                uatype = 'String'
102
            code += f"        ('{field.name}', '{uatype}'),\n"
103
        code += "    ]"
104
        code += """
105
    def __str__(self):
106
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
107
        return self.__class__.__name__ + "(" + ", ".join(vals) + ")"
108
109
    __repr__ = __str__
110
111
    def __init__(self):
112
"""
113
        if not self.fields:
114
            code += "      pass"
115
        for field in self.fields:
116
            code += f"        self.{field.name} = {field.value}\n"
117
        return code
118
119
120
class Field(object):
121
    def __init__(self, name):
122
        self.name = name
123
        self.uatype = None
124
        self.value = None
125
        self.array = False
126
127
    def __str__(self):
128
        return f"Field(name={self.name}, uatype={self.uatype}"
129
130
    __repr__ = __str__
131
132
133
class StructGenerator(object):
134
    def __init__(self):
135
        self.model = []
136
137
    def make_model_from_string(self, xml):
138
        obj = objectify.fromstring(xml)
139
        self._make_model(obj)
140
141
    def make_model_from_file(self, path):
142
        obj = objectify.parse(path)
143
        root = obj.getroot()
144
        self._make_model(root)
145
146
    def _make_model(self, root):
147
        enums = {}
148
        for child in root.iter("{*}EnumeratedType"):
149
            intenum = EnumType(child.get("Name"))
150
            for xmlfield in child.iter("{*}EnumeratedValue"):
151
                name = xmlfield.get("Name")
152
                value = xmlfield.get("Value")
153
                enumvalue = EnumeratedValue(name, value)
154
                intenum.fields.append(enumvalue)
155
                enums[child.get("Name")] = value
156
            self.model.append(intenum)
157
158
        for child in root.iter("{*}StructuredType"):
159
            struct = Struct(child.get("Name"))
160
            array = False
161
            for xmlfield in child.iter("{*}Field"):
162
                name = xmlfield.get("Name")
163
                if name.startswith("NoOf"):
164
                    array = True
165
                    continue
166
                field = Field(clean_name(name))
167
                field.uatype = xmlfield.get("TypeName")
168
                if ":" in field.uatype:
169
                    field.uatype = field.uatype.split(":")[1]
170
                field.uatype = clean_name(field.uatype)
171
                field.value = get_default_value(field.uatype, enums)
172
                if array:
173
                    field.array = True
174
                    field.value = []
175
                    array = False
176
                struct.fields.append(field)
177
            self.model.append(struct)
178
179
    def save_to_file(self, path, register=False):
180
        _file = open(path, "w+")
181
        self._make_header(_file)
182
        for struct in self.model:
183
            _file.write(struct.get_code())
184
        if register:
185
            _file.write(self._make_registration())
186
        _file.close()
187
188
    def _make_registration(self):
189
        code = "\n\n"
190
        for struct in self.model:
191
            code += f"ua.register_extension_object('{struct.name}'," \
192
                    f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
193
        return code
194
195
    def get_python_classes(self, env=None):
196
        return _generate_python_class(self.model, env=env)
197
198
    def _make_header(self, _file):
199
        _file.write("""
200
'''
201
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
202
'''
203
204
from datetime import datetime
205
import uuid
206
207
from asyncua import ua
208
""")
209
210
    def set_typeid(self, name, typeid):
211
        for struct in self.model:
212
            if struct.name == name:
213
                struct.typeid = typeid
214
                return
215
216
217
async def load_type_definitions(server, nodes=None):
218
    """
219
    Download xml from given variable node defining custom structures.
220
    If no node is given, attemps to import variables from all nodes under
221
    "0:OPC Binary"
222
    the code is generated and imported on the fly. If you know the structures
223
    are not going to be modified it might be interresting to copy the generated files
224
    and include them in you code
225
    """
226
    if nodes is None:
227
        nodes = []
228
        for desc in await server.nodes.opc_binary.get_children_descriptions():
229
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
230
                nodes.append(server.get_node(desc.NodeId))
231
232
    structs_dict = {}
233
    generators = []
234
    for node in nodes:
235
        xml = await node.read_value()
236
        generator = StructGenerator()
237
        generators.append(generator)
238
        generator.make_model_from_string(xml)
239
        # generate and execute new code on the fly
240
        generator.get_python_classes(structs_dict)
241
        # same but using a file that is imported. This can be usefull for debugging library
242
        # name = node.read_browse_name().Name
243
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
244
        # name = clean_name(name)
245
        # name = "structures_" + node.read_browse_name().Name
246
        # generator.save_and_import(name + ".py", append_to=structs_dict)
247
248
        # register classes
249
        # every children of our node should represent a class
250
        for ndesc in await node.get_children_descriptions():
251
            ndesc_node = server.get_node(ndesc.NodeId)
252
            ref_desc_list = await ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
253
            if ref_desc_list:  # some server put extra things here
254
                name = clean_name(ndesc.BrowseName.Name)
255
                if name not in structs_dict:
256
                    _logger.warning("%s is found as child of binary definition node but is not found in xml", name)
257
                    continue
258
                nodeid = ref_desc_list[0].NodeId
259
                ua.register_extension_object(name, nodeid, structs_dict[name])
260
                # save the typeid if user want to create static file for type definitnion
261
                generator.set_typeid(name, nodeid.to_string())
262
263
        for key, val in structs_dict.items():
264
            if isinstance(val, EnumMeta) and key != "IntEnum":
265
                setattr(ua, key, val)
266
267
    return generators, structs_dict
268
269
270
def _generate_python_class(model, env=None):
271
    """
272
    generate Python code and execute in a new environment
273
    return a dict of structures {name: class}
274
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
275
    not available and debugging is very hard...
276
    """
277
    if env is None:
278
        env = ua.__dict__
279
    #  Add the required libraries to dict
280
    if "ua" not in env:
281
        env['ua'] = ua
282
    if "datetime" not in env:
283
        env['datetime'] = datetime
284
    if "uuid" not in env:
285
        env['uuid'] = uuid
286
    if "enum" not in env:
287
        env['IntEnum'] = IntEnum
288
    # generate classes one by one and add them to dict
289
    for element in model:
290
        code = element.get_code()
291
        exec(code, env)
292
    return env
293
294
295
async def load_enums(server, env=None, force=False):
296
    """
297
    Read enumeration data types on server and generate python Enums in ua scope for them
298
    """
299
    model = []
300
301
    for desc in await server.nodes.enum_data_type.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
302
        enum_name = desc.BrowseName.Name
303
        enum_node = server.get_node(desc.NodeId)
304
        if not force and hasattr(ua, enum_name):
305
            _logger.debug("Enum type %s is already in ua namespace, ignoring", enum_name)
306
            continue
307
        c = None
308
        for child_desc in await enum_node.get_children_descriptions(refs=ua.ObjectIds.HasProperty):
309
            child_node = server.get_node(child_desc.NodeId)
310
            if child_desc.BrowseName.Name == "EnumStrings":
311
                c = await _get_enum_strings(enum_name, child_node)
312
            elif child_desc.BrowseName.Name == "EnumValues":
313
                c = await _get_enum_values(enum_name, server.get_node(child_desc.NodeId))
314
            else:
315
                _logger.warning("Unexpected children of node %s: %s", desc, child_desc)
316
        if c is not None:
317
            model.append(c)
318
    return _generate_python_class(model, env=env)
319
320
321
async def _get_enum_values(name, node):
322
    val = await node.read_value()
323
    c = EnumType(name)
324
    c.fields = [EnumeratedValue(enumval.DisplayName.Text, enumval.Value) for enumval in val]
325
    return c
326
327
328
async def _get_enum_strings(name, node):
329
    val = await node.read_value()
330
    c = EnumType(name)
331
    c.fields = [EnumeratedValue(st.Text, idx) for idx, st in enumerate(val)]
332
    return c
333