Passed
Pull Request — master (#238)
by Olivier
02:28
created

asyncua.common.structures104   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 193
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 122
dl 0
loc 193
rs 8.8
c 0
b 0
f 0
wmc 45

7 Functions

Rating   Name   Duplication   Size   Complexity  
C get_default_value() 0 23 11
A clean_name() 0 9 1
A load_enums() 0 9 4
A make_enum_code() 0 22 2
B load_data_type_definitions() 0 17 7
B _generate_object() 0 26 7
D make_structure_code() 0 60 13

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.structures104 often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
11
# Module-level logger, named after this module, shared by all functions below
logger = logging.getLogger(__name__)
12
13
def clean_name(name):
    """
    Turn a name found in an OPC UA structure into a usable Python class name.

    Every run of non-word characters is collapsed into one underscore, and
    a result starting with ASCII digits gets an underscore put in front.
    """
    underscored = re.sub(r'\W+', '_', name)
    # An identifier cannot begin with a digit: re-emit the digit run behind '_'
    return re.sub(r'^[0-9]+', r'_\g<0>', underscored)
22
23
24
25
def get_default_value(uatype, enums=None):
    """
    Return the default value written into generated code for a field type.

    :param uatype: name of the UA type of the field
    :param enums: optional mapping of enum type name to the value used to
        initialise it; treated as empty when None
    :return: mostly a string holding a Python expression; byte-like types
        give the bytes object b'' and numeric types give the integer 0
    """
    known_enums = {} if enums is None else enums

    # Types whose default does not depend on the ua module
    fixed_defaults = {
        "String": "None",
        "Guid": "uuid.uuid4()",
        "Boolean": "True",
        "DateTime": "datetime.utcnow()",
    }
    fixed_defaults.update(dict.fromkeys(("ByteString", "CharArray", "Char"), b''))
    fixed_defaults.update(dict.fromkeys(
        ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"),
        0,
    ))
    if uatype in fixed_defaults:
        return fixed_defaults[uatype]

    if uatype in known_enums:
        return f"ua.{uatype}({known_enums[uatype]})"

    if hasattr(ua, uatype):
        candidate = getattr(ua, uatype)
        if issubclass(candidate, Enum):
            # An enum of the ua module: initialise with its first member
            first_member = list(candidate.__members__)[0]
            return f"ua.{uatype}.{first_member}"

    # Anything else is instantiated through a call without arguments
    return f"ua.{uatype}()"
48
49
50
async def make_structure_code(data_type_node):
    """
    Generate the Python source of a class from the StructureDefinition
    read on a data type node.

    The generated class has a ``ua_types`` list of (field name, type name)
    pairs, a ``ua_switches`` dict when the structure has optional fields,
    ``__str__``/``__repr__`` and an ``__init__`` giving every field a
    default value.

    :param data_type_node: node providing read_data_type_definition() and
        read_browse_name()
    :return: the generated source code as a string
    :raises NotImplementedError: when the structure type is neither
        Structure nor StructureWithOptionalFields
    :raises RuntimeError: when a field data type identifier is not in
        ua.ObjectIdNames
    """
    sdef = await data_type_node.read_data_type_definition()
    name = clean_name((await data_type_node.read_browse_name()).Name)
    if sdef.StructureType not in (ua.StructureType.Structure, ua.StructureType.StructureWithOptionalFields):
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} {data_type_node} with DataTypdeDefinition {sdef}")

    code = f"""

class {name}:

    '''
    {name} structure autogenerated from StructureDefinition object
    '''

"""
    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
        # Optional fields are numbered in declaration order
        code += '    ua_switches = {\n'
        optional_fields = (field for field in sdef.Fields if field.IsOptional)
        for bit, field in enumerate(optional_fields):
            code += f"        '{field.Name}': ('Encoding', {bit}),\n"
        code += "    }\n\n"

    code += '    ua_types = [\n'
    uatypes = []
    for field in sdef.Fields:
        is_array = field.ValueRank >= 1
        if field.DataType.Identifier not in ua.ObjectIdNames:
            raise RuntimeError(f"Unknown field datatype for field: {field} in structure:{name}")
        # Keep the bare type name here; the 'ListOf' prefix is added exactly
        # once when the entry is written. Prefixing here as well produced
        # 'ListOfListOf...' names and made the 'Char' test below unreachable.
        uatype = ua.ObjectIdNames[field.DataType.Identifier]
        if is_array and uatype == 'Char':
            # NOTE(review): an array of Char now becomes 'ListOfString';
            # confirm whether plain 'String' was intended instead.
            uatype = 'String'
        uatypes.append((field, uatype))
        prefix = 'ListOf' if is_array else ''
        code += f"        ('{field.Name}', '{prefix + uatype}'),\n"
    code += "    ]\n"
    code += f"""
    def __str__(self):
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
        return "{name}(" + ", ".join(vals) + ")"

    __repr__ = __str__

    def __init__(self):
"""
    if not sdef.Fields:
        # __init__ needs a body even when there is nothing to initialise
        code += "        pass\n"
    for field, uatype in uatypes:
        if field.ValueRank >= 1:
            default_value = "[]"
        else:
            default_value = get_default_value(uatype)
        code += f"        self.{field.Name} = {default_value}\n"
    return code
