Completed
Push — master ( f41327...987601 )
by Olivier
04:37 queued 01:54
created

generate_model.Parser.parse_enum()   C

Complexity

Conditions 10

Size

Total Lines 27
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 24
nop 2
dl 0
loc 27
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like generate_model.Parser.parse_enum() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Generate address space code from xml file specification
3
"""
4
from copy import copy
5
from xml.etree import ElementTree
6
7
NeedOverride = []
NeedConstructor = []  # ["RelativePathElement", "ReadValueId", "OpenSecureChannelParameters", "UserIdentityToken", "RequestHeader", "ResponseHeader", "ReadParameters", "UserIdentityToken", "BrowseDescription", "ReferenceDescription", "CreateSubscriptionParameters", "PublishResult", "NotificationMessage", "SetPublishingModeParameters"]
IgnoredEnums = []  # ["IdType", "NodeIdType"]
# We want to implement some structs by hand, to provide a better interface or
# simply because they are too complicated.
IgnoredStructs = []  # ["NodeId", "ExpandedNodeId", "Variant", "QualifiedName", "DataValue", "LocalizedText"]#, "ExtensionObject"]
# By default we split requests and responses into header and parameters, but
# some are so simple that we do not split them.
NoSplitStruct = ["GetEndpointsResponse", "CloseSessionRequest", "AddNodesResponse", "DeleteNodesResponse",
                 "BrowseResponse", "HistoryReadResponse", "HistoryUpdateResponse", "RegisterServerResponse",
                 "CloseSecureChannelRequest", "CloseSecureChannelResponse",
                 "CloseSessionResponse", "UnregisterNodesResponse", "MonitoredItemModifyRequest",
                 "MonitoredItemsCreateRequest", "ReadResponse", "WriteResponse",
                 "TranslateBrowsePathsToNodeIdsResponse", "DeleteSubscriptionsResponse", "DeleteMonitoredItemsResponse",
                 "CreateMonitoredItemsResponse", "ServiceFault", "AddReferencesResponse",
                 "ModifyMonitoredItemsResponse", "RepublishResponse", "CallResponse", "FindServersResponse",
                 "RegisterServerRequest", "RegisterServer2Response"]
# Structs whose names end with Request or Response but which are not
# requests/responses.
NotRequest = ["MonitoredItemCreateRequest", "MonitoredItemModifyRequest", "CallMethodRequest"]
# Maps a field name to the UA type that should replace the one from the schema.
OverrideTypes = {}  # {"AttributeId": "AttributeID",  "ResultMask": "BrowseResultMask", "NodeClassMask": "NodeClass", "AccessLevel": "VariableAccessLevel", "UserAccessLevel": "VariableAccessLevel", "NotificationData": "NotificationData"}
OverrideNames = {}  # {"RequestHeader": "Header", "ResponseHeader": "Header", "StatusCode": "Status", "NodesToRead": "AttributesToRead"} # "MonitoringMode": "Mode",, "NotificationMessage": "Notification", "NodeIdType": "Type"}
26
27
28
# some objects are defined as extension objects in the spec but do not seem to be in reality
29
# in addition to this list all request and response and descriptions will not inherit
30
# NoInherit = ["RequestHeader", "ResponseHeader", "ChannelSecurityToken", "UserTokenPolicy", "SignatureData", "BrowseResult", "ReadValueId", "WriteValue", "BrowsePath", "BrowsePathTarget", "RelativePath", "RelativePathElement", "BrowsePathResult"]#, "ApplicationDescription", "EndpointDescription"
31
32
33
class Bit(object):
    """Placement of a bit-field inside a container field of a struct."""

    def __init__(self):
        self.name = None  # name of the field the bit(s) were declared as
        self.idx = None  # offset of the first bit inside the container
        self.container = None  # name of the field that holds the bits
        self.length = 1  # width in bits

    def __str__(self):
        return f'(Bit: {self.name}, container:{self.container}, idx:{self.idx})'

    __repr__ = __str__
44
45
46
class Struct(object):
    """In-memory description of one structured type of the model."""

    def __init__(self):
        self.name = None
        self.basetype = None  # name of the base struct, if any
        self.doc = ""
        self.fields = []  # field objects, in declaration order
        self.bits = {}  # bit-field name -> bit placement object
        self.needconstructor = None
        self.needoverride = False
        self.children = []
        self.parents = []  # names of all ancestors, nearest first
        self.extensionobject = False  # used for struct which are not pure extension objects

    def get_field(self, name):
        """Return the first field called *name*.

        Raises:
            Exception: if no field of this struct has that name.
        """
        for field in self.fields:
            if field.name == name:
                return field
        raise Exception(f'field not found: {name}')

    def __str__(self):
        return f'Struct {self.name}:{self.basetype}'

    __repr__ = __str__
69
70
71
class Field(object):
    """Description of a single field of a struct."""

    # UA type names that is_native_type() reports as native.
    _NATIVE_TYPES = (
        'Char', 'SByte', 'Int16', 'Int32', 'Int64', 'UInt16', 'UInt32', 'UInt64',
        'Boolean', 'Double', 'Float', 'Byte', 'String', 'CharArray', 'ByteString',
        'DateTime',
    )

    def __init__(self):
        self.name = None
        self.uatype = None  # UA type name of the field
        self.length = None  # name of the field holding the element count, if any
        self.sourcetype = None
        self.switchfield = None
        self.switchvalue = None
        self.bitlength = 1  # width in bits, relevant for 'Bit' fields

    def __str__(self):
        return f'Field {self.name}({self.uatype})'

    __repr__ = __str__

    def is_native_type(self):
        """Return True when the field's UA type is one of the native types."""
        return self.uatype in self._NATIVE_TYPES
