Completed
Pull Request — master (#490)
by Olivier
04:31
created

Struct.get_code()   B

Complexity

Conditions 6

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 6.0702

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 6
c 2
b 0
f 0
dl 0
loc 29
ccs 14
cts 16
cp 0.875
crap 6.0702
rs 7.5384
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10
# The next two imports are for generated code
11 1
from datetime import datetime
12 1
import uuid
13
14 1
from lxml import objectify
15
16
17 1
from opcua.ua.ua_binary import Primitives
18 1
from opcua import ua
19
20
21 1
def get_default_value(uatype):
22 1
    if uatype == "String":
23 1
        return "None" 
24 1
    elif uatype == "Guid":
25 1
        return "uuid.uuid4()" 
26 1
    elif uatype in ("ByteString", "CharArray", "Char"):
27 1
        return None 
28 1
    elif uatype == "Boolean":
29 1
        return "True"
30 1
    elif uatype == "DateTime":
31 1
        return "datetime.utcnow()"
32 1
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
33 1
        return 0
34
    else:
35 1
        return "ua." + uatype + "()"
36
37
38 1
class Struct(object):
39 1
    def __init__(self, name):
40 1
        self.name = name
41 1
        self.fields = []
42 1
        self.typeid = None
43
44 1
    def get_code(self):
45 1
        code = """
46
47
class {0}(object):
48
49
    '''
50
    {0} structure autogenerated from xml
51
    '''
52
53
""".format(self.name)
54
55 1
        code += "    ua_types = [\n"
56 1
        for field in self.fields:
57 1
            prefix = "ListOf" if field.array else ""
58 1
            uatype = prefix + field.uatype
59 1
            if uatype == "ListOfChar":
60
                uatype = "String"
61 1
            code += "        ('{}', '{}'),\n".format(field.name, uatype)
62
63 1
        code += "    ]"
64 1
        code += """
65
66
    def __init__(self):
67
""".format(self.name)
68 1
        if not self.fields:
69
            code += "    pass"
70 1
        for field in self.fields:
71 1
            code += "        self.{} = {}\n".format(field.name, field.value)
72 1
        return code
73
74
75 1
class Field(object):
76 1
    def __init__(self, name):
77 1
        self.name = name
78 1
        self.uatype = None
79 1
        self.value = None
80 1
        self.array = False
81
82
83 1
class StructGenerator(object):
84 1
    def __init__(self):
85 1
        self.model = []
86
87 1
    def make_model_from_string(self, xml):
88
        obj = objectify.fromstring(xml)
89
        self._make_model(obj)
90
91 1
    def make_model_from_file(self, path):
92 1
        obj = objectify.parse(path)
93 1
        root = obj.getroot()
94 1
        self._make_model(root)
95
96 1
    def _make_model(self, root):
97 1
        for child in root.iter("{*}StructuredType"):
98 1
            struct = Struct(child.get("Name"))
99 1
            array = False
100 1
            for xmlfield in child.iter("{*}Field"):
101 1
                name = xmlfield.get("Name")
102 1
                if name.startswith("NoOf"):
103 1
                    array = True
104 1
                    continue
105 1
                field = Field(_clean_name(name))
106 1
                field.uatype = xmlfield.get("TypeName")
107 1
                if ":" in field.uatype:
108 1
                    field.uatype = field.uatype.split(":")[1]
109 1
                field.uatype = _clean_name(field.uatype)
110 1
                field.value = get_default_value(field.uatype)
111 1
                if array:
112 1
                    field.array = True
113 1
                    field.value = []
114 1
                    array = False
115 1
                struct.fields.append(field)
116 1
            self.model.append(struct)
117
118 1
    def save_to_file(self, path, register=False):
119 1
        _file = open(path, "wt")
120 1
        self._make_header(_file)
121 1
        for struct in self.model:
122 1
            _file.write(struct.get_code())
123 1
        if register:
124
            _file.write(self._make_registration())
125 1
        _file.close()
126
127 1
    def _make_registration(self):
128
        code = "\n\n"
129
        for struct in self.model:
130
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
131
        return code
132
133 1
    def get_python_classes(self, env=None):
134
        """
135
        generate Python code and execute in a new environment
136
        return a dict of structures {name: class}
137
        Rmw: Since the code is generated on the fly, in case of error the stack trace is 
138
        not available and debugging is very hard...
139
        """
140
        if env is None:
141
            env = {}
142
        #  Add the required libraries to dict
143
        if "ua" not in env:
144
            env['ua'] = ua
145
        if "datetime" not in env:
146
            env['datetime'] = datetime
147
        if "uuid" not in env:
148
            env['uuid'] = uuid
149
        # generate classes one by one and add them to dict
150
        for struct in self.model:
151
            code = struct.get_code()
152
            exec(code, env)
153
        return env
154
155 1
    def save_and_import(self, path, append_to=None):
156
        """
157
        save the new structures to a python file which be used later
158
        import the result and return resulting classes in a dict
159
        if append_to is a dict, the classes are added to the dict
160
        """
161 1
        self.save_to_file(path)
162 1
        name = os.path.basename(path)
163 1
        name = os.path.splitext(name)[0]
164 1
        mymodule = importlib.import_module(name)
165 1
        if append_to is None:
166 1
            result = {}
167
        else:
168
            result = append_to
169 1
        for struct in self.model:
170 1
            result[struct.name] = getattr(mymodule, struct.name)
171 1
        return result
172
173 1
    def _make_header(self, _file):
174 1
        _file.write("""
175
'''
176
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
177
'''
178
179
from datetime import datetime
180
import uuid
181
182
from opcua import ua
183
""")
184
185 1
    def set_typeid(self, name, typeid):
186
        for struct in self.model:
187
            if struct.name == name:
188
                struct.typeid = typeid
189
                return
190
191
192 1
def load_type_definitions(server, nodes=None):
193
    """
194
    Download xml from given variable node defining custom structures.
195
    If no node is given, attemps to import variables from all nodes under
196
    "0:OPC Binary"
197
    the code is generated and imported on the fly. If you know the structures
198
    are not going to be modified it might be interresting to copy the generated files
199
    and include them in you code
200
    """
201
    if nodes is None:
202
        nodes = []
203
        for desc in server.nodes.opc_binary.get_children_descriptions():
204
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
205
                nodes.append(server.get_node(desc.NodeId))
206
    
207
    structs_dict = {}
208
    generators = []
209
    for node in nodes:
210
        xml = node.get_value()
211
        xml = xml.decode("utf-8")
212
        generator = StructGenerator()
213
        generators.append(generator)
214
        generator.make_model_from_string(xml)
215
        # generate and execute new code on the fly
216
        generator.get_python_classes(structs_dict)
217
        # same but using a file that is imported. This can be usefull for debugging library
218
        #name = node.get_browse_name().Name
219
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
220
        #name = _clean_name(name)
221
        #name = "structures_" + node.get_browse_name().Name
222
        #generator.save_and_import(name + ".py", append_to=structs_dict)
223
224
        # register classes
225
        # every children of our node should represent a class
226
        for ndesc in node.get_children_descriptions():
227
            ndesc_node = server.get_node(ndesc.NodeId)
228
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
229
            if ref_desc_list:  #some server put extra things here
230
                name = _clean_name(ndesc.BrowseName.Name)
231
                if not name in structs_dict:
232
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
233
                    continue
234
                nodeid = ref_desc_list[0].NodeId
235
                ua.register_extension_object(name, nodeid, structs_dict[name])
236
                # save the typeid if user want to create static file for type definitnion
237
                generator.set_typeid(name, nodeid.to_string())
238
    return generators, structs_dict
239
240
241 1
def _clean_name(name):
242
    """
243
    Remove characters that might be present in  OPC UA structures
244
    but cannot be part of of Python class names
245
    """
246 1
    name = re.sub(r'\W+', '_', name)
247 1
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
248
249
    return name
250