Completed
Pull Request — master (#494)
by Olivier
07:16
created

StructGenerator.get_python_classes()   B

Complexity

Conditions 6

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 7.0986

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 21
ccs 11
cts 16
cp 0.6875
crap 7.0986
rs 7.8867
1
"""
2
Parse simple structures from an XML tree.
Only a subset of features is supported, but it should be enough
for custom structures.
5
"""
6
7 1
import os
8 1
import importlib
9
10 1
from lxml import objectify
11
12
13 1
from opcua.ua.ua_binary import Primitives
14
from opcua import ua
15
# The next two imports are for generated code
16 1
from datetime import datetime
17 1
import uuid
18 1
19 1
20 1
def get_default_value(uatype):
    """
    Return the default value used for a field of type uatype.

    The result is formatted into generated Python source by
    Struct._make_constructor, so most branches return source text.
    The ByteString/Char family returns the None object and the numeric
    types return the int 0; once formatted they read "None" and "0".

    uatype: type name as a string
    """
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
        return 0
    # Any other name is emitted as a constructor call on the ua module.
    return "ua." + uatype + "()"
35 1
36 1
37 1
class Struct(object):
    """
    Model of one structured type and generator of its Python source.

    name:   class name used in the generated code
    fields: field descriptions; each must expose name, uatype, value
            and array attributes
    code:   source text built by the _make_* helpers
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.code = ""

    def get_code(self):
        """
        Return the Python source of a class for this structure.

        A structure without fields yields a bare placeholder class and
        leaves self.code untouched; otherwise self.code is rebuilt from
        scratch and returned.
        """
        if self.fields:
            self._make_constructor()
            self._make_from_binary()
            self._make_to_binary()
            return self.code
        return "\n\nclass {}(object):\n    pass\n\n".format(self.name)

    def _make_constructor(self):
        """
        Start self.code with the class header and __init__.

        This overwrites any previous content of self.code. The generated
        __init__ defers to _binary_init when data is given, otherwise it
        assigns each field its default value.
        """
        header = (
            "\n"
            "\n"
            "\n"
            "class {0}(object):\n"
            "    '''\n"
            "    {0} structure autogenerated from xml\n"
            "    '''\n"
            "    def __init__(self, data=None):\n"
            "        if data is not None:\n"
            "            self._binary_init(data)\n"
            "            return\n"
        ).format(self.name)
        assignments = [
            "        self.{} = {}\n".format(item.name, item.value)
            for item in self.fields
        ]
        self.code = header + "".join(assignments)

    def _make_from_binary(self):
        """
        Append the from_binary static method and _binary_init to self.code.

        Types found on Primitives are unpacked through
        ua.ua_binary.Primitives; every other type is read through
        ua.<type>.from_binary. Arrays of non-primitive types are read as
        an Int32 length (-1 meaning None) followed by the elements.
        """
        chunks = [
            (
                "\n"
                "    @staticmethod\n"
                "    def from_binary(data):\n"
                "        return {}(data=data)\n"
                "\n"
                "    def _binary_init(self, data):\n"
            ).format(self.name)
        ]
        for item in self.fields:
            primitive = hasattr(Primitives, item.uatype)
            if primitive and item.array:
                template = "        self.{0} = ua.ua_binary.Primitives.{1}.unpack_array(data)\n"
            elif primitive:
                template = "        self.{0} = ua.ua_binary.Primitives.{1}.unpack(data)\n"
            elif item.array:
                template = (
                    "\n"
                    "        length = ua.ua_binary.Primitives.Int32.unpack(data)\n"
                    "        if length == -1:\n"
                    "            self.{0} = None\n"
                    "        else:\n"
                    "            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]\n"
                )
            else:
                template = "        self.{0} = ua.{1}.from_binary(data)\n"
            chunks.append(template.format(item.name, item.uatype))
        self.code += "".join(chunks)

    def _make_to_binary(self):
        """
        Append the to_binary method to self.code.

        The generated method packs every field in declaration order and
        joins the pieces into one bytes object. The dispatch between
        Primitives and element-level to_binary mirrors _make_from_binary.
        """
        chunks = [
            "\n"
            "    def to_binary(self):\n"
            "        packet = []\n"
        ]
        for item in self.fields:
            primitive = hasattr(Primitives, item.uatype)
            if primitive and item.array:
                template = "        packet.append(ua.ua_binary.Primitives.{uatype}.pack_array(self.{name}))\n"
            elif primitive:
                template = "        packet.append(ua.ua_binary.Primitives.{uatype}.pack(self.{name}))\n"
            elif item.array:
                template = (
                    "\n"
                    "        if self.{name} is None:\n"
                    "            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))\n"
                    "        else:\n"
                    "            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{name})))\n"
                    "            for element in self.{name}:\n"
                    "                packet.append(element.to_binary())\n"
                )
            else:
                template = "        packet.append(self.{name}.to_binary())\n"
            chunks.append(template.format(name=item.name, uatype=item.uatype))
        # The final line deliberately carries no trailing newline.
        chunks.append('        return b"".join(packet)')
        self.code += "".join(chunks)
122 1
123 1
124 1
class Field(object):
    """
    Description of a single field of a structure.
    """

    def __init__(self, name):
        self.name = name
        # Type name as a string; None until the caller assigns it.
        self.uatype = None
        # Default value formatted into the generated constructor.
        self.value = None
        # True when the field is an array of uatype.
        self.array = False
130 1
131
132 1
class StructGenerator(object):
    """
    Build Struct models from an XML description and turn them into
    Python classes, either written to a file or executed in memory.
    """

    def __init__(self):
        # Struct instances, in the order they were found in the XML.
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse xml (a string) and append its structures to self.model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the XML file at path and append its structures to self.model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Append one Struct to self.model per StructuredType element under root.

        A Field element whose Name starts with "NoOf" is not stored; it
        only marks the field that follows it as an array.
        """
        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    array = True
                    continue
                field = Field(name)
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # Drop the prefix in front of the type name.
                    field.uatype = field.uatype.split(":")[1]
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path):
        """
        Write the header and the code of every structure to path.

        The file is closed even if generating or writing the code fails.
        """
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())

    def get_python_classes(self, env=None):
        """
        Generate Python code and execute it in the env dict.

        Return env itself. After execution it holds the supporting
        modules (ua, datetime, uuid), whatever exec() adds, and the
        names defined by the generated code. A new dict is used when
        env is None.
        Note: since the code is generated on the fly, in case of error
        the stack trace is not available and debugging is very hard...
        """
        if env is None:
            env = {}
        # Add the required libraries to the dict unless the caller
        # already supplied them.
        if "ua" not in env:
            env['ua'] = ua
        if "datetime" not in env:
            env['datetime'] = datetime
        if "uuid" not in env:
            env['uuid'] = uuid
        # Generate the classes one by one and add them to the dict.
        for struct in self.model:
            code = struct.get_code()
            exec(code, env)
        return env

    def save_and_import(self, path, append_to=None):
        """
        Save the new structures to a Python file which can be used later,
        import the result and return the resulting classes in a dict.

        If append_to is a dict, the classes are added to that dict.
        The module is imported by the base name of path, so the
        directory containing it must be on the import path.
        """
        self.save_to_file(path)
        name = os.path.basename(path)
        name = os.path.splitext(name)[0]
        # The file was just created, so the import system may not see it
        # until its finder caches are refreshed. Guarded in case the
        # running interpreter's importlib does not provide the call.
        invalidate = getattr(importlib, "invalidate_caches", None)
        if invalidate is not None:
            invalidate()
        mymodule = importlib.import_module(name)
        if append_to is None:
            result = {}
        else:
            result = append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def _make_header(self, _file):
        """
        Write the banner and the imports needed by the generated code.
        """
        _file.write("""
'''
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
'''

from datetime import datetime
import uuid

from opcua import ua
""")
224
225
226
227
228
if __name__ == "__main__":
    # Manual smoke test: generate structures.py from an example schema,
    # import it and drop into an interactive shell.
    import sys
    from IPython import embed
    sys.path.insert(0, ".")  # necessary for import in current dir

    # xmlpath = "schemas/Opc.Ua.Types.bsd"
    xmlpath = "schemas/example.bsd"
    # StructGenerator takes no constructor arguments: build the model
    # from the schema first, then write the generated module.
    c = StructGenerator()
    c.make_model_from_file(xmlpath)
    c.save_to_file("structures.py")
    import structures as s

    embed()
242