92
93
94
class Enum(object):
    """In-memory description of one enumerated type of the model."""

    def __init__(self):
        self.name = None
        self.uatype = None  # underlying UA integer type
        self.values = []  # enum value objects, in declaration order
        self.doc = ""

    def get_ctype(self):
        """Return ``uint<uatype>_t`` built from the stored ``uatype``."""
        # NOTE(review): the parser in this file stores a uatype that already
        # starts with 'UInt', so the result carries a doubled prefix. Confirm
        # whether this method is still used before relying on it.
        return f'uint{self.uatype}_t'
103
104
105
class EnumValue(object):
    """One named value of an enumerated type."""

    def __init__(self):
        self.name = None
        self.value = None
109
110
111
class Model(object):
    """Container for all structs and enums read from a schema file."""

    def __init__(self):
        self.structs = []  # struct objects
        self.enums = []  # enum objects
        self.struct_list = []  # struct names
        self.enum_list = []  # enum names

    def get_struct(self, name):
        """Return the first struct called *name*.

        Raises:
            Exception: if the model holds no struct with that name.
        """
        for struct in self.structs:
            if name == struct.name:
                return struct
        raise Exception("No struct named: " + str(name))

    def get_enum(self, name):
        """Return the first enum called *name*.

        Raises:
            Exception: if the model holds no enum with that name.
        """
        for enum in self.enums:
            if name == enum.name:
                return enum
        raise Exception("No enum named: " + str(name))
129
130
131
def reorder_structs(model):
    """Reorder ``model.structs`` so every struct follows the types it uses.

    A struct is emitted as soon as all its field types are known. A field type
    is known when it is a built-in or ignored type, an enum of the model, or a
    struct that was already visited. Structs using a type that has not been
    visited yet are parked and emitted once that type is emitted. This
    includes chains where the awaited struct was itself parked.

    Every struct gets a ``waitingfor`` list attribute as a side effect. If
    some structs can never be emitted, an error is printed and they are
    dropped from ``model.structs``.
    """
    # A set gives O(1) membership tests; the order of known types is irrelevant.
    types = set(IgnoredStructs) | set(IgnoredEnums) | {
        'Bit', 'Char', 'CharArray', 'Guid', 'SByte', 'Int16', 'Int32', 'Int64', 'UInt16', 'UInt32', 'UInt64',
        'DateTime', 'Boolean', 'Double', 'Float', 'ByteString', 'Byte', 'StatusCode', 'DiagnosticInfo', 'String',
        'AttributeID', 'VariableAccessLevel',
    }
    types.update(enum.name for enum in model.enums)
    waiting = {}  # type name -> structs parked until that type is emitted
    newstructs = []

    def emit(struct):
        # Emit struct, then every parked struct whose last missing type it was.
        newstructs.append(struct)
        for parked in waiting.pop(struct.name, ()):
            parked.waitingfor.remove(struct.name)
            if not parked.waitingfor:
                emit(parked)

    for s in model.structs:
        # Registered before the fields are checked so self-references are fine.
        types.add(s.name)
        s.waitingfor = []
        for f in s.fields:
            if f.uatype not in types:
                waiting.setdefault(f.uatype, []).append(s)
                s.waitingfor.append(f.uatype)
        if not s.waitingfor:
            emit(s)

    if len(model.structs) != len(newstructs):
        print(f'Error while reordering structs, some structs could not be reinserted, had {len(model.structs)} structs, we now have {len(newstructs)} structs')
        print('Variant' in types)  # debugging aid kept from the original
        emitted = {id(s) for s in newstructs}
        for s in model.structs:
            if id(s) not in emitted:
                print(f'{s} is waiting for: {s.waitingfor}')
    model.structs = newstructs
