Completed
Pull Request — master (#494)
by Olivier
04:24
created

Struct   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 86
Duplicated Lines 0 %

Test Coverage

Coverage 97.37%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 86
ccs 37
cts 38
cp 0.9737
rs 10
wmc 15

5 Methods

Rating   Name   Duplication   Size   Complexity  
B _make_from_binary() 0 25 5
A _make_constructor() 0 15 2
A __init__() 0 5 1
A get_code() 0 12 2
B _make_to_binary() 0 24 5
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10
# The next two imports are for generated code
11 1
from datetime import datetime
12 1
import uuid
13
14 1
from lxml import objectify
15
16
17 1
from opcua.ua.ua_binary import Primitives
18 1
from opcua import ua
19
20
21 1
def get_default_value(uatype):
22 1
    if uatype == "String":
23 1
        return "None" 
24 1
    elif uatype == "Guid":
25 1
        return "uuid.uuid4()" 
26 1
    elif uatype in ("ByteString", "CharArray", "Char"):
27 1
        return None 
28 1
    elif uatype == "Boolean":
29 1
        return "True"
30 1
    elif uatype == "DateTime":
31 1
        return "datetime.utcnow()"
32 1
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
33 1
        return 0
34
    else:
35 1
        return "ua." + uatype + "()"
36
37
38 1
class Struct(object):
39 1
    def __init__(self, name):
40 1
        self.name = name
41 1
        self.fields = []
42 1
        self.code = ""
43 1
        self.typeid = None
44
45 1
    def get_code(self):
46 1
        if not self.fields:
47
            return """
48
49
class {}(object):
50
    pass
51
52
""".format(_clean_name(self.name))
53 1
        self._make_constructor()
54 1
        self._make_from_binary()
55 1
        self._make_to_binary()
56 1
        return self.code
57
58 1
    def _make_constructor(self):
59 1
        self.code = """
60
61
62
class {0}(object):
63
    '''
64
    {0} structure autogenerated from xml
65
    '''
66
    def __init__(self, data=None):
67
        if data is not None:
68
            self._binary_init(data)
69
            return
70
""".format(self.name)
71 1
        for field in self.fields:
72 1
            self.code += "        self.{} = {}\n".format(field.name, field.value)
73
74 1
    def _make_from_binary(self):
75 1
        self.code += '''
76
    @staticmethod
77
    def from_binary(data):
78
        return {}(data=data)
79
80
    def _binary_init(self, data):
81
'''.format(self.name)
82 1
        for field in self.fields:
83 1
            if hasattr(Primitives, field.uatype):
84 1
                if field.array:
85 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
86
                else:
87 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
88
            else:
89 1
                if field.array:
90 1
                    self.code += '''
91
        length = ua.ua_binary.Primitives.Int32.unpack(data)
92
        if length == -1:
93
            self.{0} = None
94
        else:
95
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
96
'''.format(field.name, field.uatype)
97
                else:
98 1
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)
99
100 1
    def _make_to_binary(self):
101 1
        self.code += '''
102
    def to_binary(self):
103
        packet = []
104
'''
105 1
        for field in self.fields:
106 1
            if hasattr(Primitives, field.uatype):
107 1
                if field.array:
108 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
109
                else:
110 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
111
            else:
112 1
                if field.array:
113 1
                    self.code += '''
114
        if self.{0} is None:
115
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
116
        else:
117
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
118
            for element in self.{0}:
119
                packet.append(element.to_binary())
120
'''.format(field.name)
121
                else:
122 1
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
123 1
        self.code += '        return b"".join(packet)'
124
125
126 1
class Field(object):
127 1
    def __init__(self, name):
128 1
        self.name = name
129 1
        self.uatype = None
130 1
        self.value = None
131 1
        self.array = False
132
133
134 1
class StructGenerator(object):
135 1
    def __init__(self):
136 1
        self.model = []
137
138 1
    def make_model_from_string(self, xml):
139
        obj = objectify.fromstring(xml)
140
        self._make_model(obj)
141
142 1
    def make_model_from_file(self, path):
143 1
        obj = objectify.parse(path)
144 1
        root = obj.getroot()
145 1
        self._make_model(root)
146
147 1
    def _make_model(self, root):
148 1
        for child in root.iter("{*}StructuredType"):
149 1
            struct = Struct(child.get("Name"))
150 1
            array = False
151 1
            for xmlfield in child.iter("{*}Field"):
152 1
                name = xmlfield.get("Name")
153 1
                if name.startswith("NoOf"):
154 1
                    array = True
155 1
                    continue
156 1
                field = Field(name)
157 1
                field.uatype = xmlfield.get("TypeName")
158 1
                if ":" in field.uatype:
