Completed
Push — master ( 33bf38...9e8dfd )
by Olivier
03:48
created

load_enums()   B

Complexity

Conditions 6

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 36.3228

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 6
c 1
b 0
f 1
dl 0
loc 22
ccs 1
cts 18
cp 0.0556
crap 36.3228
rs 7.7857
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10 1
import logging
11
# The next two imports are for generated code
12 1
from datetime import datetime
13 1
import uuid
14
15 1
from lxml import objectify
16
17
18 1
from opcua.ua.ua_binary import Primitives
19 1
from opcua import ua
20
21 1
from enum import IntEnum, EnumMeta
22
23
24 1
logger = logging.getLogger(__name__)
25
26
27 1
def get_default_value(uatype, enums):
28 1
    if uatype == "String":
29 1
        return "None" 
30 1
    elif uatype == "Guid":
31 1
        return "uuid.uuid4()" 
32 1
    elif uatype in ("ByteString", "CharArray", "Char"):
33 1
        return None 
34 1
    elif uatype == "Boolean":
35 1
        return "True"
36 1
    elif uatype == "DateTime":
37 1
        return "datetime.utcnow()"
38 1
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
39 1
        return 0
40 1
    elif uatype in enums:
41 1
        return "ua." + uatype + "(" + enums[uatype] + ")"
42 1
    elif issubclass(eval("ua."+uatype), IntEnum):
43
        return "ua." + uatype + "(" + list(eval("ua."+uatype))[0] + ")"
44
    else:
45 1
        return "ua." + uatype + "()"
46
47
48 1
class EnumType(object):
49 1
    def __init__(self, name):
50 1
        self.name = name
51 1
        self.fields = []
52 1
        self.typeid = None
53
54 1
    def get_code(self):
55 1
        code = """
56
57
class {0}(IntEnum):
58
59
    '''
60
    {0} EnumInt autogenerated from xml
61
    '''
62
63
""".format(self.name)
64
65 1
        for EnumeratedValue in self.fields:
66 1
            name = EnumeratedValue.Name
67 1
            value = EnumeratedValue.Value
68 1
            code += "    {} = {}\n".format(name, value)
69
70 1
        return code
71
72
73 1
class EnumeratedValue(object):
74 1
    def __init__(self, name, value):
75 1
        if name == "None":
76
            name = "None_"
77 1
        self.Name = name
78 1
        self.Value = value
79
80
81 1
class Struct(object):
82 1
    def __init__(self, name):
83 1
        self.name = name
84 1
        self.fields = []
85 1
        self.typeid = None
86
87 1
    def get_code(self):
88 1
        code = """
89
90
class {0}(object):
91
92
    '''
93
    {0} structure autogenerated from xml
94
    '''
95
96
""".format(self.name)
97
98 1
        code += "    ua_types = [\n"
99 1
        for field in self.fields:
100 1
            prefix = "ListOf" if field.array else ""
101 1
            uatype = prefix + field.uatype
102 1
            if uatype == "ListOfChar":
103
                uatype = "String"
104 1
            code += "        ('{}', '{}'),\n".format(field.name, uatype)
105
106 1
        code += "    ]"
107 1
        code += """
108
109
    def __init__(self):
110
""".format(self.name)
111 1
        if not self.fields:
112
            code += "      pass"
113 1
        for field in self.fields:
114 1
            code += "        self.{} = {}\n".format(field.name, field.value)
115 1
        return code
116
117
118 1
class Field(object):
119 1
    def __init__(self, name):
120 1
        self.name = name
121 1
        self.uatype = None
122 1
        self.value = None
123 1
        self.array = False
124
125
126 1
class StructGenerator(object):
127 1
    def __init__(self):
128 1
        self.model = []
129
130 1
    def make_model_from_string(self, xml):
131 1
        obj = objectify.fromstring(xml)
132 1
        self._make_model(obj)
133
134 1
    def make_model_from_file(self, path):
135 1
        obj = objectify.parse(path)
136 1
        root = obj.getroot()
137 1
        self._make_model(root)
138
139 1
    def _make_model(self, root):
140 1
        enums = {}
141 1
        for child in root.iter("{*}EnumeratedType"):
142 1
            intenum = EnumType(child.get("Name"))
143 1
            for xmlfield in child.iter("{*}EnumeratedValue"):
144 1
                name = xmlfield.get("Name")
145 1
                value = xmlfield.get("Value")
146 1
                enumvalue = EnumeratedValue(name, value)
147 1
                intenum.fields.append(enumvalue)
148 1
                enums[child.get("Name")] = value
149 1
            self.model.append(intenum)
150
            
151 1
        for child in root.iter("{*}StructuredType"):
152 1
            struct = Struct(child.get("Name"))
153 1
            array = False
154 1
            for xmlfield in child.iter("{*}Field"):
155 1
                name = xmlfield.get("Name")
156 1
                if name.startswith("NoOf"):
157 1
                    array = True
158 1
                    continue
159 1
                field = Field(_clean_name(name))
160 1
                field.uatype = xmlfield.get("TypeName")
161 1
                if ":" in field.uatype:
162 1
                    field.uatype = field.uatype.split(":")[1]
163 1
                field.uatype = _clean_name(field.uatype)
164 1
                field.value = get_default_value(field.uatype, enums)
165 1
                if array:
166 1
                    field.array = True
167 1
                    field.value = []
168 1
                    array = False
169 1
                struct.fields.append(field)
170 1
            self.model.append(struct)
171
172 1
    def save_to_file(self, path, register=False):
173 1
        _file = open(path, "wt")
174 1
        self._make_header(_file)
175 1
        for struct in self.model:
176 1
            _file.write(struct.get_code())