171
172
173
def override_types(model):
    """Replace the UA type of every field whose name is in ``OverrideTypes``."""
    for struct in model.structs:
        for field in struct.fields:
            if field.name in OverrideTypes:
                field.uatype = OverrideTypes[field.name]
178
179
180
def remove_duplicates(model):
    """Keep only the first field of each name in every struct of *model*."""
    for struct in model.structs:
        seen = set()
        fields = []
        for field in struct.fields:
            if field.name not in seen:
                seen.add(field.name)
                fields.append(field)
        struct.fields = fields
189
190
191
def add_encoding_field(model):
    """Fold 'Bit' fields of every struct into container fields.

    Each 'Bit' field is removed from ``struct.fields`` and recorded in
    ``struct.bits`` with the name of its container and its bit offset. A
    'UInt6' or 'NodeIdType' field stays in the field list, occupies the first
    six bits of itself and becomes the container for the bits that follow.
    When no container is open, or the bit offset has passed 7, a new 'Byte'
    field named 'Encoding' is inserted and used as the container.
    """
    for struct in model.structs:
        newfields = []
        container = None
        idx = 0  # next free bit offset inside the current container
        for field in struct.fields:
            if field.uatype in ('UInt6', 'NodeIdType'):
                container = field.name
                b = Bit()
                b.name = field.name
                b.idx = 0
                b.container = container
                b.length = 6
                idx = b.length
                struct.bits[b.name] = b

            if field.uatype == 'Bit':
                if not container or idx > 7:
                    # No open container (or it is full): start an 'Encoding' byte.
                    container = 'Encoding'
                    idx = 0
                    f = Field()
                    f.sourcetype = field.sourcetype
                    f.name = 'Encoding'
                    f.uatype = 'Byte'
                    newfields.append(f)

                b = Bit()
                b.name = field.name
                b.idx = idx
                b.container = container
                b.length = field.bitlength
                idx += field.bitlength
                struct.bits[b.name] = b
            else:
                # Anything that is not a 'Bit' stays a regular field.
                newfields.append(field)
        struct.fields = newfields
227
228
229
def remove_vector_length(model):
    """Drop fields named 'Length' or starting with 'NoOf' from every struct."""
    for struct in model.structs:
        struct.fields = [
            field for field in struct.fields
            if not field.name.startswith('NoOf') and field.name != 'Length'
        ]
236
237
238
def remove_body_length(model):
    """Drop every field named 'BodyLength' from every struct of *model*."""
    for struct in model.structs:
        struct.fields = [field for field in struct.fields if field.name != 'BodyLength']
245
246
247
def remove_duplicate_types(model):
    """Rewrite the UA type 'CharArray' to 'String' on every field of *model*."""
    for struct in model.structs:
        for field in struct.fields:
            if field.uatype == 'CharArray':
                field.uatype = 'String'
