Completed
Push — master ( 0e5e79...d552f1 )
by Olivier
05:52
created

StructGenerator.get_python_classes()   C

Complexity

Conditions 7

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7.0145

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 7
c 3
b 0
f 0
dl 0
loc 23
ccs 14
cts 15
cp 0.9333
crap 7.0145
rs 5.5
"""
Support for custom structures in client and server.
We only support a subset of features, but this should be enough
for custom structures.
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10 1
import logging
11
# The next two imports are for generated code
12 1
from datetime import datetime
13 1
import uuid
14
15 1
from lxml import objectify
16
17
18 1
from opcua.ua.ua_binary import Primitives
19 1
from opcua import ua
20
21 1
from enum import IntEnum, EnumMeta
22
23 1
def get_default_value(uatype, enums):
    """
    Return the default initializer for a field of OPC UA type ``uatype``.

    The result is interpolated with ``str.format`` into a generated
    ``self.<field> = <value>`` line, so most branches return a snippet of
    Python source. The ``None`` and ``0`` returns are rendered as the
    literals ``None`` and ``0``.

    :param uatype: type name of the field, e.g. ``"Int32"``
    :param enums: dict mapping names of enums declared in the XML being
        processed to the value (as source text) used as their default
    :return: source snippet, ``None`` or ``0``
    """
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64",
                  "Double", "Float", "Byte", "SByte"):
        return 0
    if uatype in enums:
        return "ua.{}({})".format(uatype, enums[uatype])

    # Resolve the type by attribute lookup rather than eval(): the name comes
    # from an XML document, and a type that is not (yet) present in ``ua``
    # must not abort model creation.
    known = getattr(ua, uatype, None)
    if isinstance(known, type) and issubclass(known, IntEnum):
        members = list(known)
        if members:
            # An enum member cannot be concatenated to a str; emit its
            # integer value so the generated code reads ``ua.X(<int>)``.
            return "ua.{}({})".format(uatype, int(members[0]))
    return "ua." + uatype + "()"
42
43 1
class EnumType(object):
    """
    Model of an enumeration read from an XML type dictionary.

    ``fields`` holds objects exposing ``Name`` and ``Value`` attributes;
    ``typeid`` is ``None`` until it is assigned from outside.
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.typeid = None

    def get_code(self):
        """
        Return Python source defining an ``IntEnum`` subclass named after
        this enum, with one ``Name = Value`` member line per field.
        """
        header = (
            "\n\n\n\n"
            "class {0}(IntEnum):\n"
            "\n"
            "    '''\n"
            "    {0} EnumInt autogenerated from xml\n"
            "    '''\n"
            "\n"
        ).format(self.name)
        members = [
            "    {} = {}\n".format(item.Name, item.Value)
            for item in self.fields
        ]
        return header + "".join(members)
68
        
69 1
class EnumeratedValue(object):
    """
    One member of an enumeration: a ``Name`` and its ``Value``.

    Attribute names are capitalised because ``EnumType.get_code`` reads
    them as ``.Name`` and ``.Value``.
    """

    def __init__(self, name, value):
        self.Name = name
        self.Value = value
73
        
