Completed
Pull Request — master (#494)
by Olivier
05:47 queued 01:20
created

StructGenerator._make_registration()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.048

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 5
ccs 1
cts 5
cp 0.2
crap 4.048
rs 9.4285
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10
# The next two imports are for generated code
11 1
from datetime import datetime
12 1
import uuid
13
14 1
from lxml import objectify
15
16
17 1
from opcua.ua.ua_binary import Primitives
18 1
from opcua import ua
19
20
21 1
def get_default_value(uatype):
22 1
    if uatype == "String":
23 1
        return "None" 
24 1
    elif uatype == "Guid":
25 1
        return "uuid.uuid4()" 
26 1
    elif uatype in ("ByteString", "CharArray", "Char"):
27 1
        return None 
28 1
    elif uatype == "Boolean":
29 1
        return "True"
30 1
    elif uatype == "DateTime":
31 1
        return "datetime.utcnow()"
32 1
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
33 1
        return 0
34
    else:
35 1
        return "ua." + uatype + "()"
36
37
38 1
class Struct(object):
39 1
    def __init__(self, name):
40 1
        self.name = name
41 1
        self.fields = []
42 1
        self.code = ""
43
44 1
    def get_code(self):
45 1
        if not self.fields:
46
            return """
47
48
class {}(object):
49
    pass
50
51
""".format(_clean_name(self.name))
52 1
        self._make_constructor()
53 1
        self._make_from_binary()
54 1
        self._make_to_binary()
55 1
        return self.code
56
57 1
    def _make_constructor(self):
58 1
        self.code = """
59
60
61
class {0}(object):
62
    '''
63
    {0} structure autogenerated from xml
64
    '''
65
    def __init__(self, data=None):
66
        if data is not None:
67
            self._binary_init(data)
68
            return
69
""".format(self.name)
70 1
        for field in self.fields:
71 1
            self.code += "        self.{} = {}\n".format(field.name, field.value)
72
73 1
    def _make_from_binary(self):
74 1
        self.code += '''
75
    @staticmethod
76
    def from_binary(data):
77
        return {}(data=data)
78
79
    def _binary_init(self, data):
80
'''.format(self.name)
81 1
        for field in self.fields:
82 1
            if hasattr(Primitives, field.uatype):
83 1
                if field.array:
84 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
85
                else:
86 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
87
            else:
88 1
                if field.array:
89 1
                    self.code += '''
90
        length = ua.ua_binary.Primitives.Int32.unpack(data)
91
        if length == -1:
92
            self.{0} = None
93
        else:
94
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
95
'''.format(field.name, field.uatype)
96
                else:
97 1
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)
98
99 1
    def _make_to_binary(self):
100 1
        self.code += '''
101
    def to_binary(self):
102
        packet = []
103
'''
104 1
        for field in self.fields:
105 1
            if hasattr(Primitives, field.uatype):
106 1
                if field.array:
107 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
108
                else:
109 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
110
            else:
111 1
                if field.array:
112 1
                    self.code += '''
113
        if self.{0} is None:
114
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
115
        else:
116
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
117
            for element in self.{0}:
118
                packet.append(element.to_binary())
119
'''.format(field.name)
120
                else:
121 1
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
122 1
        self.code += '        return b"".join(packet)'
123
124
125 1
class Field(object):
126 1
    def __init__(self, name):
127 1
        self.name = name
128 1
        self.uatype = None
129 1
        self.value = None
130 1
        self.array = False
131
132
133 1
class StructGenerator(object):
134 1
    def __init__(self):
135 1
        self.model = []
136
137 1
    def make_model_from_string(self, xml):
138
        obj = objectify.fromstring(xml)
139
        self._make_model(obj)
140
141 1
    def make_model_from_file(self, path):
142 1
        obj = objectify.parse(path)
143 1
        root = obj.getroot()
144 1
        self._make_model(root)
145
146 1
    def _make_model(self, root):
147 1
        for child in root.iter("{*}StructuredType"):
148 1
            struct = Struct(child.get("Name"))
149 1
            array = False
150 1
            for xmlfield in child.iter("{*}Field"):
151 1
                name = xmlfield.get("Name")
152 1
                if name.startswith("NoOf"):
153 1
                    array = True
154 1
                    continue
155 1
                field = Field(name)
156 1
                field.uatype = xmlfield.get("TypeName")
157 1
                if ":" in field.uatype:
