Passed
Pull Request — master (#597)
by
unknown
06:01
created

StructGenerator.save_to_file()   A

Complexity

Conditions 3

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 8
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 9.4285
"""
Support for custom structures in client and server.
Only a subset of features is supported, but it should be enough
for custom structures.
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10 1
import logging
11
# The next two imports are for generated code
12 1
from datetime import datetime
13 1
import uuid
14
15 1
from lxml import objectify
16
17
18 1
from opcua.ua.ua_binary import Primitives
19 1
from opcua import ua
20
21 1
from enum import IntEnum, EnumMeta
22
23 1
def get_default_value(uatype, enums):
    """
    Return the default value used to initialise a generated field.

    The result is interpolated into generated source code with str.format,
    so most branches return a Python expression written as a string.

    uatype: name of the OPC UA type of the field (e.g. "Int32", "String")
    enums: dict mapping names of enumerations declared in the xml being
           processed to the value used to initialise them
    """
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
        return 0
    if uatype in enums:
        return "ua.{}({})".format(uatype, enums[uatype])

    # Look the type up with getattr instead of eval(): the name comes from xml
    # and a type that is not (yet) part of ua must not raise here.
    known = getattr(ua, uatype, None)
    if isinstance(known, type) and issubclass(known, IntEnum):
        members = list(known)
        if members:
            # An enum cannot be built without a value, use its first member
            return "ua.{}({})".format(uatype, int(members[0]))
    return "ua.{}()".format(uatype)
42
43 1
class EnumType(object):
    """
    Model of one enumeration read from xml, able to render itself as the
    source code of an IntEnum class.

    name: name of the generated class
    fields: list of objects with a Name and a Value attribute
    typeid: None until assigned from outside
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.typeid = None

    def get_code(self):
        """
        Return the source code of an IntEnum class with one member per field
        """
        header = (
            "\n"
            "\n"
            "\n"
            "\n"
            "class {0}(IntEnum):\n"
            "\n"
            "    '''\n"
            "    {0} EnumInt autogenerated from xml\n"
            "    '''\n"
            "\n"
        ).format(self.name)
        members = ["    {} = {}\n".format(field.Name, field.Value) for field in self.fields]
        return header + "".join(members)
68
        
69 1
class EnumeratedValue(object):
    """
    One member of an enumeration: its Name and its Value
    """

    def __init__(self, name, value):
        self.Name = name
        self.Value = value

    def __repr__(self):
        return "EnumeratedValue(name={!r}, value={!r})".format(self.Name, self.Value)
73
        
74 1
class Struct(object):
    """
    Model of one structure read from xml, able to render itself as the
    source code of a Python class.

    name: name of the generated class
    fields: list of field objects (name, uatype, array and value attributes)
    typeid: None until assigned from outside
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.typeid = None

    def get_code(self):
        """
        Return the source code of a class holding a ua_types list and an
        __init__ which sets every field to its default value
        """
        parts = [
            "\n"
            "\n"
            "class {0}(object):\n"
            "\n"
            "    '''\n"
            "    {0} structure autogenerated from xml\n"
            "    '''\n"
            "\n".format(self.name)
        ]

        parts.append("    ua_types = [\n")
        for field in self.fields:
            prefix = "ListOf" if field.array else ""
            uatype = prefix + field.uatype
            if uatype == "ListOfChar":
                # an array of Char is declared as a String
                uatype = "String"
            parts.append("        ('{}', '{}'),\n".format(field.name, uatype))
        parts.append("    ]")

        parts.append("\n\n    def __init__(self):\n")
        if not self.fields:
            # same indentation as the assignments below and newline terminated
            parts.append("        pass\n")
        for field in self.fields:
            parts.append("        self.{} = {}\n".format(field.name, field.value))
        return "".join(parts)
109
110
111 1
class Field(object):
    """
    One field of a structure

    name: name of the attribute in the generated class
    uatype: name of the type of the field, None until set
    value: default value written in the generated __init__, None until set
    array: True when the field holds a list of uatype
    """

    def __init__(self, name):
        self.name = name
        self.uatype = None
        self.value = None
        self.array = False

    def __repr__(self):
        return "Field(name={!r}, uatype={!r}, value={!r}, array={!r})".format(
            self.name, self.uatype, self.value, self.array)
117
118
119 1
class StructGenerator(object):
    """
    Build a model of the enumerations and structures described in an xml
    type dictionary and generate Python code from that model.

    model: list of EnumType and Struct objects, in the order they were read
    """

    def __init__(self):
        self.model = []

    def make_model_from_string(self, xml):
        """
        parse xml given as a string and add its types to the model
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        parse the xml file at path and add its types to the model
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        walk the xml tree and append one EnumType per EnumeratedType and one
        Struct per StructuredType to the model
        """
        enums = {}
        for child in root.iter("{*}EnumeratedType"):
            enum_name = child.get("Name")
            intenum = EnumType(enum_name)
            for xmlfield in child.iter("{*}EnumeratedValue"):
                name = xmlfield.get("Name")
                value = xmlfield.get("Value")
                intenum.fields.append(EnumeratedValue(name, value))
                # overwritten on every iteration: the last value read is the
                # one handed to get_default_value for this enumeration
                enums[enum_name] = value
            self.model.append(intenum)

        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # not a field of its own: it flags the next field as an array
                    array = True
                    continue
                field = Field(_clean_name(name))
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # drop the prefix in front of the type name
                    field.uatype = field.uatype.split(":")[1]
                field.uatype = _clean_name(field.uatype)
                field.value = get_default_value(field.uatype, enums)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        write the code of every element of the model to the file at path
        if register is True, registration code is appended to the file
        """
        # the context manager closes the file even if generating code fails
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        return the code registering every element which has a typeid
        """
        code = "\n\n"
        for struct in self.model:
            if struct.typeid is None:
                # without a typeid the generated NodeId.from_string('None')
                # could only fail when the file is imported
                continue
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
        return code

    def get_python_classes(self, env=None):
        """
        generate Python code and execute in a new environment
        return a dict of structures {name: class}
        Rmq: Since the code is generated on the fly, in case of error the
        stack trace is not available and debugging is very hard...
        """
        if env is None:
            env = {}
        # Add the names used by the generated code to the dict
        if "ua" not in env:
            env['ua'] = ua
        if "datetime" not in env:
            env['datetime'] = datetime
        if "uuid" not in env:
            env['uuid'] = uuid
        if "IntEnum" not in env:
            env['IntEnum'] = IntEnum
        # generate classes one by one and add them to dict
        for element in self.model:
            code = element.get_code()
            exec(code, env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        save the new structures to a python file which can be used later
        import the result and return resulting classes in a dict
        if append_to is a dict, the classes are added to the dict
        """
        self.save_to_file(path)
        name = os.path.basename(path)
        name = os.path.splitext(name)[0]
        # the file was just created, make sure the import system notices it
        importlib.invalidate_caches()
        mymodule = importlib.import_module(name)
        if append_to is None:
            result = {}
        else:
            result = append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """
        write the warning and the imports needed by the generated code
        """
        _file.write("""
'''
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
'''

from datetime import datetime
import uuid
from enum import IntEnum

from opcua import ua
""")

    def set_typeid(self, name, typeid):
        """
        set the typeid of the first element of the model called name
        """
        for struct in self.model:
            if struct.name == name:
                struct.typeid = typeid
                return
239
240
241 1
def load_type_definitions(server, nodes=None):
    """
    Download xml from given variable nodes defining custom structures.
    If no node is given, attempts to import variables from all nodes under
    "0:OPC Binary" except the one named "Opc.Ua".
    The code is generated and executed on the fly. If you know the structures
    are not going to be modified it might be interesting to save the generated
    code to files and include them in your code.

    Return a tuple (generators, structs_dict): the list of StructGenerator
    objects, one per node, and the dict in which the generated code was
    executed.
    """
    if nodes is None:
        nodes = []
        for desc in server.nodes.opc_binary.get_children_descriptions():
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
                nodes.append(server.get_node(desc.NodeId))

    logger = logging.getLogger(__name__)
    structs_dict = {}
    generators = []
    for node in nodes:
        xml = node.get_value()
        xml = xml.decode("utf-8")
        generator = StructGenerator()
        generators.append(generator)
        generator.make_model_from_string(xml)
        # generate and execute new code on the fly
        # (generator.save_and_import does the same through a file which is
        # imported, this can be useful when debugging the library)
        generator.get_python_classes(structs_dict)

        # register classes
        # every child of our node should represent a class
        for ndesc in node.get_children_descriptions():
            ndesc_node = server.get_node(ndesc.NodeId)
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
            if not ref_desc_list:
                # some servers put extra things here
                continue
            name = _clean_name(ndesc.BrowseName.Name)
            if name not in structs_dict:
                logger.warning("Error %s is found as child of binary definition node but is not found in xml", name)
                continue
            nodeid = ref_desc_list[0].NodeId
            ua.register_extension_object(name, nodeid, structs_dict[name])
            # save the typeid if user wants to create a static file for the type definition
            generator.set_typeid(name, nodeid.to_string())

        # make the generated enums reachable as attributes of ua; done for
        # every node so that they can be found while processing the next one
        for key, val in structs_dict.items():
            if isinstance(val, EnumMeta) and key != "IntEnum":
                setattr(ua, key, val)

    return generators, structs_dict
294
295
296 1
def _clean_name(name):
297
    """
298
    Remove characters that might be present in  OPC UA structures
299
    but cannot be part of of Python class names
300
    """
301 1
    name = re.sub(r'\W+', '_', name)
302 1
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
303
304
    return name
305