74 1
class Struct(object):
    """
    Model of a structure read from an XML type dictionary.

    ``fields`` holds objects exposing ``name``, ``uatype``, ``array`` and
    ``value`` attributes; ``typeid`` is ``None`` until it is assigned from
    outside.
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.typeid = None

    def get_code(self):
        """
        Return Python source for a class named after this structure.

        The class gets a ``ua_types`` list of ``(field name, type name)``
        pairs and an ``__init__`` assigning every field its default value.
        """
        parts = [
            (
                "\n\n"
                "class {0}(object):\n"
                "\n"
                "    '''\n"
                "    {0} structure autogenerated from xml\n"
                "    '''\n"
                "\n"
            ).format(self.name),
            "    ua_types = [\n",
        ]
        for field in self.fields:
            uatype = ("ListOf" if field.array else "") + field.uatype
            if uatype == "ListOfChar":
                # An array of Char is exposed as a plain String.
                uatype = "String"
            parts.append("        ('{}', '{}'),\n".format(field.name, uatype))
        parts.append("    ]")
        parts.append("\n\n    def __init__(self):\n")
        if not self.fields:
            # Keep the generated __init__ syntactically valid when empty.
            parts.append("        pass\n")
        for field in self.fields:
            parts.append("        self.{} = {}\n".format(field.name, field.value))
        return "".join(parts)
109
110
111 1
class Field(object):
    """
    One field of a ``Struct``.

    Only ``name`` is set at construction; ``uatype``, ``value`` and
    ``array`` start as ``None``, ``None`` and ``False`` and are filled in
    by whoever builds the model.
    """

    def __init__(self, name):
        self.name = name
        self.uatype = None
        self.value = None
        self.array = False
117
118
119 1
class StructGenerator(object):
    """
    Build a model of enums and structures from an XML type dictionary and
    turn it into Python classes, either executed on the fly or written to
    a file.

    ``model`` is the ordered list of ``EnumType`` and ``Struct`` objects;
    enums are appended before structures by ``_make_model``.
    """

    def __init__(self):
        self.model = []

    def make_model_from_string(self, xml):
        """Parse ``xml`` (the document content) and add its types to the model."""
        self._make_model(objectify.fromstring(xml))

    def make_model_from_file(self, path):
        """Parse the XML file at ``path`` and add its types to the model."""
        self._make_model(objectify.parse(path).getroot())

    def _make_model(self, root):
        """Append every EnumeratedType, then every StructuredType, under ``root``."""
        enums = {}
        for child in root.iter("{*}EnumeratedType"):
            enum_name = child.get("Name")
            intenum = EnumType(enum_name)
            for xmlfield in child.iter("{*}EnumeratedValue"):
                value = xmlfield.get("Value")
                intenum.fields.append(EnumeratedValue(xmlfield.get("Name"), value))
                # Re-assigned on every iteration, so the last listed value
                # is the one used as default for fields of this enum type.
                enums[enum_name] = value
            self.model.append(intenum)

        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # A "NoOf..." entry is not stored as a field; it flags
                    # the field that follows it as an array.
                    array = True
                    continue
                field = Field(_clean_name(name))
                uatype = xmlfield.get("TypeName")
                if ":" in uatype:
                    # Drop the namespace prefix of "prefix:TypeName".
                    uatype = uatype.split(":")[1]
                field.uatype = _clean_name(uatype)
                field.value = get_default_value(field.uatype, enums)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        Write the generated code for the whole model to ``path``.

        If ``register`` is true, registration calls for the model entries
        are appended after the class definitions.
        """
        # The context manager closes the file even if code generation fails.
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        Return source registering each model entry as an extension object.

        Entries whose ``typeid`` was never set are skipped: there is no
        node id to register them under.
        """
        code = "\n\n"
        for struct in self.model:
            if struct.typeid is None:
                continue
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
        return code

    def get_python_classes(self, env=None):
        """
        Generate Python code and execute it in ``env``.

        Returns ``env``, which then maps every generated name to its class.
        It also contains the helper names the generated code needs
        (``ua``, ``datetime``, ``uuid``, ``IntEnum``) and whatever ``exec``
        adds on its own.
        Note: since the code is generated on the fly, no useful stack trace
        is available in case of error, which makes debugging very hard.
        """
        if env is None:
            env = {}
        # Add the required libraries, keeping anything the caller supplied.
        if "ua" not in env:
            env['ua'] = ua
        if "datetime" not in env:
            env['datetime'] = datetime
        if "uuid" not in env:
            env['uuid'] = uuid
        # The generated enum classes look up the name "IntEnum".
        if "IntEnum" not in env:
            env['IntEnum'] = IntEnum
        # Generate the classes one by one and add them to the dict.
        for element in self.model:
            exec(element.get_code(), env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        Save the generated classes to the Python file ``path``, import it
        and return the resulting classes in a dict.

        If ``append_to`` is a dict, the classes are added to that dict.
        The module is imported by its bare file name.
        """
        self.save_to_file(path)
        name = os.path.splitext(os.path.basename(path))[0]
        mymodule = importlib.import_module(name)
        result = {} if append_to is None else append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """Write the banner and the imports the generated code relies on."""
        _file.write(
            "\n"
            "'''\n"
            "THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!\n"
            "'''\n"
            "\n"
            "from datetime import datetime\n"
            "import uuid\n"
            # Generated enum classes derive from IntEnum.
            "from enum import IntEnum\n"
            "\n"
            "from opcua import ua\n"
        )

    def set_typeid(self, name, typeid):
        """Set ``typeid`` on the first model entry called ``name``, if any."""
        for struct in self.model:
            if struct.name == name:
                struct.typeid = typeid
                return
239
240
241 1
def load_type_definitions(server, nodes=None):
    """
    Download XML from the given variable nodes defining custom structures.

    If no node is given, attempts to import variables from all nodes under
    "0:OPC Binary", except the one whose browse name is "Opc.Ua".
    The code is generated and executed on the fly. If you know the
    structures are not going to be modified, it might be interesting to copy
    the generated files and include them in your code.

    :param server: object used to browse and to resolve node ids
    :param nodes: optional list of nodes whose value is the XML definition
    :return: tuple ``(generators, structs_dict)``: one ``StructGenerator``
        per node, and the dict the generated code was executed in
    """
    if nodes is None:
        nodes = []
        for desc in server.nodes.opc_binary.get_children_descriptions():
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
                nodes.append(server.get_node(desc.NodeId))

    logger = logging.getLogger(__name__)
    structs_dict = {}
    generators = []
    for node in nodes:
        xml = node.get_value()
        xml = xml.decode("utf-8")
        generator = StructGenerator()
        generators.append(generator)
        generator.make_model_from_string(xml)
        # Generate and execute the new code on the fly.
        # For debugging, StructGenerator.save_and_import() does the same
        # through a file that is written to disk and then imported.
        generator.get_python_classes(structs_dict)

        # Register classes: every child of our node should represent a class.
        for ndesc in node.get_children_descriptions():
            ndesc_node = server.get_node(ndesc.NodeId)
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
            if not ref_desc_list:
                # Some servers put extra things here.
                continue
            name = _clean_name(ndesc.BrowseName.Name)
            if name not in structs_dict:
                logger.warning("Error %s is found as child of binary definition node but is not found in xml", name)
                continue
            nodeid = ref_desc_list[0].NodeId
            ua.register_extension_object(name, nodeid, structs_dict[name])
            # Save the type id in case the user wants to create a static
            # file for the type definition.
            generator.set_typeid(name, nodeid.to_string())

        # Expose the generated enums on the ua module. This runs once per
        # node, before the next node is processed. "IntEnum" is the base
        # class placed in the dict as a helper, not a generated type.
        for key, value in list(structs_dict.items()):
            if isinstance(value, EnumMeta) and key != "IntEnum":
                setattr(ua, key, value)

    return generators, structs_dict
294
295
296 1
def _clean_name(name):
297
    """
298
    Remove characters that might be present in  OPC UA structures
299
    but cannot be part of of Python class names
300
    """
301 1
    name = re.sub(r'\W+', '_', name)
302 1
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
303
304
    return name
305