Passed
Pull Request — master (#238)
by Olivier
02:19
created

asyncua.common.structures104   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 196
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 124
dl 0
loc 196
rs 8.8
c 0
b 0
f 0
wmc 45

7 Functions

Rating   Name   Duplication   Size   Complexity  
C get_default_value() 0 23 11
A clean_name() 0 9 1
B _generate_object() 0 26 7
D make_structure_code() 0 60 13
A load_enums() 0 9 4
A make_enum_code() 0 22 2
B load_data_type_definitions() 0 19 7

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.structures104 often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
11
logger = logging.getLogger(__name__)
12
13
def clean_name(name):
    """
    Turn an OPC UA name into something usable as a Python identifier.

    Every run of non-word characters is collapsed into a single underscore,
    and a name starting with digits gets an underscore put in front of them.
    """
    # Step 1: replace anything that is not a letter, digit or underscore.
    without_symbols = re.sub(r'\W+', '_', name)
    # Step 2: identifiers cannot start with a digit, so prefix leading digits.
    return re.sub(r'^[0-9]+', r'_\g<0>', without_symbols)
22
23
24
25
def get_default_value(uatype, enums=None):
    """
    Return the default initialiser for a field of OPC UA type ``uatype``.

    The result is meant to be interpolated into generated source code, so
    most branches return a Python expression as a string; the bytes-like and
    numeric types return the literal value itself (``b''`` and ``0``).

    ``enums`` optionally maps enum type names to the value passed to the
    enum constructor in the generated expression.
    """
    known_enums = {} if enums is None else enums

    # Fixed defaults, tried in order: (matching type names, default value).
    fixed_defaults = (
        (("String",), "None"),
        (("Guid",), "uuid.uuid4()"),
        (("ByteString", "CharArray", "Char"), b''),
        (("Boolean",), "True"),
        (("DateTime",), "datetime.utcnow()"),
        (("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"), 0),
    )
    for type_names, default in fixed_defaults:
        if uatype in type_names:
            return default

    if uatype in known_enums:
        return f"ua.{uatype}({known_enums[uatype]})"

    if hasattr(ua, uatype):
        candidate = getattr(ua, uatype)
        if issubclass(candidate, Enum):
            # An enum known to the ua module: initialise with its first member.
            first_member = list(candidate.__members__)[0]
            return f"ua.{uatype}.{first_member}"

    # Anything else is assumed to be constructible without arguments.
    return f"ua.{uatype}()"
48
49
50
async def make_structure_code(data_type_node):
    """
    Generate the Python source of a class for a structure data type node.

    The node's DataTypeDefinition and browse name are read. The returned
    source defines a class named after the cleaned browse name, with a
    ``ua_switches`` dict (only for structures with optional fields), a
    ``ua_types`` list, ``__str__``/``__repr__`` and an ``__init__`` that
    assigns a default value to every field.

    Raises NotImplementedError if the structure type is neither Structure
    nor StructureWithOptionalFields, and RuntimeError if a field data type
    is not present in ``ua.ObjectIdNames``.
    """
    sdef = await data_type_node.read_data_type_definition()
    name = clean_name((await data_type_node.read_browse_name()).Name)
    if sdef.StructureType not in (ua.StructureType.Structure, ua.StructureType.StructureWithOptionalFields):
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} {data_type_node} with DataTypdeDefinition {sdef}")

    code = f"""

class {name}:

    '''
    {name} structure autogenerated from StructureDefinition object
    '''

"""
    # Field names end up as attribute names in the generated source, so they
    # are cleaned once here and the same spelling is used in ua_switches,
    # ua_types and __init__.
    fields = [(field, clean_name(field.Name)) for field in sdef.Fields]

    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
        code += '    ua_switches = {\n'
        counter = 0
        for field, fname in fields:
            if field.IsOptional:
                # Each optional field gets the next index in 'Encoding'.
                code += f"        '{fname}': ('Encoding', {counter}),\n"
                counter += 1
        code += "    }\n\n"

    code += '    ua_types = [\n'
    uatypes = []
    for field, fname in fields:
        if field.DataType.Identifier not in ua.ObjectIdNames:
            raise RuntimeError(f"Unknown field datatype for field: {field} in structure:{name}")
        is_array = field.ValueRank >= 1
        # Keep the base type name un-prefixed so that 'ListOf' is added exactly
        # once below and the Char special case can actually match.
        uatype = ua.ObjectIdNames[field.DataType.Identifier]
        if is_array and uatype == 'Char':
            # NOTE(review): Char arrays are emitted with String elements;
            # confirm this is the intended mapping.
            uatype = 'String'
        uatypes.append((field, fname, uatype))
        prefix = 'ListOf' if is_array else ''
        code += f"        ('{fname}', '{prefix + uatype}'),\n"
    code += "    ]\n"
    code += f"""
    def __str__(self):
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
        return "{name}(" + ", ".join(vals) + ")"

    __repr__ = __str__

    def __init__(self):
"""
    if not fields:
        # A structure without fields still needs a body for __init__.
        code += "        pass\n"
    for field, fname, uatype in uatypes:
        # Arrays start empty; scalars get the default for their type.
        default_value = "[]" if field.ValueRank >= 1 else get_default_value(uatype)
        code += f"        self.{fname} = {default_value}\n"
    return code