159 1
                    field.uatype = field.uatype.split(":")[1]
160 1
                field.value = get_default_value(field.uatype)
161 1
                if array:
162 1
                    field.array = True
163 1
                    field.value = []
164 1
                    array = False
165 1
                struct.fields.append(field)
166 1
            self.model.append(struct)
167
168 1
    def save_to_file(self, path, register=False):
169 1
        _file = open(path, "wt")
170 1
        self._make_header(_file)
171 1
        for struct in self.model:
172 1
            _file.write(struct.get_code())
173 1
        if register:
174
            _file.write(self._make_registration())
175 1
        _file.close()
176
177 1
    def _make_registration(self):
178
        code = "\n\n"
179
        for struct in self.model:
180
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
181
        return code
182
183 1
    def get_python_classes(self, env=None):
184
        """
185
        generate Python code and execute in a new environment
186
        return a dict of structures {name: class}
187
        Rmw: Since the code is generated on the fly, in case of error the stack trace is 
188
        not available and debugging is very hard...
189
        """
190
        if env is None:
191
            env = {}
192
        #  Add the required libraries to dict
193
        if "ua" not in env:
194
            env['ua'] = ua
195
        if "datetime" not in env:
196
            env['datetime'] = datetime
197
        if "uuid" not in env:
198
            env['uuid'] = uuid
199
        # generate classes one by one and add them to dict
200
        for struct in self.model:
201
            code = struct.get_code()
202
            exec(code, env)
203
        return env
204
205 1
    def save_and_import(self, path, append_to=None):
206
        """
207
        save the new structures to a python file which be used later
208
        import the result and return resulting classes in a dict
209
        if append_to is a dict, the classes are added to the dict
210
        """
211 1
        self.save_to_file(path)
212 1
        name = os.path.basename(path)
213 1
        name = os.path.splitext(name)[0]
214 1
        mymodule = importlib.import_module(name)
215 1
        if append_to is None:
216 1
            result = {}
217
        else:
218
            result = append_to
219 1
        for struct in self.model:
220 1
            result[struct.name] = getattr(mymodule, struct.name)
221 1
        return result
222
223 1
    def _make_header(self, _file):
224 1
        _file.write("""
225
'''
226
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
227
'''
228
229
from datetime import datetime
230
import uuid
231
232
from opcua import ua
233
""")
234
235 1
    def set_typeid(self, name, typeid):
236
        for struct in self.model:
237
            if struct.name == name:
238
                struct.typeid = typeid
239
                return
240
241
242 1
def load_type_definitions(server, nodes=None):
243
    """
244
    Download xml from given variable node defining custom structures.
245
    If no node is given, attemps to import variables from all nodes under
246
    "0:OPC Binary"
247
    the code is generated and imported on the fly. If you know the structures
248
    are not going to be modified it might be interresting to copy the generated files
249
    and include them in you code
250
    """
251
    if nodes is None:
252
        nodes = []
253
        for desc in server.nodes.opc_binary.get_children_descriptions():
254
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
255
                nodes.append(server.get_node(desc.NodeId))
256
    
257
    structs_dict = {}
258
    generators = []
259
    for node in nodes:
260
        xml = node.get_value()
261
        xml = xml.decode("utf-8")
262
        generator = StructGenerator()
263
        generators.append(generator)
264
        generator.make_model_from_string(xml)
265
        # generate and execute new code on the fly
266
        generator.get_python_classes(structs_dict)
267
        # same but using a file that is imported. This can be usefull for debugging library
268
        #name = node.get_browse_name().Name
269
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
270
        #name = _clean_name(name)
271
        #name = "structures_" + node.get_browse_name().Name
272
        #generator.save_and_import(name + ".py", append_to=structs_dict)
273
274
        # register classes
275
        # every children of our node should represent a class
276
        for ndesc in node.get_children_descriptions():
277
            ndesc_node = server.get_node(ndesc.NodeId)
278
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
279
            if ref_desc_list:  #some server put extra things here
280
                name = _clean_name(ndesc.BrowseName.Name)
281
                if not name in structs_dict:
282
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
283
                    continue
284
                nodeid = ref_desc_list[0].NodeId
285
                ua.register_extension_object(name, nodeid, structs_dict[name])
286
                # save the typeid if user want to create static file for type definitnion
287
                generator.set_typeid(name, nodeid.to_string())
288
    return generators, structs_dict
289
290
291 1
def _clean_name(name):
292
    """
293
    Remove characters that might be present in  OPC UA structures
294
    but cannot be part of of Python class names
295
    """
296
    name = re.sub(r'\W+', '_', name)
297
    name = re.sub(r'^[0-9]+', '_', name)
298
    return name
299