252
253
254
# def remove_extensionobject_fields(model):
255
# for obj in model.structs:
256
# if obj.name.endswith("Request") or obj.name.endswith("Response"):
257
# obj.fields = [el for el in obj.fields if el.name not in ("TypeId", "Body", "Encoding")]
258
259
def split_requests(model):
    """Prepend a TypeId to requests/responses and split off their parameters.

    A struct is a request when its name ends with 'Request' and is not listed
    in ``NotRequest``. It is a response when its name ends with 'Response' or
    is 'ServiceFault'. Each such struct gets ``needconstructor = True`` and a
    leading 'TypeId' field of type 'NodeId'.

    Unless the struct is listed in ``NoSplitStruct``, every field after the
    first two is moved into a new struct named ``<Name>Parameters`` (requests)
    or ``<Name>Result`` (responses). The new struct is placed just before the
    original and is referenced from it through a trailing 'Parameters' field.
    """
    structs = []
    for struct in model.structs:
        structtype = None
        if struct.name.endswith('Request') and struct.name not in NotRequest:
            structtype = 'Request'
        elif struct.name.endswith('Response') or struct.name == 'ServiceFault':
            structtype = 'Response'

        if not structtype:
            structs.append(struct)
            continue

        struct.needconstructor = True
        field = Field()
        field.name = 'TypeId'
        field.uatype = 'NodeId'
        struct.fields.insert(0, field)

        if struct.name not in NoSplitStruct:
            paramstruct = Struct()
            if structtype == 'Request':
                paramstruct.name = struct.name.replace('Request', '') + 'Parameters'
            else:
                paramstruct.name = struct.name.replace('Response', '') + 'Result'
            # The first two fields (the TypeId added above and the field that
            # followed it) stay on the original struct; the rest moves over.
            paramstruct.fields = struct.fields[2:]
            # The bits dict is shared with the original struct, not copied.
            paramstruct.bits = struct.bits
            struct.fields = struct.fields[:2]
            structs.append(paramstruct)

            typeid = Field()
            typeid.name = "Parameters"
            typeid.uatype = paramstruct.name
            struct.fields.append(typeid)
        structs.append(struct)
    model.structs = structs
303
304
305
class Parser(object):
    """Build a Model from an XML schema file.

    The root's children are scanned for ``StructuredType`` and
    ``EnumeratedType`` elements; other elements are ignored.
    """

    # <Field> XML attributes that are copied verbatim onto the Field object,
    # mapped to the attribute they populate. 'TypeName' and 'Length' need a
    # conversion and are handled separately in _parse_field().
    _FIELD_ATTRIBUTES = {
        'Name': 'name',
        'LengthField': 'length',
        'SourceType': 'sourcetype',
        'SwitchField': 'switchfield',
        'SwitchValue': 'switchvalue',
    }

    def __init__(self, path):
        self.path = path  # location of the XML file handed to ElementTree.parse
        self.model = None  # set by parse()

    @staticmethod
    def _local_name(tag):
        """Return an element tag without its leading ``{namespace}`` part."""
        return tag.rpartition('}')[2]

    @staticmethod
    def _strip_prefix(qname):
        """Return ``name`` for ``prefix:name``; unprefixed names are unchanged."""
        return qname.rpartition(':')[2]

    def parse(self):
        """Parse ``self.path`` and return the populated Model.

        A hand-built 'ExtensionObject' struct and an empty
        'DataTypeDefinition' struct are added first; an 'ExtensionObject'
        definition found in the file is therefore skipped.
        """
        print("Parsing: ", self.path)
        self.model = Model()
        tree = ElementTree.parse(self.path)
        root = tree.getroot()
        self.add_extension_object()
        self.add_data_type_definition()
        for child in root:
            tag = self._local_name(child.tag)
            if tag == 'StructuredType':
                struct = self.parse_struct(child)
                if struct.name != 'ExtensionObject':
                    self.model.structs.append(struct)
                    self.model.struct_list.append(struct.name)
            elif tag == 'EnumeratedType':
                enum = self.parse_enum(child)
                self.model.enums.append(enum)
                self.model.enum_list.append(enum.name)
        return self.model

    def add_extension_object(self):
        """Add the hand-built 'ExtensionObject' struct to the model."""
        obj = Struct()
        obj.name = 'ExtensionObject'
        # (field name, UA type, switch field)
        layout = (
            ('TypeId', 'NodeId', None),
            ('BinaryBody', 'Bit', None),
            ('XmlBody', 'Bit', None),
            ('Body', 'ByteString', 'BinaryBody'),
        )
        for name, uatype, switchfield in layout:
            f = Field()
            f.name = name
            f.uatype = uatype
            f.switchfield = switchfield
            obj.fields.append(f)
        self.model.struct_list.append(obj.name)
        self.model.structs.append(obj)

    def add_data_type_definition(self):
        """Add an empty 'DataTypeDefinition' struct to the model."""
        obj = Struct()
        obj.name = "DataTypeDefinition"
        self.model.struct_list.append(obj.name)
        self.model.structs.append(obj)

    def parse_struct(self, child):
        """Return a Struct built from a ``StructuredType`` element.

        When the element has a ``BaseType``, the names of all ancestors are
        collected in ``struct.parents`` by walking the structs already in the
        model, so base types must have been parsed before their subtypes
        (``Model.get_struct`` raises otherwise).
        """
        struct = Struct()
        for key, val in child.attrib.items():
            if key == 'Name':
                struct.name = val
            elif key == 'BaseType':
                struct.basetype = self._strip_prefix(val)
                tmp = struct
                while tmp.basetype:
                    struct.parents.append(tmp.basetype)
                    tmp = self.model.get_struct(tmp.basetype)
            else:
                print(f'Error unknown key: {key}')
        for el in child:
            tag = self._local_name(el.tag)
            if tag == 'Field':
                struct.fields.append(self._parse_field(el, struct.name))
            elif tag == 'Documentation':
                struct.doc = el.text
            else:
                print(f'Unknown tag: {tag}')

        return struct

    def _parse_field(self, el, struct_name):
        """Return a Field built from a ``Field`` element of *struct_name*."""
        field = Field()
        for key, val in el.attrib.items():
            if key in self._FIELD_ATTRIBUTES:
                setattr(field, self._FIELD_ATTRIBUTES[key], val)
            elif key == 'TypeName':
                field.uatype = self._strip_prefix(val)
            elif key == 'Length':
                field.bitlength = int(val)
            else:
                print(f'Unknown field item: {struct_name} {key}')
        return field

    def parse_enum(self, child):
        """Return an Enum built from an ``EnumeratedType`` element."""
        enum = Enum()
        for k, v in child.items():
            if k == 'Name':
                enum.name = v
            elif k == 'LengthInBits':
                # e.g. LengthInBits="32" -> 'UInt32'
                enum.uatype = f'UInt{v}'
            else:
                print(f'Unknown attr for enum: {k}')
        for el in child:
            tag = self._local_name(el.tag)
            if tag == 'EnumeratedValue':
                enum.values.append(self._parse_enum_value(el))
            elif tag == 'Documentation':
                enum.doc = el.text
            else:
                print(f'Unknown enum tag: {tag}')
        return enum

    @staticmethod
    def _parse_enum_value(el):
        """Return an EnumValue built from an ``EnumeratedValue`` element."""
        ev = EnumValue()
        for k, v in el.attrib.items():
            if k == 'Name':
                ev.name = v
            elif k == 'Value':
                ev.value = v
            else:
                print(f'Unknown field attrib: {k}')
        return ev
