Completed
Pull Request — master (#388)
by Olivier
06:49 queued 01:42
created

Struct._make_to_binary()   B

Complexity

Conditions 5

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 5

Importance

Changes 2
Bugs 0 Features 2
Metric Value
cc 5
c 2
b 0
f 2
dl 0
loc 24
ccs 11
cts 11
cp 1
crap 5
rs 8.1671
"""
Parse simple structures from an XML tree.

Only a subset of features is supported, but it should be enough
for custom structures.
"""
6
7 1
from lxml import objectify
8
9
10 1
from opcua.ua.ua_binary import Primitives
11
12
13 1
def get_default_value(uatype):
    """
    Return the default initialiser for a generated field of type *uatype*.

    The result is formatted into generated source code (see
    ``Struct._make_constructor``), so most values are Python expressions
    given as strings. Byte-like types yield ``None`` and numeric types
    yield ``0``; both format to valid literals.

    :param uatype: type name without namespace prefix, e.g. ``"Int32"``
    :return: a source-code string, ``None`` or ``0``; any unknown type name
        yields ``"ua.<uatype>()"``
    """
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return None
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int8", "Int16", "Int32", "Int64",
                  "UInt8", "UInt16", "UInt32", "UInt64",
                  "Double", "Float", "Byte", "SByte"):
        return 0
    # Anything else is treated as a type exposed by the ``ua`` module.
    return "ua." + uatype + "()"
28
29
30 1
class Struct(object):
    """
    Description of one structured type and generator of its Python source.

    ``fields`` is a list of ``Field`` objects; ``code`` holds the source
    produced by the last call to ``get_code``.
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.code = ""

    def get_code(self):
        """
        Build and return the source code of the class for this structure.

        ``self.code`` is rebuilt from scratch on every call.
        """
        self._make_constructor()
        self._make_from_binary()
        self._make_to_binary()
        return self.code

    def _make_constructor(self):
        """Start ``self.code`` with the class header and ``__init__``."""
        self.code = """

class {0}(object):
    '''
    {0} autogenerated from xml
    '''
    def __init__(self, data=None):
        if data is not None:
            self._binary_init(data)
            return
