Completed
Pull Request — master (#494)
by Olivier
08:23
created

StructGenerator._make_model()   B

Complexity

Conditions 6

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 20
ccs 20
cts 20
cp 1
crap 6
rs 8
1
"""
2
parse simple structures from an xml tree
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9
10 1
from lxml import objectify
11
12
13 1
from opcua.ua.ua_binary import Primitives
14 1
from opcua import ua
15
# The next two imports are for generated code
16 1
from datetime import datetime
17 1
import uuid
18
19
20 1
def get_default_value(uatype):
    """
    Return the default initialiser for a field of the given UA type name.

    The result is meant to be interpolated into generated source code with
    ``str.format``, so most values are strings holding a Python expression.
    Two cases deliberately return non-string objects whose text form is the
    wanted expression: ``None`` for byte/char types and ``0`` for numbers.

    :param uatype: type name as found in the schema, e.g. ``"Int32"``
    :return: expression (or object formatting to one) used as default value
    """
    numeric_types = (
        "Int16", "Int32", "Int64",
        "UInt16", "UInt32", "UInt64",
        "Double", "Float", "Byte", "SByte",
    )
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in numeric_types:
        return 0
    # Anything else is taken to be a class reachable through the ua module.
    return "ua." + uatype + "()"
35
36
37 1
class Struct(object):
    """
    One structured type and the generator of its Python source code.

    ``fields`` holds the Field objects describing the members, in order.
    ``code`` holds the source text produced by the last call to get_code()
    for a structure that has at least one field.
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.code = ""

    def get_code(self):
        """
        Return the Python source code of a class representing this structure.

        A structure without fields yields an empty class; otherwise the
        constructor, the decoding methods and the encoding method are
        generated in that order and stored in ``self.code``.
        """
        if not self.fields:
            return """

class {}(object):
    pass

""".format(self.name)
        self._make_constructor()
        self._make_from_binary()
        self._make_to_binary()
        return self.code

    def _make_constructor(self):
        """
        Start ``self.code`` afresh with the class header and ``__init__``.

        The generated ``__init__`` delegates to ``_binary_init`` when data is
        given, otherwise it assigns every field its default value.
        """
        self.code = """


class {0}(object):
    '''
    {0} structure autogenerated from xml
    '''
    def __init__(self, data=None):
        if data is not None:
            self._binary_init(data)
            return
""".format(self.name)
        for field in self.fields:
            self.code += "        self.{} = {}\n".format(field.name, field.value)

    def _make_from_binary(self):
        """
        Append ``from_binary`` and ``_binary_init`` to the generated code.

        Field types that exist as attributes of ``Primitives`` are decoded
        through ``ua.ua_binary.Primitives``; every other type is decoded by
        calling ``from_binary`` on the class of that name in ``ua``.
        """
        self.code += '''
    @staticmethod
    def from_binary(data):
        return {}(data=data)

    def _binary_init(self, data):
'''.format(self.name)
        for field in self.fields:
            if hasattr(Primitives, field.uatype):
                if field.array:
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
                else:
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
            else:
                if field.array:
                    # Arrays of structures: an Int32 length comes first and
                    # a length of -1 is decoded as None.
                    self.code += '''
        length = ua.ua_binary.Primitives.Int32.unpack(data)
        if length == -1:
            self.{0} = None
        else:
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
'''.format(field.name, field.uatype)
                else:
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)

    def _make_to_binary(self):
        """
        Append the ``to_binary`` method to the generated code.

        The generated method collects the encoded fields in a list and
        returns them joined into a single bytes object.
        """
        self.code += '''
    def to_binary(self):
        packet = []
'''
        for field in self.fields:
            if hasattr(Primitives, field.uatype):
                if field.array:
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
                else:
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
            else:
                if field.array:
                    # Mirror of the decoding side: None is encoded as length
                    # -1, otherwise the length followed by each element.
                    self.code += '''
        if self.{0} is None:
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
        else:
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
            for element in self.{0}:
                packet.append(element.to_binary())
'''.format(field.name)
                else:
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
        self.code += '        return b"".join(packet)'
122
123
124 1
class Field(object):
    """
    Description of one member of a structure.
    """

    def __init__(self, name):
        # Attribute name used in the generated class.
        self.name = name
        # Type name of the field; filled in after construction.
        self.uatype = None
        # Default value written into the generated constructor.
        self.value = None
        # True when the field holds an array of uatype elements.
        self.array = False
130
131
132 1
class StructGenerator(object):
    """
    Build a model of structures from an XML description and turn it into
    Python classes, either written to a file or executed on the fly.
    """

    def __init__(self):
        # List of Struct objects, in the order they were found.
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse the given XML string and add its structures to the model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the XML file at path and add its structures to the model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Append one Struct per ``StructuredType`` element found under root.

        A ``Field`` element whose name starts with ``NoOf`` is not stored as
        a field of its own; it marks the field that follows it as an array.
        """
        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    array = True
                    continue
                field = Field(name)
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # Drop the prefix in front of the colon.
                    field.uatype = field.uatype.split(":")[1]
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path, register=False):
        """
        Write the generated code of all structures to the file at path.

        :param path: destination file, overwritten if it exists
        :param register: when true, also append the statements which set
            every generated class as an attribute of the ua module
        """
        # The context manager closes the file even if generation fails.
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())
            if register:
                _file.write(self._make_registration())

    def _make_registration(self):
        """
        Return code setting every structure of the model as attribute of ua.
        """
        code = "\n\n"
        for struct in self.model:
            code += "\nsetattr(ua, '{name}', {name})\n".format(name=struct.name)
        return code

    def get_python_classes(self, env=None):
        """
        generate Python code and execute in a new environment
        return a dict of structures {name: class}
        Remark: Since the code is generated on the fly, in case of error the
        stack trace is not available and debugging is very hard...
        """
        if env is None:
            env = {}
        #  Add the required libraries to dict
        if "ua" not in env:
            env['ua'] = ua
        if "datetime" not in env:
            env['datetime'] = datetime
        if "uuid" not in env:
            env['uuid'] = uuid
        # generate classes one by one and add them to dict
        # NOTE: exec runs code built from the parsed XML; only feed this
        # generator XML from a trusted source.
        for struct in self.model:
            code = struct.get_code()
            exec(code, env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        save the new structures to a python file which can be used later
        import the result and return resulting classes in a dict
        if append_to is a dict, the classes are added to the dict
        """
        self.save_to_file(path)
        name = os.path.basename(path)
        name = os.path.splitext(name)[0]
        # The file was just created, so the import system caches may not
        # know about it yet. invalidate_caches is missing on old Pythons.
        invalidate = getattr(importlib, "invalidate_caches", None)
        if invalidate is not None:
            invalidate()
        mymodule = importlib.import_module(name)
        if append_to is None:
            result = {}
        else:
            result = append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """
        Write the banner and the imports needed by the generated code.
        """
        _file.write("""
'''
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
'''

from datetime import datetime
import uuid

from opcua import ua
""")
232
233
234
235
236 1
if __name__ == "__main__":
237
    import sys
238
    from IPython import embed
239
    sys.path.insert(0, ".") # necessary for import in current dir
240
241
    #xmlpath = "schemas/Opc.Ua.Types.bsd"
242
    xmlpath = "schemas/example.bsd"
243
    c = StructGenerator(xmlpath, "structures.py")
244
    c.run()
245
    import structures as s
246
247
248
    #sts = c.get_structures()
249
    embed()
250