Completed
Pull Request — master (#251)
by Olivier
07:59
created

asyncua.common.structures104   F

Complexity

Total Complexity 61

Size/Duplication

Total Lines 254
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 170
dl 0
loc 254
rs 3.52
c 0
b 0
f 0
wmc 61

9 Functions

Rating   Name   Duplication   Size   Complexity  
C get_default_value() 0 23 11
A clean_name() 0 11 2
A load_enums() 0 14 5
A _recursive_parse() 0 8 3
A _read_data_type_definition() 0 18 5
A load_data_type_definitions() 0 13 4
A make_enum_code() 0 20 2
B _generate_object() 0 27 7
F make_structure_code() 0 71 18

3 Methods

Rating   Name   Duplication   Size   Complexity  
A DataTypeSorter.__lt__() 0 4 2
A DataTypeSorter.__init__() 0 7 1
A DataTypeSorter.__str__() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.structures104 often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import keyword
import logging
import re
import uuid
from datetime import datetime
from enum import Enum
from enum import IntEnum

from asyncua import ua
9
10
logger = logging.getLogger(__name__)
11
12
13
def clean_name(name):
    """
    Return a version of *name* that can be used as a Python identifier.

    Remove characters that might be present in OPC UA structures
    but cannot be part of Python class names:

    - every run of non-word characters is replaced by a single underscore,
    - a leading run of digits gets an underscore prefix,
    - a Python keyword (``class``, ``None``, ...) gets an underscore suffix.

    A warning is logged whenever the returned name differs from *name*.
    """
    newname = re.sub(r'\W+', '_', name)
    newname = re.sub(r'^[0-9]+', r'_\g<0>', newname)
    # A keyword is made of word characters only, so it survives both
    # substitutions untouched and would break the generated code.
    if keyword.iskeyword(newname):
        newname += '_'

    if name != newname:
        logger.warning("renamed %s to %s due to Python syntax", name, newname)
    return newname
24
25
26
def get_default_value(uatype, enums=None):
    """
    Return the default value used to initialise a field of type *uatype*.

    The result is meant to be interpolated into generated source code
    (see make_structure_code). Most branches return a source snippet as a
    string; the byte-like and numeric branches return ``b''`` and ``0``,
    whose formatted form is the matching literal.

    :param uatype: name of the OPC UA type, e.g. "String" or "Int32"
    :param enums: optional mapping of enum type name to the value passed to
        its constructor in the generated snippet
    """
    if enums is None:
        enums = {}
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return b''
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
        return 0
    if uatype in enums:
        return f"ua.{uatype}({enums[uatype]})"
    ua_attr = getattr(ua, uatype, None)
    # issubclass() raises TypeError for non-classes (functions, modules, ...),
    # so make sure we really have a class before asking.
    if isinstance(ua_attr, type) and issubclass(ua_attr, Enum):
        # We have an enum, try to initialize it with its first member
        val = list(ua_attr.__members__)[0]
        return f"ua.{uatype}.{val}"
    return f"ua.{uatype}()"
49
50
51
def _get_field_uatype(field, struct_name):
    """
    Return the ua type name of one structure field.

    Raise RuntimeError when the field DataType is found neither in
    ua.ObjectIdNames nor among the registered extension objects and enums.
    """
    dtype = field.DataType
    if dtype.NamespaceIndex == 0 and dtype.Identifier in ua.ObjectIdNames:
        uatype = ua.ObjectIdNames[dtype.Identifier]
    elif dtype in ua.extension_objects_by_datatype:
        uatype = ua.extension_objects_by_datatype[dtype].__name__
    elif dtype in ua.enums_by_datatype:
        uatype = ua.enums_by_datatype[dtype].__name__
    else:
        #FIXME: we are probably missing many custom types here based on builtin types
        #maybe we can use ua_utils.get_base_data_type()
        raise RuntimeError(f"Unknown datatype for field: {field} in structure:{struct_name}, please report")
    if field.ValueRank >= 1 and uatype == 'Char':
        # an array of Char is handled as a String
        uatype = 'String'
    return uatype


def make_structure_code(data_type, name, sdef):
    """
    given a StructureDefinition object, generate Python code

    :param data_type: node id of the structure data type; its Identifier and
        NamespaceIndex are written into the generated class
    :param name: name of the generated class
    :param sdef: structure definition providing StructureType and Fields
    :return: source code of the generated class, as a string
    :raises NotImplementedError: when sdef.StructureType is neither Structure
        nor StructureWithOptionalFields
    :raises RuntimeError: when the data type of a field cannot be resolved
    """
    if sdef.StructureType not in (ua.StructureType.Structure, ua.StructureType.StructureWithOptionalFields):
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} with DataTypdeDefinition {sdef}")

    has_optional_fields = sdef.StructureType == ua.StructureType.StructureWithOptionalFields

    parts = [f"""

class {name}:

    '''
    {name} structure autogenerated from StructureDefinition object
    '''

    data_type = ua.NodeId({data_type.Identifier}, {data_type.NamespaceIndex})

"""]

    if has_optional_fields:
        # each optional field gets its own bit number in the 'Encoding' member
        parts.append('    ua_switches = {\n')
        counter = 0
        for field in sdef.Fields:
            if field.IsOptional:
                parts.append(f"        '{field.Name}': ('Encoding', {counter}),\n")
                counter += 1
        parts.append("    }\n\n")

    parts.append('    ua_types = [\n')
    if has_optional_fields:
        parts.append("        ('Encoding', 'Byte'),\n")
    uatypes = []
    for field in sdef.Fields:
        prefix = 'ListOf' if field.ValueRank >= 1 else ''
        uatype = _get_field_uatype(field, name)
        uatypes.append((field, uatype))
        parts.append(f"        ('{field.Name}', '{prefix + uatype}'),\n")
    parts.append("    ]\n")

    parts.append(f"""
    def __str__(self):
        vals = [f"{{name}}:{{val}}" for name, val in self.__dict__.items()]
        return f"{name}({{','.join(vals)}})"

    __repr__ = __str__

    def __init__(self):
""")

    init_lines = []
    if has_optional_fields:
        init_lines.append("        self.Encoding = 0\n")
    for field, uatype in uatypes:
        default_value = "[]" if field.ValueRank >= 1 else get_default_value(uatype)
        init_lines.append(f"        self.{field.Name} = {default_value}\n")
    if not init_lines:
        # __init__ needs a body; only emit 'pass' when nothing else is there
        init_lines.append("        pass\n")
    parts.extend(init_lines)

    return "".join(parts)