435
436
437
# "def reorder_extobjects(model):
438
# ext = model.get_struct("ExtensionObject")
439
# print(ext)
440
# typeid = ext.fields[4]
441
# ext.fields.remove(typeid)
442
# ext.fields.insert(0, typeid)
443
444
def add_basetype_members(model):
    """Copy the bits and fields of each struct's base type into the struct.

    Structs whose base type is 'ExtensionObject' are not expanded; their
    ``basetype`` is reset to None instead. For every other struct with a base
    type, the base's bits are merged into ``struct.bits`` and copies of the
    base's fields are inserted at the front of ``struct.fields`` in the base's
    order. An inherited field named 'Body' is turned into an Int32
    'BodyLength' field (and the struct is flagged as ``extensionobject``)
    unless the struct had no fields of its own.
    """
    for struct in model.structs:
        if not struct.basetype:
            continue
        # Evaluated before any inherited field is inserted below.
        emptystruct = not struct.fields
        # Exact comparison: `in ('ExtensionObject')` would be a substring test
        # on a plain string and also match names such as 'Object'.
        if struct.basetype == 'ExtensionObject':
            struct.basetype = None
            continue
        base = model.get_struct(struct.basetype)
        struct.bits.update(base.bits)
        for idx, field in enumerate(base.fields):
            # Work on a copy so the base struct's own field is left untouched.
            field = copy(field)
            if field.name == 'Body' and not emptystruct:
                struct.extensionobject = True
                field.name = 'BodyLength'
                field.uatype = 'Int32'
                field.length = None
                field.switchfield = None
            if not field.sourcetype:
                # Remember which struct the inherited field came from.
                field.sourcetype = base.name
            struct.fields.insert(idx, field)
490
491
492
def fix_names(model):
    """Rename enum values called 'None' to 'None_' in every enum of *model*."""
    for enum in model.enums:
        for value in enum.values:
            if value.name == 'None':
                value.name = 'None_'
497