Completed
Pull Request — master (#494)
by Olivier
07:24
created

StructGenerator.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 2
rs 10
1
"""
Support for custom structures in client and server.
We only support a subset of features, but it should be enough
for custom structures.
"""
6
7
import os
8
import importlib
9
import re
10
# The next two imports are for generated code
11
from datetime import datetime
12
import uuid
13
14
from lxml import objectify
15
16
17
from opcua.ua.ua_binary import Primitives
18
from opcua import ua
19
20
21
def get_default_value(uatype):
    """
    Return the default initialiser for a generated field of type uatype.

    The result is interpolated with str.format() into generated source code
    (see Struct._make_constructor), so it is a Python *expression*, not a
    runtime value. Most branches return that expression as a string; the
    ByteString/CharArray/Char branch returns None and the numeric branch
    returns 0, which format() renders as the literals ``None`` and ``0``.

    :param uatype: OPC UA type name, e.g. "String", "Int32" or a structure name
    :return: str, None or 0 as described above
    """
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64",
                  "Double", "Float", "Byte", "SByte"):
        return 0
    # Anything else is taken to be a class reachable through the ua module.
    return "ua." + uatype + "()"
36
37
38
class Struct(object):
    """
    Model of one custom structure and generator of its Python source code.

    Attributes:
        name: name given to the generated class
        fields: list of Field objects, in the order they are (de)serialised
        code: source produced by the last get_code() call on a structure
            that has fields; empty string before that
        typeid: None at creation; StructGenerator.set_typeid() stores the
            type node id here
    """
    def __init__(self, name):
        self.name = name
        self.fields = []
        self.code = ""
        self.typeid = None

    def get_code(self):
        """
        Return the source code of a Python class for this structure.

        A structure without fields yields an empty class; in that case
        self.code is left untouched. Otherwise the code is rebuilt from
        scratch on every call and also stored in self.code.
        """
        if not self.fields:
            return """

class {}(object):
    pass

""".format(self.name)
        self._make_constructor()
        self._make_from_binary()
        self._make_to_binary()
        return self.code

    def _make_constructor(self):
        """
        Start self.code with the class header and __init__.

        The generated __init__ decodes from ``data`` when given, else it
        assigns every field its default value expression.
        """
        self.code = """


class {0}(object):
    '''
    {0} structure autogenerated from xml
    '''
    def __init__(self, data=None):
        if data is not None:
            self._binary_init(data)
            return
""".format(self.name)
        for field in self.fields:
            self.code += "        self.{} = {}\n".format(field.name, field.value)

    def _make_from_binary(self):
        """
        Append from_binary() and _binary_init() to self.code.

        Fields whose type is an attribute of Primitives are decoded with the
        matching primitive (un)packer, all others through ua.<type>.from_binary.
        """
        self.code += '''
    @staticmethod
    def from_binary(data):
        return {}(data=data)

    def _binary_init(self, data):
'''.format(self.name)
        for field in self.fields:
            if hasattr(Primitives, field.uatype):
                if field.array:
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
                else:
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
            else:
                if field.array:
                    # generated code reads an Int32 length first; -1 means "no array"
                    self.code += '''
        length = ua.ua_binary.Primitives.Int32.unpack(data)
        if length == -1:
            self.{0} = None
        else:
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
'''.format(field.name, field.uatype)
                else:
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)

    def _make_to_binary(self):
        """
        Append to_binary() to self.code; it mirrors _binary_init() field by field.
        """
        self.code += '''
    def to_binary(self):
        packet = []
'''
        for field in self.fields:
            if hasattr(Primitives, field.uatype):
                if field.array:
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
                else:
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
            else:
                if field.array:
                    # generated code writes an Int32 length first; None is written as -1
                    self.code += '''
        if self.{0} is None:
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
        else:
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
            for element in self.{0}:
                packet.append(element.to_binary())
'''.format(field.name)
                else:
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
        self.code += '        return b"".join(packet)'
124
125
126
class Field(object):
    """
    One member of a Struct.

    Attributes:
        name: attribute name used in the generated class
        uatype: OPC UA type name of the field; None until assigned
        value: default value expression written into the generated __init__;
            None until assigned
        array: True when the field holds an array of uatype elements
    """
    def __init__(self, name):
        self.name = name
        self.uatype = None
        self.value = None
        self.array = False