122
123
124
async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
    """
    generate Python code and execute in a new environment
    return a dict of structures {name: class}
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
    not available and debugging is very hard...

    :param name: name of the class to generate
    :param sdef: definition object passed to make_enum_code or make_structure_code
    :param data_type: node id of the data type, only used for structures
    :param env: optional dict used as globals for the executed code; a new
        dict is created when None
    :param enum: when True generate an IntEnum, otherwise a structure class
    :return: env, which contains the generated class under the key *name*
    """
    if env is None:
        env = {}
    # Add the required libraries to dict, keeping whatever the caller supplied
    env.setdefault('ua', ua)
    env.setdefault('datetime', datetime)
    env.setdefault('uuid', uuid)
    env.setdefault('IntEnum', IntEnum)
    # generate class and add it to env dict
    if enum:
        code = make_enum_code(name, sdef)
    else:
        code = make_structure_code(data_type, name, sdef)
    logger.debug("Executing code: %s", code)
    # NOTE(security): the code is built from names and values read from the
    # server, so exec() runs text derived from remote data.
    exec(code, env)
    return env
151
152
153
class DataTypeSorter:
    """
    Bundle what is needed to generate and register one structure data type.

    Instances compare with ``<``: ``a < b`` is true when one of the fields
    of ``b`` uses the data type described by ``a``. This is only a partial
    order: two unrelated types are never "less than" each other.
    """

    def __init__(self, data_type, name, desc, sdef):
        self.data_type = data_type
        self.name = name
        self.desc = desc
        self.sdef = sdef
        self.encoding_id = self.sdef.DefaultEncodingId
        # data types used by the fields of this structure
        self.deps = [field.DataType for field in self.sdef.Fields]

    def __lt__(self, other):
        return self.desc.NodeId in other.deps

    def __str__(self):
        return f"{self.__class__.__name__}({self.desc.NodeId, self.deps, self.encoding_id})"

    __repr__ = __str__
171
172
173
async def _recursive_parse(server, base_node, dtypes):
    """
    Walk the HasSubtype children of *base_node* and collect their definitions.

    For every child whose data type definition could be read, a
    DataTypeSorter is appended to *dtypes* and the child's own subtypes are
    parsed in turn. Children without a definition are skipped, together
    with their subtypes.
    """
    children = await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype)
    for child in children:
        definition = await _read_data_type_definition(server, child)
        if definition:
            class_name = clean_name(child.BrowseName.Name)
            dtypes.append(DataTypeSorter(child.NodeId, class_name, child, definition))
            await _recursive_parse(server, server.get_node(child.NodeId), dtypes)