158 1
                    field.uatype = field.uatype.split(":")[1]
159 1
                field.value = get_default_value(field.uatype)
160 1
                if array:
161 1
                    field.array = True
162 1
                    field.value = []
163 1
                    array = False
164 1
                struct.fields.append(field)
165 1
            self.model.append(struct)
166
167 1
    def save_to_file(self, path, register=False):
168 1
        _file = open(path, "wt")
169 1
        self._make_header(_file)
170 1
        for struct in self.model:
171 1
            _file.write(struct.get_code())
172 1
        if register:
173
            _file.write(self._make_registration())
174 1
        _file.close()
175
176 1
    def _make_registration(self):
177
        code = "\n\n"
178
        for struct in self.model:
179
            code += "\nsetattr(ua, '{name}', {name})\n".format(name=struct.name)
180
        return code
181
182 1
    def get_python_classes(self, env=None):
183
        """
184
        generate Python code and execute in a new environment
185
        return a dict of structures {name: class}
186
        Rmw: Since the code is generated on the fly, in case of error the stack trace is 
187
        not available and debugging is very hard...
188
        """
189
        if env is None:
190
            env = {}
191
        #  Add the required libraries to dict
192
        if "ua" not in env:
193
            env['ua'] = ua
194
        if "datetime" not in env:
195
            env['datetime'] = datetime
196
        if "uuid" not in env:
197
            env['uuid'] = uuid
198
        # generate classes one by one and add them to dict
199
        for struct in self.model:
200
            code = struct.get_code()
201
            exec(code, env)
202
        return env
203
204 1
    def save_and_import(self, path, append_to=None):
205
        """
206
        save the new structures to a python file which be used later
207
        import the result and return resulting classes in a dict
208
        if append_to is a dict, the classes are added to the dict
209
        """
210 1
        self.save_to_file(path)
211 1
        name = os.path.basename(path)
212 1
        name = os.path.splitext(name)[0]
213 1
        mymodule = importlib.import_module(name)
214 1
        if append_to is None:
215 1
            result = {}
216
        else:
217
            result = append_to
218 1
        for struct in self.model:
219 1
            result[struct.name] = getattr(mymodule, struct.name)
220 1
        return result
221
222 1
    def _make_header(self, _file):
223 1
        _file.write("""
224
'''
225
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
226
'''
227
228
from datetime import datetime
229
import uuid
230
231
from opcua import ua
232
""")
233
234
235
236
237 1
def load_type_definitions(server, nodes=None):
238
    """
239
    Download xml from given variable node defining custom structures.
240
    If no no node is given, attemps to import variables from all nodes under
241
    "0:OPC Binary"
242
    the code is generated and imported on the fly. If you know the structures
243
    are not going to be modified it might be interresting to copy the generated files
244
    and include them in you code
245
    """
246
    if nodes is None:
247
        nodes = []
248
        for desc in server.nodes.opc_binary.get_children_descriptions():
249
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
250
                nodes.append(server.get_node(desc.NodeId))
251
    
252
    structs_dict = {}
253
    for node in nodes:
254
        xml = node.get_value()
255
        xml = xml.decode("utf-8")
256
        generator = StructGenerator()
257
        generator.make_model_from_string(xml)
258
        # generate and execute new code on the fly
259
        generator.get_python_classes(structs_dict)
260
        # same but using a file that is imported. This can be usefull for debugging library
261
        #name = node.get_browse_name().Name
262
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
263
        #name = _clean_name(name)
264
        #name = "structures_" + node.get_browse_name().Name
265
        #generator.save_and_import(name + ".py", append_to=structs_dict)
266
267
        # register classes
268
        # every children of our node should represent a class
269
        for ndesc in node.get_children_descriptions():
270
            ndesc_node = server.get_node(ndesc.NodeId)
271
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
272
            if ref_desc_list:  #some server put extra things here
273
                name = _clean_name(ndesc.BrowseName.Name)
274
                if not name in structs_dict:
275
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
276
                    continue
277
                nodeid = ref_desc_list[0].NodeId
278
                ua.register_extension_object(name, nodeid, structs_dict[name])
279
280
281 1
def _clean_name(name):
282
    """
283
    Remove characters that might be present in  OPC UA structures
284
    but cannot be part of of Python class names
285
    """
286
    name = re.sub(r'\W+', '_', name)
287
    name = re.sub(r'^[0-9]+', '_', name)
288
    return name
289