Passed
Pull Request — master (#494)
by Olivier
03:56
created

StructGenerator.get_python_classes()   B

Complexity

Conditions 6

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 34.317

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 21
ccs 1
cts 13
cp 0.0769
crap 34.317
rs 7.8867
1
"""
2
parse simple structures from an xml tree
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9
10 1
from lxml import objectify
11
12
13 1
from opcua.ua.ua_binary import Primitives
14 1
from opcua import ua
15
# The next two imports are for generated code
16 1
from datetime import datetime
17 1
import uuid
18
19
20 1
def get_default_value(uatype):
21 1
    if uatype == "String":
22 1
        return "None" 
23 1
    elif uatype == "Guid":
24 1
        return "uuid.uuid4()" 
25 1
    elif uatype in ("ByteString", "CharArray", "Char"):
26 1
        return None 
27 1
    elif uatype == "Boolean":
28 1
        return "True"
29 1
    elif uatype == "DateTime":
30 1
        return "datetime.utcnow()"
31 1
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
32 1
        return 0
33
    else:
34 1
        return "ua." + uatype + "()"
35
36
37 1
class Struct(object):
38 1
    def __init__(self, name):
39 1
        self.name = name
40 1
        self.fields = []
41 1
        self.code = ""
42
43 1
    def get_code(self):
44 1
        if not self.fields:
45
            return """
46
47
class {}(object):
48
    pass
49
50
""".format(self.name)
51 1
        self._make_constructor()
52 1
        self._make_from_binary()
53 1
        self._make_to_binary()
54 1
        return self.code
55
56 1
    def _make_constructor(self):
57 1
        self.code = """
58
59
60
class {0}(object):
61
    '''
62
    {0} structure autogenerated from xml
63
    '''
64
    def __init__(self, data=None):
65
        if data is not None:
66
            self._binary_init(data)
67
            return
68
""".format(self.name)
69 1
        for field in self.fields:
70 1
            self.code += "        self.{} = {}\n".format(field.name, field.value)
71
72 1
    def _make_from_binary(self):
73 1
        self.code += '''
74
    @staticmethod
75
    def from_binary(data):
76
        return {}(data=data)
77
78
    def _binary_init(self, data):
79
'''.format(self.name)
80 1
        for field in self.fields:
81 1
            if hasattr(Primitives, field.uatype):
82 1
                if field.array:
83 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
84
                else:
85 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
86
            else:
87 1
                if field.array:
88 1
                    self.code += '''
89
        length = ua.ua_binary.Primitives.Int32.unpack(data)
90
        if length == -1:
91
            self.{0} = None
92
        else:
93
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
94
'''.format(field.name, field.uatype)
95
                else:
96 1
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)
97
98 1
    def _make_to_binary(self):
99 1
        self.code += '''
100
    def to_binary(self):
101
        packet = []
102
'''
103 1
        for field in self.fields:
104 1
            if hasattr(Primitives, field.uatype):
105 1
                if field.array:
106 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
107
                else:
108 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
109
            else:
110 1
                if field.array:
111 1
                    self.code += '''
112
        if self.{0} is None:
113
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
114
        else:
115
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
116
            for element in self.{0}:
117
                packet.append(element.to_binary())
118
'''.format(field.name)
119
                else:
120 1
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
121 1
        self.code += '        return b"".join(packet)'
122
123
124 1
class Field(object):
125 1
    def __init__(self, name):
126 1
        self.name = name
127 1
        self.uatype = None
128 1
        self.value = None
129 1
        self.array = False
130
131
132 1
class StructGenerator(object):
133 1
    def __init__(self):
134 1
        self.model = []
135
136 1
    def make_model_from_string(self, xml):
137
        obj = objectify.fromstring(xml)
138
        self._make_model(obj)
139
140 1
    def make_model_from_file(self, path):
141 1
        obj = objectify.parse(path)
142 1
        root = obj.getroot()
143 1
        self._make_model(root)
144
145 1
    def _make_model(self, root):
146 1
        for child in root.iter("{*}StructuredType"):
147 1
            struct = Struct(child.get("Name"))
148 1
            array = False
149 1
            for xmlfield in child.iter("{*}Field"):
150 1
                name = xmlfield.get("Name")
151 1
                if name.startswith("NoOf"):
152 1
                    array = True
153 1
                    continue
154 1
                field = Field(name)
155 1
                field.uatype = xmlfield.get("TypeName")
156 1
                if ":" in field.uatype:
157 1
                    field.uatype = field.uatype.split(":")[1]
158 1
                field.value = get_default_value(field.uatype)
159 1
                if array:
160 1
                    field.array = True
161 1
                    field.value = []
162 1
                    array = False
163 1
                struct.fields.append(field)
164 1
            self.model.append(struct)
165
166 1
    def save_to_file(self, path):
167 1
        _file = open(path, "wt")
168 1
        self._make_header(_file)
169 1
        for struct in self.model:
170 1
            _file.write(struct.get_code())
171 1
        _file.close()
172
173 1
    def get_python_classes(self, env=None):
174
        """
175
        generate Python code and execute in a new environment
176
        return a dict of structures {name: class}
177
        Rmw: Since the code is generated on the fly, in case of error the stack trace is 
178
        not available and debugging is very hard...
179
        """
180
        if env is None:
181
            env = {}
182
        #  Add the required libraries to dict
183
        if not "ua" in env:
184
            env['ua'] = ua
185
        if not "datetime" in env:
186
            env['datetime'] = datetime
187
        if not "uuid" in env:
188
            env['uuid'] = uuid
189
        # generate classes one by one and add them to dict
190
        for struct in self.model:
191
            code = struct.get_code()
192
            exec(code, env)
193
        return env
194
195 1
    def save_and_import(self, path, append_to=None):
196
        """
197
        save the new structures to a python file which be used later
198
        import the result and return resulting classes in a dict
199
        if append_to is a dict, the classes are added to the dict
200
        """
201 1
        self.save_to_file(path)
202 1
        name = os.path.basename(path)
203 1
        name = os.path.splitext(name)[0]
204 1
        mymodule = importlib.import_module(name)
205 1
        if append_to is None:
206 1
            result = {}
207
        else:
208
            result = append_to
209 1
        for struct in self.model:
210 1
            result[struct.name] = getattr(mymodule, struct.name)
211 1
        return result
212
213 1
    def _make_header(self, _file):
214 1
        _file.write("""
215
'''
216
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
217
'''
218
219
from datetime import datetime
220
import uuid
221
222
from opcua import ua
223
""")
224
225
226
227
228 1
if __name__ == "__main__":
229
    import sys
230
    from IPython import embed
231
    sys.path.insert(0, ".") # necessary for import in current dir
232
233
    #xmlpath = "schemas/Opc.Ua.Types.bsd"
234
    xmlpath = "schemas/example.bsd"
235
    c = StructGenerator(xmlpath, "structures.py")
236
    c.run()
237
    import structures as s
238
239
240
    #sts = c.get_structures()
241
    embed()
242