181
182
183
def _sort_by_dependencies(dtypes):
    """
    Return the DataTypeSorter items ordered so that each one comes after
    the items describing the data types its fields use.
    """
    known = {dts.data_type for dts in dtypes}
    emitted = set()
    ordered = []
    remaining = list(dtypes)
    while remaining:
        # an item is ready when every dependency is either outside of this
        # batch, already emitted, or a reference to the item itself
        ready = [
            dts for dts in remaining
            if all(dep not in known or dep in emitted or dep == dts.data_type for dep in dts.deps)
        ]
        if not ready:
            # circular dependency: nothing more can be ordered, keep discovery order
            ordered.extend(remaining)
            break
        ordered.extend(ready)
        emitted.update(dts.data_type for dts in ready)
        ready_ids = {id(dts) for dts in ready}
        remaining = [dts for dts in remaining if id(dts) not in ready_ids]
    return ordered


async def load_data_type_definitions(server, base_node=None):
    """
    Generate and register Python classes for the structures found on a server.

    Enums are loaded first, then every subtype found below *base_node*
    (default: server.nodes.base_structure_type) that has a data type
    definition is turned into a class and registered as extension object.
    Structure types that are not implemented are logged and skipped.
    """
    await load_enums(server)  # we need all enums to generate structure code
    if base_node is None:
        base_node = server.nodes.base_structure_type
    dtypes = []
    await _recursive_parse(server, base_node, dtypes)
    # A structure can only be generated once the types of its fields are
    # registered. The "<" of DataTypeSorter is a partial order, which
    # list.sort() cannot turn into a reliable dependency order.
    for dts in _sort_by_dependencies(dtypes):
        try:
            env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
            ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
        except NotImplementedError:
            logger.exception("Structure type %s not implemented", dts.sdef)
196
197
198
async def _read_data_type_definition(server, desc):
    """
    Read the DataTypeDefinition attribute of the node described by *desc*.

    Return None, without reading anything, for "FilterOperand" and for any
    browse name that already exists as an attribute of the ua module.
    Return None as well when reading the attribute fails; the failure is
    logged.
    """
    browse_name = desc.BrowseName.Name
    # FIXME: find out why FilterOperand is not in ua namespace...
    # FIXME: the hasattr test is fishy, we may have same name in different Namespaces
    if browse_name == "FilterOperand" or hasattr(ua, browse_name):
        return None
    logger.warning("Registring data type %s %s", desc.NodeId, desc.BrowseName)
    node = server.get_node(desc.NodeId)
    try:
        return await node.read_data_type_definition()
    except ua.uaerrors.BadAttributeIdInvalid:
        logger.warning("%s has no DataTypeDefinition atttribute", node)
    except Exception:
        logger.exception("Error getting datatype for node %s", node)
    return None
216
217
218
def make_enum_code(name, edef):
    """
    if node has a DataTypeDefinition attribute, generate enum code

    :param name: name of the generated IntEnum class
    :param edef: enum definition; every item of edef.Fields provides the
        Name and Value of one member
    :return: source code of the generated class, as a string
    """
    header = f"""

class {name}(IntEnum):

    '''
    {name} EnumInt autogenerated from EnumDefinition
    '''

"""
    # one "Name = Value" line per member, in definition order
    members = [f"    {field.Name} = {field.Value}\n" for field in edef.Fields]
    return header + "".join(members)
238
239
240
async def load_enums(server, base_node=None):
    """
    Generate and register an IntEnum class for the enums found on a server.

    Every HasSubtype child of *base_node* (default:
    server.nodes.enum_data_type) whose browse name is not already an
    attribute of the ua module and whose data type definition can be read
    is turned into an IntEnum and registered with ua.register_enum.
    """
    if base_node is None:
        base_node = server.nodes.enum_data_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        if hasattr(ua, desc.BrowseName.Name):
            continue
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
        name = clean_name(desc.BrowseName.Name)
        sdef = await _read_data_type_definition(server, desc)
        if not sdef:
            continue
        env = await _generate_object(name, sdef, enum=True)
        ua.register_enum(name, desc.NodeId, env[name])
254