Completed
Pull Request — master (#388)
by Olivier
04:55
created

Struct._make_from_binary()   B

Complexity

Conditions 5

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5

Importance

Changes 3
Bugs 0 Features 2
Metric Value
cc 5
c 3
b 0
f 2
dl 0
loc 25
ccs 10
cts 10
cp 1
crap 5
rs 8.0894
1
"""
2
parse simple structures from an xml tree
3
We only support a subset of features but should be enough
4
for custom structures
5
"""
6
7 1
from lxml import objectify
8
9
10 1
from opcua.ua.ua_binary import Primitives
11
12
13 1
def get_default_value(uatype):
14 1
    if uatype == "String":
15 1
        return "None" 
16 1
    elif uatype == "Guid":
17 1
        return "uuid.uuid4()" 
18 1
    elif uatype in ("ByteString", "CharArray", "Char"):
19 1
        return None 
20 1
    elif uatype == "Boolean":
21 1
        return "True"
22 1
    elif uatype == "DateTime":
23 1
        return "datetime.utcnow()"
24 1
    elif uatype in ("Int8", "Int16", "Int32", "Int64", "UInt8", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
25 1
        return 0
26
    else:
27 1
        return "ua." + uatype + "()"
28
29
30 1
class Struct(object):
31 1
    def __init__(self, name):
32 1
        self.name = name
33 1
        self.fields = []
34 1
        self.code = ""
35
36 1
    def get_code(self):
37 1
        if not self.fields:
38
            return """
39
40
class {}(object):
41
    pass
42
43
""".format(self.name)
44 1
        self._make_constructor()
45 1
        self._make_from_binary()
46 1
        self._make_to_binary()
47 1
        return self.code
48
49 1
    def _make_constructor(self):
50 1
        self.code = """
51
52
53
class {0}(object):
54
    '''
55
    {0} structure autogenerated from xml
56
    '''
57
    def __init__(self, data=None):
58
        if data is not None:
59
            self._binary_init(data)
60
            return
61
""".format(self.name)
62 1
        for field in self.fields:
63 1
            self.code += "        self.{} = {}\n".format(field.name, field.value)
64
65 1
    def _make_from_binary(self):
66 1
        self.code += '''
67
    @staticmethod
68
    def from_binary(data):
69
        return {}(data=data)
70
71
    def _binary_init(self, data):
72
'''.format(self.name)
73 1
        for field in self.fields:
74 1
            if hasattr(Primitives, field.uatype):
75 1
                if field.array:
76 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
77
                else:
78 1
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
79
            else:
80 1
                if field.array:
81 1
                    self.code += '''
82
        length = ua.ua_binary.Primitives.Int32.unpack(data)
83
        if length == -1:
84
            self.{0} = None
85
        else:
86
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
87
'''.format(field.name, field.uatype)
88
                else:
89 1
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)
90
91 1
    def _make_to_binary(self):
92 1
        self.code += '''
93
    def to_binary(self):
94
        packet = []
95
'''
96 1
        for field in self.fields:
97 1
            if hasattr(Primitives, field.uatype):
98 1
                if field.array:
99 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
100
                else:
101 1
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
102
            else:
103 1
                if field.array:
104 1
                    self.code += '''
105
        if self.{0} is None:
106
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
107
        else:
108
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
109
            for element in self.{0}:
110
                packet.append(element.to_binary())
111
'''.format(field.name)
112
                else:
113 1
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
114 1
        self.code += '        return b"".join(packet)'
115
116
117 1
class Field(object):
118 1
    def __init__(self, name):
119 1
        self.name = name
120 1
        self.uatype = None
121 1
        self.value = None
122 1
        self.array = False
123
124
125 1
class StructGenerator(object):
126 1
    def __init__(self, path, output):
127 1
        self.path = path
128 1
        self.output = output
129 1
        self.model = []
130 1
        self._file = None
131
132 1
    def _make_model(self):
133 1
        obj = objectify.parse(self.path)
134 1
        root = obj.getroot()
135 1
        for child in root.iter("{*}StructuredType"):
136 1
            struct = Struct(child.get("Name"))
137 1
            array = False
138 1
            for xmlfield in child.iter("{*}Field"):
139 1
                name = xmlfield.get("Name")
140 1
                if name.startswith("NoOf"):
141 1
                    array = True
142 1
                    continue
143 1
                field = Field(name)
144 1
                field.uatype = xmlfield.get("TypeName")
145 1
                if ":" in field.uatype:
146 1
                    field.uatype = field.uatype.split(":")[1]
147 1
                field.value = get_default_value(field.uatype)
148 1
                if array:
149 1
                    field.array = True
150 1
                    field.value = []
151 1
                    array = False
152 1
                struct.fields.append(field)
153 1
            self.model.append(struct)
154
155 1
    def run(self):
156 1
        self._make_model()
157 1
        self._file = open(self.output, "wt")
158 1
        self._make_header()
159 1
        for struct in self.model:
160 1
            self._file.write(struct.get_code())
161 1
        self._file.close()
162
163 1
    def get_structures(self):
164
        self._make_model()
165
        ld = {}
166
        for struct in self.model:
167
            exec(struct.get_code(), ld)
168
        return ld
169
170 1
    def _make_header(self):
171 1
        self._file.write("""
172
'''
173
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
174
'''
175
176
from datetime import datetime
177
import uuid
178
179
from opcua import ua
180
""")
181
182
183
184
185 1
if __name__ == "__main__":
186
    import sys
187
    from IPython import embed
188
    sys.path.insert(0, ".") # necessary for import in current dir
189
190
    #xmlpath = "schemas/Opc.Ua.Types.bsd"
191
    xmlpath = "schemas/example.bsd"
192
    c = StructGenerator(xmlpath, "structures.py")
193
    c.run()
194
    import structures as s
195
196
197
    #sts = c.get_structures()
198
    embed()
199