Completed
Push — master ( 08a6b0...10a906 )
by Olivier
12:26
created

get_default_value()   B

Complexity

Conditions 7

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7

Importance

Changes 5
Bugs 0 Features 0
Metric Value
cc 7
c 5
b 0
f 0
dl 0
loc 15
ccs 14
cts 14
cp 1
crap 7
rs 7.3333
"""
Support for custom structures in client and server.
Only a subset of the features is supported, but it should be enough
for custom structures.
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10
# The next two imports are for generated code
11 1
from datetime import datetime
12 1
import uuid
13
14 1
from lxml import objectify
15
16
17 1
from opcua.ua.ua_binary import Primitives
18 1
from opcua import ua
19
20
21 1
def get_default_value(uatype):
    """
    Return the default initial value for a field of OPC UA type *uatype*.

    The result is formatted into generated source code, so most defaults
    are strings holding a Python expression. The byte-like types yield the
    object None and the numeric types the integer 0; both render as valid
    expressions once formatted. Any type name not listed here yields a
    constructor call on the ua module, for example "ua.MyType()".
    """
    expression_defaults = {
        "String": "None",
        "Guid": "uuid.uuid4()",
        "Boolean": "True",
        "DateTime": "datetime.utcnow()",
    }
    null_types = ("ByteString", "CharArray", "Char")
    numeric_types = (
        "Int16", "Int32", "Int64",
        "UInt16", "UInt32", "UInt64",
        "Double", "Float", "Byte", "SByte",
    )

    if uatype in expression_defaults:
        return expression_defaults[uatype]
    if uatype in null_types:
        return None
    if uatype in numeric_types:
        return 0
    # everything else is expected to be a class available from the ua module
    return "ua." + uatype + "()"
36
37
38 1
class Struct(object):
    """
    Description of one structured type, able to generate the Python source
    code of a class that decodes and encodes it.
    """

    def __init__(self, name):
        self.name = name      # name given to the generated class
        self.fields = []      # field descriptions, processed in list order
        self.code = ""        # source code accumulated by the _make_* methods
        self.typeid = None    # not set here; filled in from outside if needed

    def get_code(self):
        """
        Return the source code of the Python class for this structure.

        A structure without any field produces an empty placeholder class.
        """
        if self.fields:
            for build_section in (self._make_constructor,
                                  self._make_from_binary,
                                  self._make_to_binary):
                build_section()
            return self.code

        placeholder = (
            "\n"
            "\n"
            "class {}(object):\n"
            "    pass\n"
            "\n"
        )
        return placeholder.format(self.name)

    def _make_constructor(self):
        """
        Start self.code over with the class header and its __init__,
        which assigns every field its default value.
        """
        class_header = (
            "\n"
            "\n"
            "\n"
            "class {0}(object):\n"
            "    '''\n"
            "    {0} structure autogenerated from xml\n"
            "    '''\n"
            "    def __init__(self, data=None):\n"
            "        if data is not None:\n"
            "            self._binary_init(data)\n"
            "            return\n"
        )
        self.code = class_header.format(self.name)
        for member in self.fields:
            self.code += "        self.{} = {}\n".format(member.name, member.value)

    def _make_from_binary(self):
        """
        Append the decoding part: from_binary() and _binary_init().
        """
        decoder_header = (
            "\n"
            "    @staticmethod\n"
            "    def from_binary(data):\n"
            "        return {}(data=data)\n"
            "\n"
            "    def _binary_init(self, data):\n"
        )
        self.code += decoder_header.format(self.name)
        for member in self.fields:
            self.code += self._decode_statement(member)

    @staticmethod
    def _decode_statement(member):
        """
        Return the generated code reading one field from the data stream.
        """
        if hasattr(Primitives, member.uatype):
            # types known to Primitives come with their own (array) unpacker
            if member.array:
                return '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(member.name, member.uatype)
            return '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(member.name, member.uatype)

        if not member.array:
            return "        self.{} = ua.{}.from_binary(data)\n".format(member.name, member.uatype)

        # array of non primitive types: an Int32 length, -1 meaning None,
        # followed by that many elements
        array_decoder = (
            "\n"
            "        length = ua.ua_binary.Primitives.Int32.unpack(data)\n"
            "        if length == -1:\n"
            "            self.{0} = None\n"
            "        else:\n"
            "            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]\n"
        )
        return array_decoder.format(member.name, member.uatype)

    def _make_to_binary(self):
        """
        Append the encoding part: to_binary().
        """
        encoder_header = (
            "\n"
            "    def to_binary(self):\n"
            "        packet = []\n"
        )
        self.code += encoder_header
        for member in self.fields:
            self.code += self._encode_statement(member)
        self.code += '        return b"".join(packet)'

    @staticmethod
    def _encode_statement(member):
        """
        Return the generated code appending one encoded field to packet.
        """
        if hasattr(Primitives, member.uatype):
            if member.array:
                return '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(member.uatype, member.name)
            return '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(member.uatype, member.name)

        if not member.array:
            return "        packet.append(self.{}.to_binary())\n".format(member.name)

        # mirror of the array decoder: length first (-1 for None), then elements
        array_encoder = (
            "\n"
            "        if self.{0} is None:\n"
            "            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))\n"
            "        else:\n"
            "            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))\n"
            "            for element in self.{0}:\n"
            "                packet.append(element.to_binary())\n"
        )
        return array_encoder.format(member.name)
124
125
126 1
class Field(object):
    """
    Plain description of one field of a structure.
    """

    def __init__(self, name):
        # name of the attribute in the generated class
        self.name = name
        # type name and default value are unknown until filled in by the caller
        self.uatype = self.value = None
        # a field is a scalar unless flagged otherwise
        self.array = False
132
133
134 1
class StructGenerator(object):
    """
    Build a model of the structures described in an xml type dictionary and
    generate the Python classes for them, either in memory or as a file.
    """

    def __init__(self):
        # one Struct per StructuredType element found, in document order
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse the xml given as a string and add its structures to the model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the xml file at path and add its structures to the model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Create a Struct for every StructuredType element found under root.

        A field whose name starts with "NoOf" is not stored as a field of
        its own: it flags the field following it as an array.
        """
        for child in root.iter("{*}StructuredType"):
            # The structure name becomes a class name in generated code and
            # is looked up under its cleaned form by load_type_definitions,
            # so it must be cleaned like field and type names are.
            struct = Struct(_clean_name(child.get("Name")))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    array = True
                    continue
                field = Field(_clean_name(name))
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # drop the namespace prefix of the type name
                    field.uatype = field.uatype.split(":")[1]
                field.uatype = _clean_name(field.uatype)
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        Write the generated code of all structures to the file at path.

        If register is True, the calls registering every class as an
        extension object are appended at the end of the file.
        """
        # the context manager closes the file even if code generation fails
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        Return the code registering every structure of the model, using the
        type ids stored with set_typeid().
        """
        template = "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n"
        lines = [template.format(name=struct.name, nodeid=struct.typeid) for struct in self.model]
        return "\n\n" + "".join(lines)

    def get_python_classes(self, env=None):
        """
        generate Python code and execute in a new environment
        return a dict of structures {name: class}
        Rmq: Since the code is generated on the fly, in case of error the
        stack trace is not available and debugging is very hard...

        If env is given it is used as the execution environment, is updated
        in place and is the dict returned.
        """
        if env is None:
            env = {}
        # Add the required libraries to dict, unless the caller provided them
        env.setdefault('ua', ua)
        env.setdefault('datetime', datetime)
        env.setdefault('uuid', uuid)
        # generate classes one by one and add them to dict
        # NOTE(security): the code executed here is built from the parsed
        # type definitions, only use it with definitions from a trusted source.
        for struct in self.model:
            exec(struct.get_code(), env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        save the new structures to a python file which can be used later
        import the result and return resulting classes in a dict
        if append_to is a dict, the classes are added to the dict

        The module is imported by its bare name (file name without
        extension), so the file has to be written where Python can import it.
        """
        self.save_to_file(path)
        name = os.path.splitext(os.path.basename(path))[0]
        mymodule = importlib.import_module(name)
        result = {} if append_to is None else append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """
        Write the warning banner and the imports needed by the generated code.
        """
        _file.write(
            "\n"
            "'''\n"
            "THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!\n"
            "'''\n"
            "\n"
            "from datetime import datetime\n"
            "import uuid\n"
            "\n"
            "from opcua import ua\n"
        )

    def set_typeid(self, name, typeid):
        """
        Store typeid on the first structure of the model called name.
        Nothing happens if there is no such structure.
        """
        for struct in self.model:
            if struct.name == name:
                struct.typeid = typeid
                return
241
242
243 1
def load_type_definitions(server, nodes=None):
    """
    Download xml from given variable nodes defining custom structures.
    If no node is given, attempts to import variables from all nodes under
    "0:OPC Binary", except the one with browse name "Opc.Ua".
    The code is generated and imported on the fly. If you know the structures
    are not going to be modified it might be interesting to copy the generated
    files and include them in your code.

    Return a tuple (generators, structs_dict): the list of StructGenerator
    objects, one per node, and the dict filled by the generated code, in
    which the generated classes are found under their name.
    """
    # imported here so that this function does not depend on a module level import
    import logging
    logger = logging.getLogger(__name__)

    if nodes is None:
        standard_types = ua.QualifiedName("Opc.Ua")
        nodes = [
            server.get_node(desc.NodeId)
            for desc in server.nodes.opc_binary.get_children_descriptions()
            if desc.BrowseName != standard_types
        ]

    structs_dict = {}
    generators = []
    for node in nodes:
        xml = node.get_value()
        xml = xml.decode("utf-8")
        generator = StructGenerator()
        generators.append(generator)
        generator.make_model_from_string(xml)
        # generate and execute new code on the fly
        # NOTE: when debugging, generator.save_and_import(name + ".py",
        # append_to=structs_dict) can be used here instead, with name built
        # from the cleaned browse name of the node, so that the generated
        # code ends up in a file that can be inspected.
        generator.get_python_classes(structs_dict)

        # register classes
        # every child of our node should represent a class
        for ndesc in node.get_children_descriptions():
            ndesc_node = server.get_node(ndesc.NodeId)
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
            if not ref_desc_list:
                # some servers put extra things here
                continue
            name = _clean_name(ndesc.BrowseName.Name)
            if name not in structs_dict:
                logger.error("%s is found as child of binary definition node but is not found in xml", name)
                continue
            nodeid = ref_desc_list[0].NodeId
            ua.register_extension_object(name, nodeid, structs_dict[name])
            # save the typeid in case the user wants to create a static file for the type definition
            generator.set_typeid(name, nodeid.to_string())
    return generators, structs_dict
290
291
292 1
def _clean_name(name):
293
    """
294
    Remove characters that might be present in  OPC UA structures
295
    but cannot be part of of Python class names
296
    """
297 1
    name = re.sub(r'\W+', '_', name)
298 1
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
299
300
    return name
301