110
111
112
async def _generate_object(data_type_node, env=None, enum=False):
    """
    Generate Python code for a data type node and execute it.

    :param data_type_node: node the code is generated from
    :param env: dict used as namespace for the execution; a new one is
        created when None, otherwise the given dict is modified in place
    :param enum: when true the code comes from make_enum_code, otherwise
        from make_structure_code
    :return: env, containing whatever the executed code defined

    Rmk: Since the code is generated on the fly, in case of error the stack
    trace is not available and debugging is very hard...
    """
    if env is None:
        env = {}
    # Names the generated code relies on. Entries already supplied by the
    # caller are kept; IntEnum is now guarded by its own key (the previous
    # check looked for "enum" and therefore never matched what it set).
    env.setdefault('ua', ua)
    env.setdefault('datetime', datetime)
    env.setdefault('uuid', uuid)
    env.setdefault('IntEnum', IntEnum)
    # Generate the class source; executing it below adds the class to env
    if enum:
        code = await make_enum_code(data_type_node)
    else:
        code = await make_structure_code(data_type_node)
    logger.debug("Executing code: %s", code)
    # SECURITY: this executes source assembled from data read on the node.
    # NOTE(review): only safe if the server providing the node is trusted.
    exec(code, env)
    return env
138
139
140
async def load_data_type_definitions(server, base_node=None):
    """
    Generate objects for the structure subtypes found under a node.

    Every HasSubtype child of base_node (server.nodes.base_structure_type
    when None) is processed, unless it is named "FilterOperand" or the ua
    module already has an attribute of that name. Failures are logged and
    do not stop the loop. The result of _generate_object is not used.
    """
    root = server.nodes.base_structure_type if base_node is None else base_node
    subtypes = await root.get_children_descriptions(refs=ua.ObjectIds.HasSubtype)
    for description in subtypes:
        type_name = description.BrowseName.Name
        # FIXME: find out why FilterOperand is not in ua namespace...
        if type_name == "FilterOperand" or hasattr(ua, type_name):
            continue
        logger.warning("Registring structure %s %s", description.NodeId, description.BrowseName)
        node = server.get_node(description.NodeId)
        try:
            await _generate_object(node)
        except ua.uaerrors.BadAttributeIdInvalid:
            logger.warning("%s has no DataTypeDefinition atttribute", node)
        except Exception:
            logger.exception("Error getting datatype for node %s", node)
157
158
159
async def make_enum_code(data_type_node):
    """
    Generate the Python source of an IntEnum from the DataTypeDefinition
    attribute read on a node.

    Class and member names both go through clean_name, so names that are
    not valid Python identifiers still give valid code.

    :param data_type_node: node providing read_data_type_definition() and
        read_browse_name()
    :return: the generated source code as a string
    """
    edef = await data_type_node.read_data_type_definition()
    name = clean_name((await data_type_node.read_browse_name()).Name)
    code = f"""

class {name}(IntEnum):

    '''
    {name} EnumInt autogenerated from xml
    '''

"""
    # One "member = value" line per field of the definition
    code += "".join(
        f"    {clean_name(field.Name)} = {field.Value}\n" for field in edef.Fields
    )
    return code
181
182
183
184
async def load_enums(server, base_node=None):
    """
    Generate objects for the enum subtypes found under a node.

    Every HasSubtype child of base_node (server.nodes.enum_data_type when
    None) is processed, unless the ua module already has an attribute of
    that name. Errors raised while generating an enum are not caught here.
    """
    if base_node is None:
        base_node = server.nodes.enum_data_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        if hasattr(ua, desc.BrowseName.Name):
            continue
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
        node = server.get_node(desc.NodeId)
        # enum=True selects make_enum_code; without it the node was handed
        # to the structure generator
        await _generate_object(node, enum=True)
193
194
195