177 1
        if register:
178
            _file.write(self._make_registration())
179 1
        _file.close()
180
181 1
    def _make_registration(self):
182
        code = "\n\n"
183
        for struct in self.model:
184
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
185
        return code
186
187 1
    def get_python_classes(self, env=None):
188 1
        return _generate_python_class(self.model, env=env)
189
190 1
    def save_and_import(self, path, append_to=None):
191
        """
192
        save the new structures to a python file which be used later
193
        import the result and return resulting classes in a dict
194
        if append_to is a dict, the classes are added to the dict
195
        """
196 1
        self.save_to_file(path)
197 1
        name = os.path.basename(path)
198 1
        name = os.path.splitext(name)[0]
199 1
        mymodule = importlib.import_module(name)
200 1
        if append_to is None:
201 1
            result = {}
202
        else:
203
            result = append_to
204 1
        for struct in self.model:
205 1
            result[struct.name] = getattr(mymodule, struct.name)
206 1
        return result
207
208 1
    def _make_header(self, _file):
209 1
        _file.write("""
210
'''
211
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
212
'''
213
214
from datetime import datetime
215
import uuid
216
217
from opcua import ua
218
""")
219
220 1
    def set_typeid(self, name, typeid):
221 1
        for struct in self.model:
222 1
            if struct.name == name:
223 1
                struct.typeid = typeid
224 1
                return
225
226
227 1
def load_type_definitions(server, nodes=None):
228
    """
229
    Download xml from given variable node defining custom structures.
230
    If no node is given, attemps to import variables from all nodes under
231
    "0:OPC Binary"
232
    the code is generated and imported on the fly. If you know the structures
233
    are not going to be modified it might be interresting to copy the generated files
234
    and include them in you code
235
    """
236 1
    if nodes is None:
237 1
        nodes = []
238 1
        for desc in server.nodes.opc_binary.get_children_descriptions():
239 1
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
240 1
                nodes.append(server.get_node(desc.NodeId))
241
    
242 1
    structs_dict = {}
243 1
    generators = []
244 1
    for node in nodes:
245 1
        xml = node.get_value()
246 1
        xml = xml.decode("utf-8")
247 1
        generator = StructGenerator()
248 1
        generators.append(generator)
249 1
        generator.make_model_from_string(xml)
250
        # generate and execute new code on the fly
251 1
        generator.get_python_classes(structs_dict)
252
        # same but using a file that is imported. This can be usefull for debugging library
253
        #name = node.get_browse_name().Name
254
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
255
        #name = _clean_name(name)
256
        #name = "structures_" + node.get_browse_name().Name
257
        #generator.save_and_import(name + ".py", append_to=structs_dict)
258
259
        # register classes
260
        # every children of our node should represent a class
261 1
        for ndesc in node.get_children_descriptions():
262 1
            ndesc_node = server.get_node(ndesc.NodeId)
263 1
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
264 1
            if ref_desc_list:  #some server put extra things here
265 1
                name = _clean_name(ndesc.BrowseName.Name)
266 1
                if not name in structs_dict:
267
                    logger.warning("Error {} is found as child of binary definition node but is not found in xml".format(name))
268
                    continue
269 1
                nodeid = ref_desc_list[0].NodeId
270 1
                ua.register_extension_object(name, nodeid, structs_dict[name])
271
                # save the typeid if user want to create static file for type definitnion
272 1
                generator.set_typeid(name, nodeid.to_string())
273
274 1
        for key in structs_dict.keys():
275 1
            if type(structs_dict[key]) is EnumMeta and key is not "IntEnum":
276 1
                import opcua.ua
277 1
                setattr(opcua.ua, key, structs_dict[key])
278
279 1
    return generators, structs_dict
280
281
282 1
def _clean_name(name):
283
    """
284
    Remove characters that might be present in  OPC UA structures
285
    but cannot be part of of Python class names
286
    """
287 1
    name = re.sub(r'\W+', '_', name)
288 1
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
289
290 1
    return name
291
292
293 1
def _generate_python_class(model, env=None):
294
    """
295
    generate Python code and execute in a new environment
296
    return a dict of structures {name: class}
297
    Rmw: Since the code is generated on the fly, in case of error the stack trace is 
298
    not available and debugging is very hard...
299
    """
300 1
    if env is None:
301
        env = {}
302
    #  Add the required libraries to dict
303 1
    if "ua" not in env:
304 1
        env['ua'] = ua
305 1
    if "datetime" not in env:
306 1
        env['datetime'] = datetime
307 1
    if "uuid" not in env:
308 1
        env['uuid'] = uuid
309 1
    if "enum" not in env:
310 1
        env['IntEnum'] = IntEnum
311
    # generate classes one by one and add them to dict
312 1
    for element in model:
313 1
        code = element.get_code()
314 1
        print("Generating", code)
315 1
        exec(code, env)
316 1
    return env
317
318
319 1
def load_enums(server, env=None):
320
    """
321
    read enumeration data types and generate python enums for them
322
    Not sure this methods is necessary, alternatives are welcome
323
    """
324
    model = []
325
    nodes = server.nodes.enum_data_type.get_children()
326
    if env is None:
327
        env = ua.__dict__
328
    for node in nodes:
329
        name = node.get_browse_name().Name
330
        try:
331
            def_node = node.get_child("0:EnumStrings")
332
        except ua.UaError as ex:
333
            print(node, ex)
334
            continue
335
        val = def_node.get_value()
336
        c = EnumType(name)
337
        c.fields = [EnumeratedValue(st.Text, idx) for idx, st in enumerate(val)]
338
        if not hasattr(ua, c.name):
339
            model.append(c)
340
    return _generate_python_class(model, env=env)
341