""".format(self.name)
        for field in self.fields:
            self.code += "        self.{} = {}\n".format(field.name, field.value)

    def _make_from_binary(self):
        """Append ``from_binary`` and ``_binary_init`` to ``self.code``."""
        self.code += '''
    @staticmethod
    def from_binary(data):
        return {}(data=data)

    def _binary_init(self, data):
'''.format(self.name)
        if not self.fields:
            # Without fields the generated method would have no body, which
            # is a syntax error in the generated source.
            self.code += "        pass\n"
        for field in self.fields:
            if hasattr(Primitives, field.uatype):
                if field.array:
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'.format(field.name, field.uatype)
                else:
                    self.code += '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'.format(field.name, field.uatype)
            else:
                if field.array:
                    # A length of -1 must still define the attribute:
                    # __init__ returns early on the binary path, so nothing
                    # else would set it. None is what to_binary encodes as -1.
                    self.code += '''
        length = ua.ua_binary.Primitives.Int32.unpack(data)
        if length != -1:
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
        else:
            self.{0} = None
'''.format(field.name, field.uatype)
                else:
                    self.code += "        self.{} = ua.{}.from_binary(data)\n".format(field.name, field.uatype)

    def _make_to_binary(self):
        """Append the ``to_binary`` method to ``self.code``."""
        self.code += '''
    def to_binary(self):
        packet = []
'''
        for field in self.fields:
            if hasattr(Primitives, field.uatype):
                if field.array:
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(field.uatype, field.name)
                else:
                    self.code += '        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(field.uatype, field.name)
            else:
                if field.array:
                    self.code += '''
        if self.{0} is None:
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
        else:
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
            for element in self.{0}:
                packet.append(element.to_binary())
'''.format(field.name)
                else:
                    self.code += "        packet.append(self.{}.to_binary())\n".format(field.name)
        self.code += '        return b"".join(packet)'
105
106
107 1
class Field(object):
    """
    One member of a structure.

    :param name: attribute name of the field
    :param uatype: type name without namespace prefix, or None if not set yet
    :param value: default initialiser formatted into the generated ``__init__``
    :param array: True if the field holds a list of ``uatype`` elements
    """

    def __init__(self, name, uatype=None, value=None, array=False):
        self.name = name
        self.uatype = uatype
        self.value = value
        self.array = array

    def __repr__(self):
        return "Field(name={!r}, uatype={!r}, value={!r}, array={!r})".format(
            self.name, self.uatype, self.value, self.array)
113
114
115 1
class StructGenerator(object):
    """
    Read structure definitions from an XML file and generate Python classes.

    :param path: path of the XML file to parse
    :param output: path of the Python file written by ``run``
    """

    # Imports the generated classes rely on at runtime. Written to the output
    # file by _make_header and executed by get_structures.
    _IMPORTS = "from datetime import datetime\nimport uuid\n\nfrom opcua import ua\n"

    def __init__(self, path, output):
        self.path = path
        self.output = output
        self.model = []
        self._file = None

    @staticmethod
    def _strip_namespace(typename):
        """
        Return *typename* without its ``prefix:`` qualifier.

        Splitting on the colon handles prefixes of any length; slicing a
        fixed number of characters only works for "opc:" and "ua:".
        """
        return typename.split(":", 1)[-1]

    def _make_model(self):
        """Parse ``self.path`` and rebuild ``self.model`` as a list of Struct."""
        # Start from an empty model so repeated calls (run() followed by
        # get_structures(), for instance) do not duplicate structures.
        self.model = []
        root = objectify.parse(self.path).getroot()
        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            next_is_array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # A "NoOf..." entry is not emitted as a field; it marks
                    # the following field as an array.
                    next_is_array = True
                    continue
                field = Field(name)
                field.uatype = self._strip_namespace(xmlfield.get("TypeName"))
                if next_is_array:
                    field.array = True
                    field.value = []
                    next_is_array = False
                else:
                    field.value = get_default_value(field.uatype)
                struct.fields.append(field)
            self.model.append(struct)

    def run(self):
        """Parse the XML file and write the generated module to ``self.output``."""
        self._make_model()
        # The context manager closes the file even if generation fails.
        with open(self.output, "wt") as handle:
            self._file = handle
            self._make_header()
            for struct in self.model:
                handle.write(struct.get_code())

    def get_structures(self):
        """
        Parse the XML file and return a namespace holding the generated classes.

        The returned dict maps names to objects. Besides the generated
        classes it contains ``datetime``, ``uuid`` and ``ua`` (which the
        generated code refers to) and the ``__builtins__`` entry added by
        ``exec``.
        """
        self._make_model()
        ld = {}
        # NOTE(review): the source executed below is derived from the XML
        # file; only use this with trusted schemas.
        exec(self._IMPORTS, ld)
        for struct in self.model:
            exec(struct.get_code(), ld)
        return ld

    def _make_header(self):
        """Write the banner and the imports of the generated module to ``self._file``."""
        self._file.write(
            "\n'''\nTHIS FILE IS AUTOGENERATED, DO NOT EDIT!!!\n'''\n\n"
            + self._IMPORTS
            + "\n\n"
        )
177
178
179
180
181 1
if __name__ == "__main__":
    # Manual smoke test: generate structures.py, import it and open an
    # interactive prompt to inspect the result.
    import code
    import sys

    sys.path.insert(0, ".")  # necessary for import in current dir

    # The schema path can be given as the first argument; the previous
    # hard-coded location stays as the default.
    if len(sys.argv) > 1:
        xmlpath = sys.argv[1]
    else:
        xmlpath = "/home/olivier/python-opcua/schemas/example.bsd"
    c = StructGenerator(xmlpath, "structures.py")
    c.run()
    import structures as s

    # Alternative without writing a file: sts = c.get_structures()
    # embed() was never imported in this file; use the stdlib console instead.
    code.interact(local=globals())
193