Completed
Pull Request — master (#24)
by Olivier
02:42
created

generate_protocol_python.CodeGenerator.generate_struct_code()   F

Complexity

Conditions 18

Size

Total Lines 73
Code Lines 61

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
eloc 61
nop 2
dl 0
loc 73
rs 1.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

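Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.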

Complexity

Complex code such as generate_protocol_python.CodeGenerator.generate_struct_code() often does a lot of different things. To break its enclosing class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
2
# Enum names for which run() emits no code.
IgnoredEnums = [
    "NodeIdType",
]
# Struct names for which run() emits neither a class nor an extension
# object registration.
IgnoredStructs = [
    "QualifiedName",
    "NodeId",
    "ExpandedNodeId",
    "FilterOperand",
    "Variant",
    "DataValue",
    "ExtensionObject",
    "XmlElement",
    "LocalizedText",
]
4
5
6
class Primitives1:
    """
    Names of primitive types, stored as class attributes.

    Only the attribute names carry information; every value is the
    placeholder ``0``.
    """

    SByte = 0
    Int16 = 0
    Int32 = 0
    Int64 = 0
    Char = 0
    Byte = 0
    UInt16 = 0
    UInt32 = 0
    UInt64 = 0
    Boolean = 0
    Double = 0
    Float = 0
19
20
21
class Primitives(Primitives1):
    """
    All primitive type names: those of ``Primitives1`` plus the ones below.

    CodeGenerator checks ``hasattr(Primitives, uatype)`` to decide whether a
    type is packed/unpacked through ``uabin.Primitives`` or through the
    type's own ``to_binary`` / ``from_binary``. The ``0`` values are
    placeholders.
    """

    Null = 0
    String = 0
    Bytes = 0
    ByteString = 0
    CharArray = 0
    DateTime = 0
28
29
30
class CodeGenerator:
    """
    Write the Python source of the protocol module from a parsed model.

    For every enum of the model an ``IntEnum`` subclass is emitted, for every
    struct a ``FrozenClass`` subclass, followed by the extension object
    registration lines. Enums and structs listed in ``IgnoredEnums`` /
    ``IgnoredStructs`` are skipped.
    """

    def __init__(self, model, output):
        """
        :param model: parsed model; this class reads its ``enums``, ``structs``
            and ``enum_list`` attributes and calls its ``get_enum()``
        :param output: path of the file that ``run()`` writes
        """
        self.model = model
        self.output_path = output
        self.output_file = None
        self.indent = '    '
        self.iidx = 0  # indent index

    def run(self):
        """Generate the whole module and write it to ``self.output_path``."""
        print('Writting python protocol code to ', self.output_path)
        # The context manager guarantees the file is flushed and closed,
        # even when code generation raises half-way through.
        with open(self.output_path, 'w', encoding='utf-8') as output_file:
            self.output_file = output_file
            self.make_header()
            for enum in self.model.enums:
                if enum.name not in IgnoredEnums:
                    self.generate_enum_code(enum)
            structs = [struct for struct in self.model.structs if not self._is_skipped_struct(struct)]
            for struct in structs:
                self.generate_struct_code(struct)

            self.iidx = 0
            self.write("")
            self.write("")
            for struct in structs:
                if 'ExtensionObject' in struct.parents:
                    self.write(f"nid = FourByteNodeId(ObjectIds.{struct.name}_Encoding_DefaultBinary)")
                    self.write(f"extension_object_classes[nid] = {struct.name}")
                    self.write(f"extension_object_ids['{struct.name}'] = nid")

    @staticmethod
    def _is_skipped_struct(struct):
        """Return True for structs that get neither a class nor a registration."""
        return struct.name in IgnoredStructs or struct.name.endswith(('Node', 'NodeId'))

    def write(self, line):
        """
        Write ``line`` followed by a newline to the output file.

        Non-empty lines are prefixed with the current indentation
        (``self.indent`` repeated ``self.iidx`` times); empty lines are not.
        """
        if line:
            line = f'{self.indent * self.iidx}{line}'
        self.output_file.write(f'{line}\n')

    def make_header(self):
        """Write the module docstring and the import lines of the generated file."""
        self.write('"""')
        self.write('Autogenerate code from xml spec')
        self.write('"""')
        self.write('')
        self.write('from datetime import datetime')
        self.write('from enum import IntEnum')
        self.write('')
        # self.write('from asyncua.ua.uaerrors import UaError')
        self.write('from asyncua.ua.uatypes import *')
        self.write('from asyncua.ua.object_ids import ObjectIds')

    def _write_docstring(self, doc, members):
        """
        Write a class docstring at the current indentation.

        :param doc: free text placed first; skipped when empty or None
        :param members: iterable of ``(name, vartype)`` pairs, one
            ``:ivar:`` / ``:vartype:`` entry is written per pair
        """
        self.write('"""')
        if doc:
            self.write(doc)
            self.write("")
        for name, vartype in members:
            self.write(f':ivar {name}:')
            self.write(f':vartype {name}: {vartype}')
        self.write('"""')

    def generate_enum_code(self, enum):
        """Write the ``IntEnum`` subclass for ``enum`` (reads name, doc, values)."""
        self.write('')
        self.write('')
        self.write(f'class {enum.name}(IntEnum):')
        self.iidx = 1
        self._write_docstring(enum.doc, [(val.name, val.value) for val in enum.values])
        for val in enum.values:
            self.write(f'{val.name} = {val.value}')
        self.iidx = 0

    def generate_struct_code(self, obj):
        """
        Write the ``FrozenClass`` subclass for the struct ``obj``.

        The class consists of a docstring, the optional ``ua_switches`` table,
        the ``ua_types`` list, ``__init__``, ``__str__`` and ``__repr__``.
        The indent index is back at 0 when this method returns.
        """
        self.write('')
        self.write('')
        self.iidx = 0
        self.write(f'class {obj.name}(FrozenClass):')
        self.iidx = 1
        self._write_docstring(obj.doc, [(field.name, field.uatype) for field in obj.fields])
        self.write('')
        self._write_struct_switches(obj)
        self._write_struct_types(obj)
        self.write("")
        self._write_struct_init(obj)
        self.write("")
        self._write_struct_str(obj)
        self.write("")
        self.write("__repr__ = __str__")
        self.iidx = 0

    def _write_struct_switches(self, obj):
        """Write ``ua_switches``; nothing is written when no field has a switch field."""
        switched = [field for field in obj.fields if field.switchfield is not None]
        # if field.switchvalue is not None: Not sure we need to handle that one
        if not switched:
            return
        self.write('ua_switches = {')
        for field in switched:
            bit = obj.bits[field.switchfield]
            self.write(f"    '{field.name}': ('{bit.container}', {bit.idx}),")
        self.write("           }")

    def _write_struct_types(self, obj):
        """Write the ``ua_types`` list of ``(field name, type name)`` pairs."""
        self.write("ua_types = [")
        for field in obj.fields:
            uatype = f"ListOf{field.uatype}" if field.length else field.uatype
            if uatype == "ListOfChar":
                uatype = "String"
            self.write(f"    ('{field.name}', '{uatype}'),")
        self.write("           ]")

    def _write_struct_init(self, obj):
        """Write ``__init__``, which assigns a default to every field and freezes the object."""
        self.write("def __init__(self):")
        self.iidx += 1

        # hack extension object stuff: structs with a BodyLength field get Encoding = 1
        extobj_hack = any(field.name == "BodyLength" for field in obj.fields)

        for field in obj.fields:
            if extobj_hack and field.name == "Encoding":
                self.write("self.Encoding = 1")
            elif field.uatype == obj.name:
                # self-referencing class: it cannot instantiate itself in its own __init__
                self.write(f"self.{field.name} = None")
            elif obj.name != "ExtensionObject" and field.name == "TypeId":
                self.write(f"self.TypeId = FourByteNodeId(ObjectIds.{obj.name}_Encoding_DefaultBinary)")
            else:
                default = '[]' if field.length else self.get_default_value(field)
                self.write(f"self.{field.name} = {default}")
        self.write("self._freeze = True")
        self.iidx -= 1

    def _write_struct_str(self, obj):
        """Write ``__str__``, which lists every field as ``name:value``."""
        self.write("def __str__(self):")
        self.iidx += 1
        members = ", ".join(f"{f.name}:{{self.{f.name}}}" for f in obj.fields)
        if members:
            self.write(f"return f'{obj.name}({members})'")
        else:
            self.write(f"return '{obj.name}()'")
        self.iidx -= 1

    def write_unpack_enum(self, name, enum):
        """Write the statement that unpacks attribute ``name`` as the enum ``enum``."""
        self.write(f"self.{name} = {enum.name}(uabin.Primitives.{enum.uatype}.unpack(data))")

    def get_size_from_uatype(self, uatype):
        """
        Return the size in bytes of the fixed-size primitive ``uatype``.

        :raises ValueError: when ``uatype`` is not a known fixed-size primitive
        """
        # "Sbyte" is the historical misspelling; it stays accepted next to "SByte".
        if uatype in ("SByte", "Sbyte", "Byte", "Char", "Boolean"):
            return 1
        if uatype in ("Int16", "UInt16"):
            return 2
        if uatype in ("Int32", "UInt32", "Float"):
            return 4
        if uatype in ("Int64", "UInt64", "Double"):
            return 8
        raise ValueError(f"Cannot get size from type {uatype}")

    def write_unpack_uatype(self, name, uatype):
        """Write the statement that unpacks attribute ``name`` as type ``uatype``."""
        if hasattr(Primitives, uatype):
            self.write(f"self.{name} = uabin.Primitives.{uatype}.unpack(data)")
        else:
            self.write(f"self.{name} = {uatype}.from_binary(data)")

    def write_pack_enum(self, listname, name, enum):
        """Write the statement that appends the packed enum ``name`` to ``listname``."""
        self.write(f"{listname}.append(uabin.Primitives.{enum.uatype}.pack({name}.value))")

    def write_pack_uatype(self, listname, name, uatype):
        """Write the statement that appends the packed value ``name`` to ``listname``."""
        if hasattr(Primitives, uatype):
            self.write(f"{listname}.append(uabin.Primitives.{uatype}.pack({name}))")
        else:
            self.write(f"{listname}.append({name}.to_binary())")

    def get_default_value(self, field):
        """
        Return the default assigned to ``field`` in a generated ``__init__``.

        The result is interpolated into the generated source, so ``None`` and
        ``0`` end up there as the text ``None`` and ``0``; the other results
        are already source text.
        """
        if field.switchfield:
            return None
        if field.uatype in self.model.enum_list:
            enum = self.model.get_enum(field.uatype)
            return f'{enum.name}(0)'
        if field.uatype in ('String', 'ByteString', 'CharArray', 'Char'):
            return None
        if field.uatype == 'Boolean':
            return 'True'
        if field.uatype == 'DateTime':
            return 'datetime.utcnow()'
        if field.uatype in ('Int16', 'Int32', 'Int64', 'UInt16', 'UInt32', 'UInt64', 'Double', 'Float', 'Byte'):
            return 0
        if field.uatype == 'ExtensionObject':
            return 'ExtensionObject()'
        return f'{field.uatype}()'
225
226
227
if __name__ == '__main__':
    # Script entry point: parse the type description and generate the protocol module.
    import generate_model as gm

    xml_path = 'Opc.Ua.Types.bsd'
    protocol_path = '../asyncua/ua/uaprotocol_auto.py'
    p = gm.Parser(xml_path)
    model = p.parse()
    # Model adjustments provided by generate_model, applied in this fixed order
    # before any code is generated.
    gm.add_basetype_members(model)
    gm.add_encoding_field(model)
    gm.remove_duplicates(model)
    gm.remove_vector_length(model)
    gm.split_requests(model)
    gm.fix_names(model)
    gm.remove_duplicate_types(model)
    c = CodeGenerator(model, protocol_path)
    c.run()
242