Completed
Push — master ( 440d06...4ed484 )
by Olivier
04:29
created

StructGenerator   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 76
Duplicated Lines 0 %

Test Coverage

Coverage 87.04%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 76
ccs 47
cts 54
cp 0.8704
rs 10
wmc 17

8 Methods

Rating   Name   Duplication   Size   Complexity  
A get_structures() 0 5 2
B _make_model() 0 20 6
A make_model_from_file() 0 4 1
A save_to_file() 0 6 2
A save_and_import() 0 17 3
A __init__() 0 2 1
A make_model_from_string() 0 3 1
A _make_header() 0 11 1
1
"""
2
Parse simple structures from an XML tree.
3
We only support a subset of features, but that should be enough
4
for custom structures
5
"""
6
7 1
import os
8 1
import importlib
9
10 1
from lxml import objectify
11
12
13 1
from opcua.ua.ua_binary import Primitives
14
15
16 1
def get_default_value(uatype):
    """
    Return the default initialiser for a field of type ``uatype``.

    The result is interpolated into generated source code, which is why
    most defaults are source snippets given as strings (for example
    ``"uuid.uuid4()"``). Byte-like types yield the ``None`` object and
    numeric types yield the integer ``0``. Any type name not listed here
    is assumed to be a class reachable through ``ua`` and yields a
    constructor call such as ``"ua.MyType()"``.
    """
    snippets = {
        "String": "None",
        "Guid": "uuid.uuid4()",
        "Boolean": "True",
        "DateTime": "datetime.utcnow()",
    }
    byte_like = ("ByteString", "CharArray", "Char")
    numeric = (
        "Int8", "Int16", "Int32", "Int64",
        "UInt8", "UInt16", "UInt32", "UInt64",
        "Double", "Float", "Byte", "SByte",
    )

    if uatype in snippets:
        return snippets[uatype]
    if uatype in byte_like:
        return None
    if uatype in numeric:
        return 0
    # Unknown name: treat it as another structure exposed by the ua module
    return "ua." + uatype + "()"
31
32
33 1
class Struct(object):
    """
    Description of one structure and generator of its Python source code.

    ``fields`` holds the field descriptions (objects exposing ``name``,
    ``uatype``, ``value`` and ``array``); ``code`` holds the source text
    produced by the last call to ``get_code`` for a non-empty structure.
    """

    def __init__(self, name):
        self.name = name
        self.fields = []
        self.code = ""

    def get_code(self):
        """
        Return the Python source of a class named after this structure.

        A structure without fields is rendered as an empty class; otherwise
        the constructor, the binary decoder and the binary encoder are
        generated in that order and stored in ``self.code``.
        """
        if self.fields:
            self._make_constructor()
            self._make_from_binary()
            self._make_to_binary()
            return self.code
        return """

class {}(object):
    pass

""".format(self.name)

    def _make_constructor(self):
        """
        Start ``self.code`` afresh with the class header and ``__init__``.

        The generated ``__init__`` delegates to ``_binary_init`` when given
        data, otherwise assigns each field its default value.
        """
        header = """


class {0}(object):
    '''
    {0} structure autogenerated from xml
    '''
    def __init__(self, data=None):
        if data is not None:
            self._binary_init(data)
            return
""".format(self.name)
        assignments = [
            "        self.{} = {}\n".format(member.name, member.value)
            for member in self.fields
        ]
        self.code = header + "".join(assignments)

    def _make_from_binary(self):
        """
        Append ``from_binary`` and ``_binary_init`` to ``self.code``.

        Fields whose type is an attribute of ``Primitives`` are decoded with
        the matching primitive unpacker; any other type is decoded through
        its own ``from_binary``, arrays being prefixed by an Int32 length
        where -1 stands for ``None``.
        """
        chunks = ['''
    @staticmethod
    def from_binary(data):
        return {}(data=data)

    def _binary_init(self, data):
'''.format(self.name)]
        for member in self.fields:
            is_primitive = hasattr(Primitives, member.uatype)
            if is_primitive and member.array:
                template = '        self.{} = ua.ua_binary.Primitives.{}.unpack_array(data)\n'
            elif is_primitive:
                template = '        self.{} = ua.ua_binary.Primitives.{}.unpack(data)\n'
            elif member.array:
                template = '''
        length = ua.ua_binary.Primitives.Int32.unpack(data)
        if length == -1:
            self.{0} = None
        else:
            self.{0} = [ua.{1}.from_binary(data) for _ in range(length)]
'''
            else:
                template = "        self.{} = ua.{}.from_binary(data)\n"
            # Every template takes (field name, type name) in that order
            chunks.append(template.format(member.name, member.uatype))
        self.code += "".join(chunks)

    def _make_to_binary(self):
        """
        Append ``to_binary`` to ``self.code``.

        Mirrors ``_make_from_binary``: primitives use the primitive packers,
        other types use their own ``to_binary``, and arrays of non-primitive
        types are written as an Int32 length (-1 for ``None``) followed by
        the elements.
        """
        chunks = ['''
    def to_binary(self):
        packet = []
''']
        for member in self.fields:
            is_primitive = hasattr(Primitives, member.uatype)
            if is_primitive and member.array:
                chunks.append('        packet.append(ua.ua_binary.Primitives.{}.pack_array(self.{}))\n'.format(member.uatype, member.name))
            elif is_primitive:
                chunks.append('        packet.append(ua.ua_binary.Primitives.{}.pack(self.{}))\n'.format(member.uatype, member.name))
            elif member.array:
                chunks.append('''
        if self.{0} is None:
            packet.append(ua.ua_binary.Primitives.Int32.pack(-1))
        else:
            packet.append(ua.ua_binary.Primitives.Int32.pack(len(self.{0})))
            for element in self.{0}:
                packet.append(element.to_binary())
'''.format(member.name))
            else:
                chunks.append("        packet.append(self.{}.to_binary())\n".format(member.name))
        chunks.append('        return b"".join(packet)')
        self.code += "".join(chunks)
