Test Failed
Pull Request — master (#490)
by Olivier
06:42
created

Struct.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 4
ccs 0
cts 4
cp 0
crap 2
rs 10
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
import os
8
import importlib
9
import re
10
# The next two imports are for generated code
11
from datetime import datetime
12
import uuid
13
14
from lxml import objectify
15
16
17
from opcua.ua.ua_binary import Primitives
18
from opcua import ua
19
20
21
def get_default_value(uatype):
22
    if uatype == "String":
23
        return "None" 
24
    elif uatype == "Guid":
25
        return "uuid.uuid4()" 
26
    elif uatype in ("ByteString", "CharArray", "Char"):
27
        return None 
28
    elif uatype == "Boolean":
29
        return "True"
30
    elif uatype == "DateTime":
31
        return "datetime.utcnow()"
32
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
33
        return 0
34
    else:
35
        return "ua." + uatype + "()"
36
37
38
class Struct(object):
39
    def __init__(self, name):
40
        self.name = name
41
        self.fields = []
42
        self.typeid = None
43
44
    def get_code(self):
45
        code = """
46
47
class {0}(object):
48
49
    '''
50
    {0} structure autogenerated from xml
51
    '''
52
53
""".format(self.name)
54
55
        code += "    ua_types = [\n"
56
        for field in self.fields:
57
            prefix = "ListOf" if field.array else ""
58
            uatype = prefix + field.uatype
59
            if uatype == "ListOfChar":
60
                uatype = "String"
61
            code += "        ('{}', '{}'),\n".format(field.name, uatype)
62
63
        code += "    ]"
64
        code += """
65
66
    def __init__(self):
67
""".format(self.name)
68
        if not self.fields:
69
            code += "    pass"
70
        for field in self.fields:
71
            code += "        self.{} = {}\n".format(field.name, field.value)
72
        return code
73
74
75
class Field(object):
76
    def __init__(self, name):
77
        self.name = name
78
        self.uatype = None
79
        self.value = None
80
        self.array = False
81
82
83
class StructGenerator(object):
84
    def __init__(self):
85
        self.model = []
86
87
    def make_model_from_string(self, xml):
88
        obj = objectify.fromstring(xml)
89
        self._make_model(obj)
90
91
    def make_model_from_file(self, path):
92
        obj = objectify.parse(path)
93
        root = obj.getroot()
94
        self._make_model(root)
95
96
    def _make_model(self, root):
97
        for child in root.iter("{*}StructuredType"):
98
            struct = Struct(child.get("Name"))
99
            array = False
100
            for xmlfield in child.iter("{*}Field"):
101
                name = xmlfield.get("Name")
102
                if name.startswith("NoOf"):
103
                    array = True
104
                    continue
105
                field = Field(_clean_name(name))
106
                field.uatype = xmlfield.get("TypeName")
107
                if ":" in field.uatype:
108
                    field.uatype = field.uatype.split(":")[1]
109
                field.uatype = _clean_name(field.uatype)
110
                field.value = get_default_value(field.uatype)
111
                if array:
112
                    field.array = True
113
                    field.value = []
114
                    array = False
115
                struct.fields.append(field)
116
            self.model.append(struct)
117
118
    def save_to_file(self, path, register=False):
119
        _file = open(path, "wt")
120
        self._make_header(_file)
121
        for struct in self.model:
122
            _file.write(struct.get_code())
123
        if register:
124
            _file.write(self._make_registration())
125
        _file.close()
126
127
    def _make_registration(self):
128
        code = "\n\n"
129
        for struct in self.model:
130
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
131
        return code
132
133
    def get_python_classes(self, env=None):
134
        """
135
        generate Python code and execute in a new environment
136
        return a dict of structures {name: class}
137
        Rmw: Since the code is generated on the fly, in case of error the stack trace is 
138
        not available and debugging is very hard...
139
        """
140
        if env is None:
141
            env = {}
142
        #  Add the required libraries to dict
143
        if "ua" not in env:
144
            env['ua'] = ua
145
        if "datetime" not in env:
146
            env['datetime'] = datetime
147
        if "uuid" not in env:
148
            env['uuid'] = uuid
149
        # generate classes one by one and add them to dict
150
        for struct in self.model:
151
            code = struct.get_code()
152
            exec(code, env)
153
        return env
154
155
    def save_and_import(self, path, append_to=None):
156
        """
157
        save the new structures to a python file which be used later
158
        import the result and return resulting classes in a dict
159
        if append_to is a dict, the classes are added to the dict
160
        """
161
        self.save_to_file(path)
162
        name = os.path.basename(path)
163
        name = os.path.splitext(name)[0]
164
        mymodule = importlib.import_module(name)
165
        if append_to is None:
166
            result = {}
167
        else:
168
            result = append_to
169
        for struct in self.model:
170
            result[struct.name] = getattr(mymodule, struct.name)
171
        return result
172
173
    def _make_header(self, _file):
174
        _file.write("""
175
'''
176
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
177
'''
178
179
from datetime import datetime
180
import uuid
181
182
from opcua import ua
183
""")
184
185
    def set_typeid(self, name, typeid):
186
        for struct in self.model:
187
            if struct.name == name:
188
                struct.typeid = typeid
189
                return
190
191
192
def load_type_definitions(server, nodes=None):
193
    """
194
    Download xml from given variable node defining custom structures.
195
    If no node is given, attemps to import variables from all nodes under
196
    "0:OPC Binary"
197
    the code is generated and imported on the fly. If you know the structures
198
    are not going to be modified it might be interresting to copy the generated files
199
    and include them in you code
200
    """
201
    if nodes is None:
202
        nodes = []
203
        for desc in server.nodes.opc_binary.get_children_descriptions():
204
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
205
                nodes.append(server.get_node(desc.NodeId))
206
    
207
    structs_dict = {}
208
    generators = []
209
    for node in nodes:
210
        xml = node.get_value()
211
        xml = xml.decode("utf-8")
212
        generator = StructGenerator()
213
        generators.append(generator)
214
        generator.make_model_from_string(xml)
215
        # generate and execute new code on the fly
216
        generator.get_python_classes(structs_dict)
217
        # same but using a file that is imported. This can be usefull for debugging library
218
        #name = node.get_browse_name().Name
219
        # Make sure structure names do not contain charaters that cannot be used in Python class file names
220
        #name = _clean_name(name)
221
        #name = "structures_" + node.get_browse_name().Name
222
        #generator.save_and_import(name + ".py", append_to=structs_dict)
223
224
        # register classes
225
        # every children of our node should represent a class
226
        for ndesc in node.get_children_descriptions():
227
            ndesc_node = server.get_node(ndesc.NodeId)
228
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
229
            if ref_desc_list:  #some server put extra things here
230
                name = _clean_name(ndesc.BrowseName.Name)
231
                if not name in structs_dict:
232
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
233
                    continue
234
                nodeid = ref_desc_list[0].NodeId
235
                ua.register_extension_object(name, nodeid, structs_dict[name])
236
                # save the typeid if user want to create static file for type definitnion
237
                generator.set_typeid(name, nodeid.to_string())
238
    return generators, structs_dict
239
240
241
def _clean_name(name):
242
    """
243
    Remove characters that might be present in  OPC UA structures
244
    but cannot be part of of Python class names
245
    """
246
    name = re.sub(r'\W+', '_', name)
247
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
248
249
    return name
250