Completed
Pull Request — master (#494)
by Olivier
03:37
created

StructGenerator._make_registration()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.048

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 5
ccs 1
cts 5
cp 0.2
crap 4.048
rs 9.4285
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10
# The next two imports are for generated code
11 1
from datetime import datetime
12 1
import uuid
13
14 1
from lxml import objectify
15
16
17 1
from opcua.ua.ua_binary import Primitives
18 1
from opcua import ua
19
20
21 1
def get_default_value(uatype):
22
    if uatype == "String":
23
        return "None" 
24
    elif uatype == "Guid":
25
        return "uuid.uuid4()" 
26
    elif uatype in ("ByteString", "CharArray", "Char"):
27
        return None 
28
    elif uatype == "Boolean":
29
        return "True"
30
    elif uatype == "DateTime":
31
        return "datetime.utcnow()"
32
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
33
        return 0
34
    else:
35
        return "ua." + uatype + "()"
36
37
38 1
class Struct(object):
39 1
    def __init__(self, name):
40
        self.name = name
41
        self.fields = []
42
        self.code = ""
43
44 1
    def get_code(self):
45
        if not self.fields:
46
            return """
47
48
class {}(object):
49
    pass
50
51
""".format(_clean_name(self.name))
52
        self._make_constructor()
53
        self._make_from_binary()
54
        self._make_to_binary()
55
        return self.code
56
57 1
    def _make_constructor(self):
58
        self.code = """
59
60
61
class {0}(object):
62
    '''
63
    {0} structure autogenerated from xml
64
    '''
65
    def __init__(self, data=None):
66
        if data is not None:
67
            self._binary_init(data)
68
            return
69
""".format(self.name)
70
        for field in self.fields:
71
            self.code += "        self.{} = {}\n".format(field.name, field.value)
72
73 1
    def _make_from_binary(self):
74
        self.code += '''
75
    @staticmethod
76
    def from_binary(data):
77
        return {}(data=data)
78
79
    def _binary_init(self, data):
80
'''.format(self.name)
81
        for field in self.fields:
82
            if hasattr(Primitives, field.uatype):
83
                if field.array:
84
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
85
                else:
86
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
87
            else:
88
                if field.array:
89
                    self.code += '''
90
        length = ua.ua_binary.Primitives.Int32.unpack(data)
91
        if length == -1:
92
            self.{0} = None
93
        else:
94
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
95
'''.format(field.name, field.uatype)
96
                else:
97
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)
98
99 1
    def _make_to_binary(self):
100
        self.code += '''
101
    def to_binary(self):
102
        packet = []
103
'''
104
        for field in self.fields:
105
            if hasattr(Primitives, field.uatype):
106
                if field.array:
107
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
108
                else:
109
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
110
            else:
111
                if field.array:
112
                    self.code += '''
113
        if self.{0} is None:
114
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
115
        else:
116
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
117
            for element in self.{0}:
118
                packet.append(element.to_binary())
119
'''.format(field.name)
120
                else:
121
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
122
        self.code += '        return b"".join(packet)'
123
124
125 1
class Field(object):
126 1
    def __init__(self, name):
127
        self.name = name
128
        self.uatype = None
129
        self.value = None
130
        self.array = False
131
132
133 1
class StructGenerator(object):
134 1
    def __init__(self):
135
        self.model = []
136
137 1
    def make_model_from_string(self, xml):
138
        obj = objectify.fromstring(xml)
139
        self._make_model(obj)
140
141 1
    def make_model_from_file(self, path):
142
        obj = objectify.parse(path)
143
        root = obj.getroot()
144
        self._make_model(root)
145
146 1
    def _make_model(self, root):
147
        for child in root.iter("{*}StructuredType"):
148
            struct = Struct(child.get("Name"))
149
            array = False
150
            for xmlfield in child.iter("{*}Field"):
151
                name = xmlfield.get("Name")
152
                if name.startswith("NoOf"):
153
                    array = True
154
                    continue
155
                field = Field(name)
156
                field.uatype = xmlfield.get("TypeName")
157
                if ":" in field.uatype:
158
                    field.uatype = field.uatype.split(":")[1]
159
                field.value = get_default_value(field.uatype)