118
119
120 1
class Field(object):
    """
    Description of a single structure member.

    Only ``name`` is set at construction; the other attributes start with
    neutral values and are filled in by whoever builds the model.
    """

    def __init__(self, name):
        self.array = False   # True when the member is a list of uatype
        self.value = None    # default value used in the generated constructor
        self.uatype = None   # type name of the member
        self.name = name
126
127
128 1
class StructGenerator(object):
    """
    Generate Python classes from structures described in an XML document.

    Fill the model with ``make_model_from_string`` or
    ``make_model_from_file``, then either write the generated source with
    ``save_to_file`` / ``save_and_import`` or build the classes in memory
    with ``get_structures``.
    """

    def __init__(self):
        # Struct objects, in the order they were found in the XML
        self.model = []

    def make_model_from_string(self, xml):
        """
        Parse the XML document contained in ``xml`` and add its structures
        to the model.
        """
        obj = objectify.fromstring(xml)
        self._make_model(obj)

    def make_model_from_file(self, path):
        """
        Parse the XML file at ``path`` and add its structures to the model.
        """
        obj = objectify.parse(path)
        root = obj.getroot()
        self._make_model(root)

    def _make_model(self, root):
        """
        Append one Struct per ``StructuredType`` element found under ``root``.

        Elements are matched in any XML namespace. A field whose name starts
        with ``NoOf`` is not stored: it only flags the field that follows it
        as an array.
        """
        for child in root.iter("{*}StructuredType"):
            struct = Struct(child.get("Name"))
            array = False
            for xmlfield in child.iter("{*}Field"):
                name = xmlfield.get("Name")
                if name.startswith("NoOf"):
                    # Length marker: the next field is the array itself
                    array = True
                    continue
                field = Field(name)
                field.uatype = xmlfield.get("TypeName")
                if ":" in field.uatype:
                    # Drop the namespace prefix of a qualified type name
                    field.uatype = field.uatype.split(":")[1]
                field.value = get_default_value(field.uatype)
                if array:
                    field.array = True
                    field.value = []
                    array = False
                struct.fields.append(field)
            self.model.append(struct)

    def save_to_file(self, path):
        """
        Write the header and the source of every structure to ``path``.

        The file is closed even if generating or writing the code fails.
        """
        with open(path, "wt") as _file:
            self._make_header(_file)
            for struct in self.model:
                _file.write(struct.get_code())

    def save_and_import(self, path, append_to=None):
        """
        Save the new structures to a Python file which can be used later,
        import the result and return the resulting classes in a dict.
        If append_to is a dict, the classes are added to that dict.

        The file is imported by its bare module name, so the directory
        containing ``path`` must be importable (e.g. present in sys.path).
        """
        self.save_to_file(path)
        name = os.path.basename(path)
        name = os.path.splitext(name)[0]
        # The module file was written a moment ago; the import system may
        # hold a stale directory listing, so refresh it where the
        # interpreter offers the API (absent on Python 2).
        if hasattr(importlib, "invalidate_caches"):
            importlib.invalidate_caches()
        mymodule = importlib.import_module(name)
        result = {} if append_to is None else append_to
        for struct in self.model:
            result[struct.name] = getattr(mymodule, struct.name)
        return result

    def get_structures(self):
        """
        Build the classes in memory and return them in a dict mapping each
        structure name to its class.
        """
        # The generated code refers to these names at call time; they are
        # the same ones the file header written by _make_header imports.
        from datetime import datetime
        import uuid
        from opcua import ua

        namespace = {"ua": ua, "datetime": datetime, "uuid": uuid}
        # NOTE(review): the executed source is derived from the parsed XML;
        # only use this with type dictionaries you trust.
        for struct in self.model:
            exec(struct.get_code(), namespace)
        result = {}
        for struct in self.model:
            result[struct.name] = namespace[struct.name]
        return result

    def _make_header(self, _file):
        """
        Write the banner and the imports needed by the generated classes.
        """
        _file.write("""
'''
THIS FILE IS AUTOGENERATED, DO NOT EDIT!!!
'''

from datetime import datetime
import uuid

from opcua import ua
""")
204
205
206
207
208 1
if __name__ == "__main__":
    # Manual smoke test: generate structures.py from an example schema,
    # import it and drop into an interactive shell to inspect the result.
    import sys
    from IPython import embed
    sys.path.insert(0, ".")  # necessary for import in current dir

    # xmlpath = "schemas/Opc.Ua.Types.bsd"
    xmlpath = "schemas/example.bsd"
    # StructGenerator takes no constructor arguments and has no run()
    # method: build the model and write the file explicitly.
    c = StructGenerator()
    c.make_model_from_file(xmlpath)
    c.save_to_file("structures.py")
    import structures as s

    # sts = c.get_structures()
    embed()
222