110
111
112
async def _generate_object(data_type_node, env=None, enum=False):
    """
    Generate Python code for a data type node and execute it in ``env``.

    If ``enum`` is true the code comes from make_enum_code(), otherwise from
    make_structure_code(). ``env`` is used as the globals of the executed
    code; a new dict is created when it is None. The names ``ua``,
    ``datetime``, ``uuid`` and ``IntEnum`` are added to it unless the caller
    already supplied them.

    Returns ``env``, which after execution also holds whatever the generated
    code defined (the generated class, under its cleaned name).

    Rmk: since the code is generated on the fly, in case of error the stack
    trace is not available and debugging is very hard...
    """
    if env is None:
        env = {}
    # Names the generated code may reference; caller-supplied entries win.
    env.setdefault('ua', ua)
    env.setdefault('datetime', datetime)
    env.setdefault('uuid', uuid)
    env.setdefault('IntEnum', IntEnum)

    # Generate the class source for the requested kind of data type.
    if enum:
        code = await make_enum_code(data_type_node)
    else:
        code = await make_structure_code(data_type_node)
    logger.debug("Executing code: %s", code)
    # NOTE(security): the source is built from names and values read from the
    # node, so this executes text derived from external data. Only use it
    # against servers you trust.
    exec(code, env)
    return env
138
139
140
async def load_data_type_definitions(server, base_node=None):
    """
    Generate and register classes for structure data types unknown to ``ua``.

    The HasSubtype children of ``base_node`` (default:
    ``server.nodes.base_structure_type``) are browsed. For every child whose
    browse name is not already an attribute of the ``ua`` module, a class is
    generated from the node and passed to ua.register_extension_object().

    Failures for one node are logged and do not stop the processing of the
    remaining nodes.
    """
    if base_node is None:
        base_node = server.nodes.base_structure_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        name = desc.BrowseName.Name
        if name == "FilterOperand":
            # FIXME: find out why that one is not in ua namespace...
            continue
        if hasattr(ua, name):
            continue
        logger.warning("Registring structure %s %s", desc.NodeId, desc.BrowseName)
        node = server.get_node(desc.NodeId)
        try:
            env = await _generate_object(node)
            # The generated class is defined under the cleaned browse name,
            # so that is the key to look up in the execution environment.
            ua.register_extension_object(name, desc.NodeId, env[clean_name(name)])
        except ua.uaerrors.BadAttributeIdInvalid:
            logger.warning("%s has no DataTypeDefinition atttribute", node)
        except Exception:
            # Best effort: report the problem and carry on with the next node.
            logger.exception("Error getting datatype for node %s", node)
159
160
161
162
async def make_enum_code(data_type_node):
    """
    Generate the Python source of an IntEnum class for an enum data type node.

    The node's DataTypeDefinition and browse name are read. The class is
    named after the cleaned browse name and gets one member per field of the
    definition, named after the cleaned field name and set to the field's
    value.

    Returns the generated source code as a string.
    """
    edef = await data_type_node.read_data_type_definition()
    name = clean_name((await data_type_node.read_browse_name()).Name)
    code = f"""

class {name}(IntEnum):

    '''
    {name} EnumInt autogenerated from xml
    '''

"""
    # Member names become Python identifiers in the generated class, so they
    # are cleaned the same way as the class name.
    members = [f"    {clean_name(field.Name)} = {field.Value}\n" for field in edef.Fields]
    return code + "".join(members)
184
185
186
187
async def load_enums(server, base_node=None):
    """
    Generate classes for the enum data types that are unknown to ``ua``.

    The HasSubtype children of ``base_node`` (default:
    ``server.nodes.enum_data_type``) are browsed, and code is generated for
    every child whose browse name is not already an attribute of the ``ua``
    module.
    """
    if base_node is None:
        base_node = server.nodes.enum_data_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        if hasattr(ua, desc.BrowseName.Name):
            continue
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
        node = server.get_node(desc.NodeId)
        # enum=True selects the enum generator; without it the node would be
        # handled as a structure definition.
        # NOTE(review): the returned environment (holding the generated class)
        # is discarded here, so the enum is not made available anywhere.
        # Confirm how generated enums are supposed to be registered.
        await _generate_object(node, enum=True)
196
197
198