132
133
134
class StructGenerator(object):
    """
    Build Struct models from an XML type dictionary and turn them into Python
    classes, either executed on the fly or written to a file.

    Attributes:
        model: list of Struct objects collected so far
    """
    def __init__(self):
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse an XML document given as a string and add its structures to the model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the XML file at path and add its structures to the model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Append one Struct per StructuredType element found under root.
        """
        for child in root.iter("{*}StructuredType"):
            # Clean the structure name like field and type names below:
            # it becomes a Python class name, and load_type_definitions()
            # looks classes up by their cleaned name.
            struct = Struct(_clean_name(child.get("Name")))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # A "NoOf..." entry is not a member itself: it marks the
                    # following field as an array.
                    array = True
                    continue
                field = Field(_clean_name(name))
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # drop the prefix in front of the type name
                    field.uatype = field.uatype.split(":")[1]
                field.uatype = _clean_name(field.uatype)
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        Write the generated classes to the file at path.

        :param path: destination file, overwritten if it exists
        :param register: when True, also append the calls registering every
            class with its type id (see set_typeid)
        """
        # "with" guarantees the file is closed even if code generation raises
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        Return the source lines registering every structure of the model.
        """
        code = "\n\n"
        for struct in self.model:
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
        return code

    def get_python_classes(self, env=None):
        """
        Generate Python code and execute it in env (a new dict when None).

        Returns env, which maps every structure name to its class. Note that
        env also holds the "ua", "datetime" and "uuid" entries added here and
        whatever exec() itself inserts.
        Remark: since the code is generated on the fly, in case of error the
        stack trace is not available and debugging is very hard...
        """
        if env is None:
            env = {}
        # Add the libraries required by the generated code
        if "ua" not in env:
            env['ua'] = ua
        if "datetime" not in env:
            env['datetime'] = datetime
        if "uuid" not in env:
            env['uuid'] = uuid
        # generate classes one by one and add them to dict
        for struct in self.model:
            code = struct.get_code()
            # NOTE(security): this executes code built from the XML model;
            # only use it with type dictionaries from a trusted source.
            exec(code, env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        Save the new structures to a Python file which can be used later,
        import the result and return the resulting classes in a dict.
        If append_to is a dict, the classes are added to that dict.

        The module is imported by its base name, so the directory holding
        path must be importable.
        """
        self.save_to_file(path)
        name = os.path.basename(path)
        name = os.path.splitext(name)[0]
        mymodule = importlib.import_module(name)
        if append_to is None:
            result = {}
        else:
            result = append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """
        Write the banner and the imports needed by the generated code to _file.
        """
        _file.write("""
'''
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
'''

from datetime import datetime
import uuid

from opcua import ua
""")

    def set_typeid(self, name, typeid):
        """
        Store typeid on the first structure of the model called name.

        Does nothing when no structure has that name.
        """
        for struct in self.model:
            if struct.name == name:
                struct.typeid = typeid
                return
241
242
243
def load_type_definitions(server, nodes=None):
    """
    Download the XML defining custom structures from the given variable nodes.
    If no node is given, attempt to import variables from all nodes under
    "0:OPC Binary" (except the "Opc.Ua" one).
    The code is generated and executed on the fly. If you know the structures
    are not going to be modified, it might be interesting to copy the
    generated files and include them in your code.

    :param server: object giving access to the address space; it must provide
        get_node() and, when nodes is None, nodes.opc_binary
    :param nodes: optional list of nodes whose value is the XML definition
    :return: tuple (generators, structs_dict): one StructGenerator per node,
        and the dict filled by StructGenerator.get_python_classes()
    """
    if nodes is None:
        nodes = []
        for desc in server.nodes.opc_binary.get_children_descriptions():
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
                nodes.append(server.get_node(desc.NodeId))

    structs_dict = {}
    generators = []
    for node in nodes:
        xml = node.get_value()
        xml = xml.decode("utf-8")
        generator = StructGenerator()
        generators.append(generator)
        generator.make_model_from_string(xml)
        # generate and execute new code on the fly
        generator.get_python_classes(structs_dict)
        # Debugging hint: generator.save_and_import(<file name>, append_to=structs_dict)
        # does the same through a file that is imported; build the file name
        # with _clean_name() so that it is a valid module name.

        # register classes
        # every child of our node should represent a class
        for ndesc in node.get_children_descriptions():
            ndesc_node = server.get_node(ndesc.NodeId)
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
            if ref_desc_list:  # some servers put extra things here
                name = _clean_name(ndesc.BrowseName.Name)
                if name not in structs_dict:
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
                    continue
                nodeid = ref_desc_list[0].NodeId
                ua.register_extension_object(name, nodeid, structs_dict[name])
                # save the typeid in case the user wants to create a static file for the type definition
                generator.set_typeid(name, nodeid.to_string())
    return generators, structs_dict
290
291
292
def _clean_name(name):
293
    """
294
    Remove characters that might be present in  OPC UA structures
295
    but cannot be part of of Python class names
296
    """
297
    name = re.sub(r'\W+', '_', name)
298
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
299
300
    return name
301