Completed
Pull Request — master (#388)
by Olivier
03:38
created

Struct.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
c 1
b 0
f 1
dl 0
loc 4
rs 10
1
"""
2
parse simple structures from an xml tree
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7
from lxml import etree
8
from lxml import objectify
9
10
from IPython import embed
11
12
13
from opcua.ua.ua_binary import Primitives
14
15
16
def get_default_value(uatype):
17
    if uatype in ("String"):
18
        return "None" 
19
    elif uatype == "Guid":
20
        return "uuid.uuid4()" 
21
    elif uatype in ("ByteString", "CharArray", "Char"):
22
        return None 
23
    elif uatype in ("Boolean"):
24
        return "True"
25
    elif uatype in ("DateTime"):
26
        return "datetime.utcnow()"
27
    elif uatype in ("Int8", "Int16", "Int32", "Int64", "UInt8", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
28
        return 0
29
    else:
30
        return "ua." + uatype + "()"
31
32
33
class Struct(object):
34
    def __init__(self, name):
35
        self.name = name
36
        self.fields = []
37
        self.code = ""
38
39
    def get_code(self):
40
        self._make_constructor()
41
        self._make_from_binary()
42
        self._make_to_binary()
43
        return self.code
44
45
    def _make_constructor(self):
46
        self.code = """
47
48
class {0}(object):
49
    '''
50
    {0} autogenerated from xml
51
    '''
52
    def __init__(self, data=None):
53
        if data is not None:
54
            self._binary_init(data)
55
            return
56
""".format(self.name)
57
        for field in self.fields:
58
            self.code += "        self.{} = {}\n".format(field.name, field.value)
59
60
    def _make_from_binary(self):
61
        self.code += '''
62
    @staticmethod 
63
    def from_binary(data):
64
        return {}(data=data)
65
66
    def _binary_init(self, data):
67
'''.format(self.name)
68
        for field in self.fields:
69
            if hasattr(Primitives, field.uatype):
70
                if field.array:
71
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
72
                else:
73
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
74
            else:
75
                if field.array:
76
                    self.code += '''
77
        length = ua.ua_binary.Primitives.Int32.unpack(data)
78
        if length != -1:
79
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
80
'''.format(field.name, field.uatype)
81
                else:
82
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)
83
84
    def _make_to_binary(self):
85
        self.code += '''
86
    def to_binary(self):
87
        packet = []
88
'''
89
        for field in self.fields:
90
            if hasattr(Primitives, field.uatype):
91
                if field.array:
92
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
93
                else:
94
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
95
            else:
96
                if field.array:
97
                    self.code += '''
98
        if self.{0} is None:
99
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
100
        else:
101
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
102
            for element in self.{0}:
103
                packet.append(element.to_binary())
104
'''.format(field.name, field.uatype)
105
                else:
106
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
107
        self.code += '        return b"".join(packet)'
108
109
110
class Field(object):
111
    def __init__(self, name):
112
        self.name = name
113
        self.uatype = None
114
        self.value = None
115
        self.array = False
116
117
118
class StructGenerator(object):
119
    def __init__(self, path, output):
120
        self.path = path
121
        self.output = output
122
        self.model = []
123
124
    def generate(self):
125
        parser = etree.XMLParser(remove_blank_text=True, ns_clean=True)
126
        root = etree.parse(self.path, parser)
127
        embed()
128
        for child in root.iter():
129
            embed()
130
131
    def _make_model(self):
132
        obj = objectify.parse(self.path)
133
        root = obj.getroot()
134
        for child in root.iter("{*}StructuredType"):
135
            struct = Struct(child.get("Name"))
136
            array = False
137
            for xmlfield in child.iter("{*}Field"):
138
                name = xmlfield.get("Name")
139
                if name.startswith("NoOf"):
140
                    array = True
141
                    continue
142
                field = Field(name)
143
                uatype = xmlfield.get("TypeName")
144
                if uatype.startswith("opc"):
145
                    field.uatype = uatype[4:]
146
                else:
147
                    field.uatype = uatype[3:]
148
                field.value = get_default_value(field.uatype)
149
                if array:
150
                    field.array = True
151
                    field.value = []
152
                    array = False
153
                struct.fields.append(field)
154
            self.model.append(struct)
155
156
    def run(self):
157
        self._make_model()
158
        self._file = open(self.output, "wt")
159
        self._make_header()
160
        for struct in self.model:
161
            self._file.write(struct.get_code())
162
        self._file.close()
163
164
    def get_structures(self):
165
        self._make_model()
166
        ld = {}
167
        for struct in self.model:
168
            exec(struct.get_code(), ld)
169
        return ld
170
171
172
173
    def _make_header(self):
174
        self._file.write("""
175
'''
176
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
177
'''
178
179
from datetime import datetime
180
import uuid
181
182
from opcua import ua
183
184
185
""")
186
187
188
189
190
if __name__ == "__main__":
191
    import sys
192
    sys.path.insert(0, ".") # necessary for import in current dir
193
194
    xmlpath = "/home/olivier/python-opcua/schemas/example.bsd"
195
    c = StructGenerator(xmlpath, "structures.py")
196
    c.run()
197
    import structures as s
198
199
200
    #sts = c.get_structures()
201
    embed()
202