Completed
Pull Request — master (#490)
by Olivier
10:23
created

Struct.get_code()   B

Complexity

Conditions 6

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 6.027

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 6
dl 0
loc 29
ccs 10
cts 11
cp 0.9091
crap 6.027
rs 7.5384
c 2
b 0
f 0
1
"""
2
Support for custom structures in client and server
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9 1
import re
10
# The next two imports are for generated code
11 1
from datetime import datetime
12 1
import uuid
13
14 1
from lxml import objectify
15
16
17 1
from opcua.ua.ua_binary import Primitives
18 1
from opcua import ua
19
20
21 1
def get_default_value(uatype):
    """
    Return the default initial value used for a field of type *uatype*.

    The result is meant to be interpolated into generated source code
    (Struct.get_code() formats it with str.format), which is why most
    values are strings holding a Python expression rather than the
    value itself.

    uatype: type name as a string, e.g. "String", "Int32" or the name of
        another structure.
    Returns a string containing a Python expression, the integer 0 for
    numeric types, or None for "ByteString", "CharArray" and "Char".
    """
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        # real None (not the string "None"); both render as "None" once formatted
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
        return 0
    # anything else is assumed to be a class reachable through the ua namespace
    return "ua." + uatype + "()"
36
37
38 1
class Struct(object):
    """
    Model of one structure, able to render itself as Python source code.
    """

    def __init__(self, name):
        # name used for the generated Python class
        self.name = name
        # field objects (exposing name, uatype, value and array), in declaration order
        self.fields = []
        # type id of the structure; None until it is set from outside
        self.typeid = None

    def get_code(self):
        """
        Return the Python source code of a class representing this structure.

        The generated class has a ``ua_types`` class attribute listing
        (field name, type name) pairs and an ``__init__`` assigning every
        field its default value. Array fields get the "ListOf" prefix on
        their type name, except arrays of Char which are exposed as "String".
        """
        code = (
            "\n\n"
            "class {0}(object):\n"
            "\n"
            "    '''\n"
            "    {0} structure autogenerated from xml\n"
            "    '''\n"
            "\n"
        ).format(self.name)

        code += "    ua_types = [\n"
        for field in self.fields:
            prefix = "ListOf" if field.array else ""
            uatype = prefix + field.uatype
            if uatype == "ListOfChar":
                uatype = "String"
            code += "        ('{}', '{}'),\n".format(field.name, uatype)

        code += "    ]"
        code += "\n\n    def __init__(self):\n"
        if not self.fields:
            # the body must be indented deeper than the 4-space "def" line,
            # otherwise the generated code does not compile
            code += "        pass\n"
        for field in self.fields:
            code += "        self.{} = {}\n".format(field.name, field.value)
        return code
73
74 1
75 1
class Field(object):
    """
    Description of a single field of a structure.
    """

    def __init__(self, name):
        # attribute name used in the generated class
        self.name = name
        # type name as a string; None until assigned
        self.uatype = None
        # default value (or expression string) written into the generated __init__
        self.value = None
        # True when the field holds a list of uatype values
        self.array = False

    def __repr__(self):
        return "Field(name={!r}, uatype={!r}, value={!r}, array={!r})".format(
            self.name, self.uatype, self.value, self.array)
81
82 1
83 1
class StructGenerator(object):
    """
    Build a model of structures from an xml type description and turn
    that model into Python classes, either on the fly or through a
    generated Python file.
    """

    def __init__(self):
        # Struct objects collected by the make_model_from_* methods
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse the xml document given as a string and add the structures
        it defines to the model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the xml file at path and add the structures it defines
        to the model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Create one Struct per StructuredType element found under root
        (in any namespace) and append it to self.model.
        """
        for child in root.iter("{*}StructuredType"):
            # clean the struct name the same way field names are cleaned, so it
            # is a valid class name and matches the cleaned names looked up
            # by load_type_definitions
            struct = Struct(_clean_name(child.get("Name")))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # a "NoOf..." field is not stored itself; it only marks
                    # the next field as being an array
                    array = True
                    continue
                field = Field(_clean_name(name))
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # drop the prefix in front of the type name
                    field.uatype = field.uatype.split(":")[1]
                field.uatype = _clean_name(field.uatype)
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        Write the generated classes to the file at path.

        If register is True, the calls registering each class as an
        extension object are appended at the end of the file.
        """
        # the context manager closes the file even if generating code fails
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        Return the source code registering every structure of the model
        with ua.register_extension_object, using the stored typeid.
        """
        code = "\n\n"
        for struct in self.model:
            code += "ua.register_extension_object('{name}', ua.NodeId.from_string('{nodeid}'), {name})\n".format(name=struct.name, nodeid=struct.typeid)
        return code

    def get_python_classes(self, env=None):
        """
        generate Python code and execute in a new environment
        return the environment dict, which maps structure names to classes
        and also holds the 'ua', 'datetime' and 'uuid' names needed by
        the generated code
        Rmk: Since the code is generated on the fly, in case of error the stack trace is
        not available and debugging is very hard...
        """
        if env is None:
            env = {}
        #  Add the required libraries to dict, without overriding existing entries
        env.setdefault('ua', ua)
        env.setdefault('datetime', datetime)
        env.setdefault('uuid', uuid)
        # generate classes one by one and add them to dict
        for struct in self.model:
            code = struct.get_code()
            # NOTE(security): this executes code built from the xml that was
            # loaded into the model; only use it with xml from a trusted source
            exec(code, env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        save the new structures to a python file which can be used later
        import the result and return resulting classes in a dict
        if append_to is a dict, the classes are added to the dict

        The module is imported by the base name of path, so the file has
        to be located where the import system can find it.
        """
        self.save_to_file(path)
        name = os.path.basename(path)
        name = os.path.splitext(name)[0]
        mymodule = importlib.import_module(name)
        if append_to is None:
            result = {}
        else:
            result = append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """
        Write the header of a generated file (warning banner and the
        imports needed by the generated classes) to the open file _file.
        """
        _file.write(
            "\n"
            "'''\n"
            "THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!\n"
            "'''\n"
            "\n"
            "from datetime import datetime\n"
            "import uuid\n"
            "\n"
            "from opcua import ua\n"
        )

    def set_typeid(self, name, typeid):
        """
        Store typeid on the first structure of the model called name.
        Nothing happens if no structure has that name.
        """
        for struct in self.model:
            if struct.name == name:
                struct.typeid = typeid
                return
190
191
192
def load_type_definitions(server, nodes=None):
    """
    Download xml from given variable nodes defining custom structures.
    If no node is given, attempts to import variables from all nodes under
    "0:OPC Binary", except the one whose browse name is "Opc.Ua".
    The code is generated and imported on the fly. If you know the structures
    are not going to be modified it might be interesting to copy the generated
    files and include them in your code.

    Returns a tuple (generators, structs_dict): the list of StructGenerator
    objects (one per node) and the dict the generated classes were executed
    into. Besides the classes, that dict also holds the helper names added by
    StructGenerator.get_python_classes.
    """
    if nodes is None:
        nodes = []
        for desc in server.nodes.opc_binary.get_children_descriptions():
            if desc.BrowseName != ua.QualifiedName("Opc.Ua"):
                nodes.append(server.get_node(desc.NodeId))

    structs_dict = {}
    generators = []
    for node in nodes:
        xml = node.get_value()
        xml = xml.decode("utf-8")
        generator = StructGenerator()
        generators.append(generator)
        generator.make_model_from_string(xml)
        # generate and execute new code on the fly
        generator.get_python_classes(structs_dict)
        # For debugging, the same can be done through a real file that is
        # imported, using generator.save_and_import(<file name>, append_to=structs_dict).
        # The file name must then be built from a cleaned name (see _clean_name)
        # so it does not contain characters that cannot be used in Python file names.

        # register classes
        # every child of our node should represent a class
        for ndesc in node.get_children_descriptions():
            ndesc_node = server.get_node(ndesc.NodeId)
            ref_desc_list = ndesc_node.get_references(refs=ua.ObjectIds.HasDescription, direction=ua.BrowseDirection.Inverse)
            if ref_desc_list:  # some servers put extra things here
                name = _clean_name(ndesc.BrowseName.Name)
                if name not in structs_dict:
                    print("Error {} is found as child of binary definition node but is not found in xml".format(name))
                    continue
                nodeid = ref_desc_list[0].NodeId
                ua.register_extension_object(name, nodeid, structs_dict[name])
                # save the typeid in case the user wants to create a static file for the type definition
                generator.set_typeid(name, nodeid.to_string())
    return generators, structs_dict
239
240
241
def _clean_name(name):
242
    """
243 1
    Remove characters that might be present in  OPC UA structures
244
    but cannot be part of of Python class names
245
    """
246
    name = re.sub(r'\W+', '_', name)
247
    name = re.sub(r'^[0-9]+', r'_\g<0>', name)
248
249
    return name
250