160
                if array:
161
                    field.array = True
162
                    field.value = []
163
                    array = False
164
                struct.fields.append(field)
165
            self.model.append(struct)
166
167 1
    def save_to_file(self, path, register=False):
168
        _file = open(path, "wt")
169
        self._make_header(_file)
170
        for struct in self.model:
171
            _file.write(struct.get_code())
172
        if register:
173
            _file.write(self._make_registration())
174
        _file.close()
175
176 1
    def _make_registration(self):
177
        code = "\n\n"
178
        for struct in self.model:
179
            code += "\nsetattr(ua, '{name}', {name})\n".format(name=struct.name)
180
        return code
181
182 1
    def get_python_classes(self, env=None):
183
        """
184
        generate Python code and execute in a new environment
185
        return a dict of structures {name: class}
186
        Rmw: Since the code is generated on the fly, in case of error the stack trace is 
187
        not available and debugging is very hard...
188
        """
189
        if env is None:
190
            env = {}
191
        #  Add the required libraries to dict
192
        if "ua" not in env:
193
            env['ua'] = ua
194
        if "datetime" not in env:
195
            env['datetime'] = datetime
196
        if "uuid" not in env:
197
            env['uuid'] = uuid
198
        # generate classes one by one and add them to dict
199
        for struct in self.model:
200
            code = struct.get_code()
201
            exec(code, env)
202
        return env
203
204 1
    def save_and_import(self, path, append_to=None):
205
        """
206
        save the new structures to a python file which be used later
207
        import the result and return resulting classes in a dict
208
        if append_to is a dict, the classes are added to the dict
209
        """
210
        self.save_to_file(path)
211
        name = os.path.basename(path)
212
        name = os.path.splitext(name)[0]
213
        mymodule = importlib.import_module(name)
214
        if append_to is None:
215
            result = {}
216
        else:
217
            result = append_to
218
        for struct in self.model:
219
            result[struct.name] = getattr(mymodule, struct.name)
220
        return result
221
222 1
    def _make_header(self, _file):
223
        _file.write("""
224
'''
225
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
226
'''
227
228
from datetime import datetime
229
import uuid
230
231
from opcua import ua
232
""")
233
234
235
236
237 1
def load_type_definitions(server, nodes=None):
238
    """
239
    Download xml from given variable node defining custom structures.
240
    If no no node is given, attemps to import variables from all nodes under
241
    "0:OPC Binary"
242
    the code is generated and imported on the fly. If you know the structures
243
    are not going to be modified it might be interresting to copy the generated files
244
    and include them in you code
245
    """
246
    if nodes is None:
247
        nodes = []
248
        for desc in server.nodes.opc_binary.get_children_descriptions():
249
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
250
                nodes.append(server.get_node(desc.NodeId))
251
    
252
    structs_dict = {}
253
    for node in nodes:
254
        xml = node.get_value()
255
        xml = xml.decode("utf-8")
256
        generator = StructGenerator()
257
        generator.make_model_from_string(xml)
258
        # generate and execute new code on the fly
259
        generator.get_python_classes(structs_dict)
260
        # same but using a file that is imported. This can be usefull for debugging library
261
        #name = node.get_browse_name().Name
262
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
263
        #name = _clean_name(name)
264
        #name = "structures_" + node.get_browse_name().Name
265
        #generator.save_and_import(name + ".py", append_to=structs_dict)
266
267
        # register classes
268
        # every children of our node should represent a class
269
        for ndesc in node.get_children_descriptions():
270
            ndesc_node = server.get_node(ndesc.NodeId)
271
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
272
            if ref_desc_list:  #some server put extra things here
273
                name = _clean_name(ndesc.BrowseName.Name)
274
                if not name in structs_dict:
275
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
276
                    continue
277
                nodeid = ref_desc_list[0].NodeId
278
                ua.register_extension_object(name, nodeid, structs_dict[name])
279
280
281 1
def _clean_name(name):
282
    """
283
    Remove characters that might be present in  OPC UA structures
284
    but cannot be part of of Python class names
285
    """
286
    name = re.sub(r'\W+', '_', name)
287
    name = re.sub(r'^[0-9]+', '_', name)
288
    return name
289