Completed
Pull Request — master (#388)
by Olivier
04:23
created

Field.__init__()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 2
Metric Value
cc 1
c 2
b 0
f 2
dl 0
loc 5
ccs 5
cts 5
cp 1
crap 1
rs 9.4285
1
"""
2
Parse simple structures from an XML tree.
3
We only support a subset of features, but it should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9
10 1
from lxml import objectify
11
12
13 1
from opcua.ua.ua_binary import Primitives
14
15
16 1
def get_default_value(uatype):
    """
    Return the default value used for a field of type *uatype*.

    The result is formatted into the generated constructor source, so most
    defaults are pieces of Python source text. Byte/char types yield the
    ``None`` object and numeric types yield the integer ``0``. Any type
    name not listed here is assumed to be a class reachable through the
    ``ua`` namespace and yields a constructor call expression.
    """
    if uatype in ("ByteString", "CharArray", "Char"):
        return None

    numeric_types = (
        "Int8", "Int16", "Int32", "Int64",
        "UInt8", "UInt16", "UInt32", "UInt64",
        "Double", "Float", "Byte", "SByte",
    )
    if uatype in numeric_types:
        return 0

    # Types whose default is a fixed source expression.
    expressions = {
        "String": "None",
        "Guid": "uuid.uuid4()",
        "Boolean": "True",
        "DateTime": "datetime.utcnow()",
    }
    if uatype in expressions:
        return expressions[uatype]

    return "ua." + uatype + "()"
31
32
33 1
class Struct(object):
    """
    Model of one structured type and generator of its Python source.

    ``name`` is the name of the generated class, ``fields`` the list of
    field descriptions in declaration order, and ``code`` the source text
    built by the last call to get_code() on a structure that has fields.
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.code = ""

    def get_code(self):
        """
        Return the Python source of a class implementing this structure.

        A structure without fields yields an empty class and leaves
        ``self.code`` untouched. Otherwise the constructor, the decoding
        methods and the encoding method are generated in that order.
        """
        if self.fields:
            self._make_constructor()
            self._make_from_binary()
            self._make_to_binary()
            return self.code
        return "\n\nclass {}(object):\n    pass\n\n".format(self.name)

    def _make_constructor(self):
        """
        Start ``self.code`` afresh with the class header and ``__init__``.

        The generated constructor delegates to ``_binary_init`` when data
        is given, otherwise it assigns every field its default value.
        """
        header = (
            "\n"
            "\n"
            "\n"
            "class {0}(object):\n"
            "    '''\n"
            "    {0} structure autogenerated from xml\n"
            "    '''\n"
            "    def __init__(self, data=None):\n"
            "        if data is not None:\n"
            "            self._binary_init(data)\n"
            "            return\n"
        ).format(self.name)
        assignments = [
            "        self.{} = {}\n".format(member.name, member.value)
            for member in self.fields
        ]
        self.code = header + "".join(assignments)

    def _make_from_binary(self):
        """
        Append the ``from_binary`` and ``_binary_init`` methods to the code.

        Fields whose type is an attribute of Primitives are decoded through
        it; any other type is decoded with its own ``from_binary``.
        """
        pieces = [
            (
                "\n"
                "    @staticmethod\n"
                "    def from_binary(data):\n"
                "        return {}(data=data)\n"
                "\n"
                "    def _binary_init(self, data):\n"
            ).format(self.name)
        ]
        for member in self.fields:
            is_primitive = hasattr(Primitives, member.uatype)
            if is_primitive and member.array:
                template = "        self.{0} = ua.ua_binary.Primitives.{1}.unpack_array(data)\n"
            elif is_primitive:
                template = "        self.{0} = ua.ua_binary.Primitives.{1}.unpack(data)\n"
            elif member.array:
                # The generated code reads an Int32 length first; -1 means None.
                template = (
                    "\n"
                    "        length = ua.ua_binary.Primitives.Int32.unpack(data)\n"
                    "        if length == -1:\n"
                    "            self.{0} = None\n"
                    "        else:\n"
                    "            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]\n"
                )
            else:
                template = "        self.{0} = ua.{1}.from_binary(data)\n"
            pieces.append(template.format(member.name, member.uatype))
        self.code += "".join(pieces)

    def _make_to_binary(self):
        """
        Append the ``to_binary`` method to the code.

        The generated method packs each field in order and joins the parts
        into a single bytes object.
        """
        pieces = [
            "\n"
            "    def to_binary(self):\n"
            "        packet = []\n"
        ]
        for member in self.fields:
            is_primitive = hasattr(Primitives, member.uatype)
            if is_primitive and member.array:
                template = "        packet.append(ua.ua_binary.Primitives.{uatype}.pack_array(self.{name}))\n"
            elif is_primitive:
                template = "        packet.append(ua.ua_binary.Primitives.{uatype}.pack(self.{name}))\n"
            elif member.array:
                # The generated code writes an Int32 length first; None is written as -1.
                template = (
                    "\n"
                    "        if self.{name} is None:\n"
                    "            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))\n"
                    "        else:\n"
                    "            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{name})))\n"
                    "            for element in self.{name}:\n"
                    "                packet.append(element.to_binary())\n"
                )
            else:
                template = "        packet.append(self.{name}.to_binary())\n"
            pieces.append(template.format(name=member.name, uatype=member.uatype))
        pieces.append('        return b"".join(packet)')
        self.code += "".join(pieces)
118
119
120 1
class Field(object):
    """
    Description of a single member of a structure.

    Only the name is known at construction time; the remaining attributes
    start with neutral values and are filled in by whoever builds the model.
    """

    def __init__(self, name):
        # True when the member holds a list of values instead of one value.
        self.array = False
        # Default value formatted into the generated constructor.
        self.value = None
        # Name of the type of the member, without namespace prefix.
        self.uatype = None
        self.name = name
126
127
128 1
class StructGenerator(object):
    """
    Build Struct models from an xml document and turn them into Python code.

    ``model`` is the list of Struct objects collected so far; every call to
    one of the ``make_model_*`` methods appends to it.
    """

    # Text written at the top of every generated module. It brings into scope
    # the names the generated classes refer to (datetime, uuid, ua).
    _HEADER = (
        "\n"
        "'''\n"
        "THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!\n"
        "'''\n"
        "\n"
        "from datetime import datetime\n"
        "import uuid\n"
        "\n"
        "from opcua import ua\n"
    )

    def __init__(self):
        self.model = []

    def make_model_from_string(self, xml):
        """Parse the xml text *xml* and add its structures to the model."""
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """Parse the xml file at *path* and add its structures to the model."""
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Append one Struct per ``StructuredType`` element found under *root*.

        A field whose name starts with ``NoOf`` is not stored itself; it
        marks the field that follows it as an array.
        """
        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    array = True
                    continue
                field = Field(name)
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # Drop the namespace prefix of "prefix:TypeName".
                    field.uatype = field.uatype.split(":")[1]
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path):
        """
        Write the header and the code of every structure to *path*.

        The file is closed even when generating the code raises.
        """
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())

    def save_and_import(self, path):
        """
        Save the generated module to *path*, import it and return its classes.

        The module is imported by its bare name (file name without
        extension), so the directory containing *path* must be importable.
        Returns a dict mapping each structure name to the generated class.
        """
        self.save_to_file(path)
        name = os.path.splitext(os.path.basename(path))[0]
        # The module file may have been created after the interpreter
        # started; finder caches must be invalidated for it to be noticed.
        # The function does not exist on old Python versions, hence getattr.
        invalidate_caches = getattr(importlib, "invalidate_caches", None)
        if invalidate_caches is not None:
            invalidate_caches()
        mymodule = importlib.import_module(name)
        return {struct.name: getattr(mymodule, struct.name) for struct in self.model}

    def get_structures(self):
        """
        Execute the generated code in a fresh namespace and return it.

        The namespace is seeded with the same imports as a saved module so
        that the generated classes can resolve ``ua``, ``uuid`` and
        ``datetime`` when they are used. The returned dict therefore holds
        these names next to the generated classes.
        """
        # NOTE(security): the executed source is assembled from names found
        # in the xml document; only use this with trusted schemas.
        ld = {}
        exec(self._HEADER, ld)
        for struct in self.model:
            exec(struct.get_code(), ld)
        return ld

    def _make_header(self, _file):
        """Write the module header (notice and imports) to *_file*."""
        _file.write(self._HEADER)
194
195
196
197
198 1
if __name__ == "__main__":
    # Manual check: generate "structures.py" from an example schema, import
    # it and open an interactive shell to inspect the result.
    import sys
    from IPython import embed
    sys.path.insert(0, ".")  # necessary for import in current dir

    # The full type dictionary "schemas/Opc.Ua.Types.bsd" can be used instead.
    xmlpath = "schemas/example.bsd"
    # StructGenerator takes no constructor arguments: the model is built
    # from the schema first and then written to the target module.
    c = StructGenerator()
    c.make_model_from_file(xmlpath)
    c.save_to_file("structures.py")
    import structures as s

    embed()
212