asyncua.common.structures104.make_structure_code()   F
last analyzed

Complexity

Conditions 18

Size

Total Lines 72
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
eloc 41
nop 3
dl 0
loc 72
rs 1.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like asyncua.common.structures104.make_structure_code() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
logger = logging.getLogger(__name__)
11
12
13
def clean_name(name):
14
    """
15
    Remove characters that might be present in  OPC UA structures
16
    but cannot be part of of Python class names
17
    """
18
    newname = re.sub(r'\W+', '_', name)
19
    newname = re.sub(r'^[0-9]+', r'_\g<0>', newname)
20
21
    if name != newname:
22
        logger.warning("renamed %s to %s due to Python syntax", name, newname)
23
    return newname
24
25
26
def get_default_value(uatype, enums=None):
27
    if enums is None:
28
        enums = {}
29
    if uatype == "String":
30
        return "None"
31
    elif uatype == "Guid":
32
        return "uuid.uuid4()"
33
    elif uatype in ("ByteString", "CharArray", "Char"):
34
        return b''
35
    elif uatype == "Boolean":
36
        return "True"
37
    elif uatype == "DateTime":
38
        return "datetime.utcnow()"
39
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
40
        return 0
41
    elif uatype in enums:
42
        return f"ua.{uatype}({enums[uatype]})"
43
    elif hasattr(ua, uatype) and issubclass(getattr(ua, uatype), Enum):
44
        # We have an enum, try to initilize it correctly
45
        val = list(getattr(ua, uatype).__members__)[0]
46
        return f"ua.{uatype}.{val}"
47
    else:
48
        return f"ua.{uatype}()"
49
50
51
def make_structure_code(data_type, name, sdef):
52
    """
53
    given a StructureDefinition object, generate Python code
54
    """
55
    if sdef.StructureType not in (ua.StructureType.Structure, ua.StructureType.StructureWithOptionalFields):
56
        # if sdef.StructureType != ua.StructureType.Structure:
57
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} with DataTypdeDefinition {sdef}")
58
59
    code = f"""
60
61
class {name}:
62
63
    '''
64
    {name} structure autogenerated from StructureDefinition object
65
    '''
66
67
    data_type = ua.NodeId({data_type.Identifier}, {data_type.NamespaceIndex})
68
69
"""
70
    counter = 0
71
    # FIXME: with subscturutre weprobably need to add all fields from parents
72
    # this requires network call etc...
73
    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
74
        code += '    ua_switches = {\n'
75
        for field in sdef.Fields:
76
77
            if field.IsOptional:
78
                code += f"        '{field.Name}': ('Encoding', {counter}),\n"
79
                counter += 1
80
        code += "    }\n\n"
81
82
    code += '    ua_types = [\n'
83
    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
84
        code += "        ('Encoding', 'Byte'),\n"
85
    uatypes = []
86
    for field in sdef.Fields:
87
        prefix = 'ListOf' if field.ValueRank >= 1 else ''
88
        if field.DataType.NamespaceIndex == 0 and field.DataType.Identifier in ua.ObjectIdNames:
89
            uatype = ua.ObjectIdNames[field.DataType.Identifier]
90
        elif field.DataType in ua.extension_objects_by_datatype:
91
            uatype = ua.extension_objects_by_datatype[field.DataType].__name__
92
        elif field.DataType in ua.enums_by_datatype:
93
            uatype = ua.enums_by_datatype[field.DataType].__name__
94
        else:
95
            # FIXME: we are probably missing many custom tyes here based on builtin types
96
            # maybe we can use ua_utils.get_base_data_type()
97
            raise RuntimeError(f"Unknown datatype for field: {field} in structure:{name}, please report")
98
        if field.ValueRank >= 1 and uatype == 'Char':
99
            uatype = 'String'
100
        uatypes.append((field, uatype))
101
        code += f"        ('{field.Name}', '{prefix + uatype}'),\n"
102
    code += "    ]\n"
103
    code += f"""
104
    def __str__(self):
105
        vals = [f"{{name}}:{{val}}" for name, val in self.__dict__.items()]
106
        return f"{name}({{','.join(vals)}})"
107
108
    __repr__ = __str__
109
110
    def __init__(self):
111
"""
112
    if not sdef.Fields:
113
        code += "      pass"
114
    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
115
        code += "        self.Encoding = 0\n"
116
    for field, uatype in uatypes:
117
        if field.ValueRank >= 1:
118
            default_value = "[]"
119
        else:
120
            default_value = get_default_value(uatype)
121
        code += f"        self.{field.Name} = {default_value}\n"
122
    return code
123
124
125
async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
126
    """
127
    generate Python code and execute in a new environment
128
    return a dict of structures {name: class}
129
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
130
    not available and debugging is very hard...
131
    """
132
    if env is None:
133
        env = {}
134
    #  Add the required libraries to dict
135
    if "ua" not in env:
136
        env['ua'] = ua
137
    if "datetime" not in env:
138
        env['datetime'] = datetime
139
    if "uuid" not in env:
140
        env['uuid'] = uuid
141
    if "enum" not in env:
142
        env['IntEnum'] = IntEnum
143
    # generate classe add it to env dict
144
    if enum:
145
        code = make_enum_code(name, sdef)
146
    else:
147
        code = make_structure_code(data_type, name, sdef)
148
    logger.debug("Executing code: %s", code)
149
    print("CODE", code)
150
    exec(code, env)
151
    return env
152
153
154
class DataTypeSorter:
155
    def __init__(self, data_type, name, desc, sdef):
156
        self.data_type = data_type
157
        self.name = name
158
        self.desc = desc
159
        self.sdef = sdef
160
        self.encoding_id = self.sdef.DefaultEncodingId
161
        self.deps = [field.DataType for field in self.sdef.Fields]
162
163
    def __lt__(self, other):
164
        if self.desc.NodeId in other.deps:
165
            return True
166
        return False
167
168
    def __str__(self):
169
        return f"{self.__class__.__name__}({self.desc.NodeId, self.deps, self.encoding_id})"
170
171
    __repr__ = __str__
172
173
174
async def _recursive_parse(server, base_node, dtypes, parent_sdef=None):
175
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
176
        sdef = await _read_data_type_definition(server, desc)
177
        if not sdef:
178
            continue
179
        name = clean_name(desc.BrowseName.Name)
180
        if parent_sdef:
181
            for field in reversed(parent_sdef.Fields):
182
                sdef.Fields.insert(0, field)
183
        dtypes.append(DataTypeSorter(desc.NodeId, name, desc, sdef))
184
        await _recursive_parse(server, server.get_node(desc.NodeId), dtypes, parent_sdef=sdef)
185
186
187
async def load_data_type_definitions(server, base_node=None):
188
    await load_enums(server)  # we need all enums to generate structure code
189
    if base_node is None:
190
        base_node = server.nodes.base_structure_type
191
    dtypes = []
192
    await _recursive_parse(server, base_node, dtypes)
193
    dtypes.sort()
194
    for dts in dtypes:
195
        try:
196
            env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
197
            ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
198
        except NotImplementedError:
199
            logger.exception("Structure type %s not implemented", dts.sdef)
200
201
202
async def _read_data_type_definition(server, desc):
203
    if desc.BrowseName.Name == "FilterOperand":
204
        #FIXME: find out why that one is not in ua namespace...
205
        return None
206
    # FIXME: this is fishy, we may have same name in different Namespaces
207
    if hasattr(ua, desc.BrowseName.Name):
208
        return None
209
    logger.warning("Registring data type %s %s", desc.NodeId, desc.BrowseName)
210
    node = server.get_node(desc.NodeId)
211
    try:
212
        sdef = await node.read_data_type_definition()
213
    except ua.uaerrors.BadAttributeIdInvalid:
214
        logger.warning("%s has no DataTypeDefinition atttribute", node)
215
        return None
216
    except Exception:
217
        logger.exception("Error getting datatype for node %s", node)
218
        return None
219
    return sdef
220
221
222
def make_enum_code(name, edef):
223
    """
224
    if node has a DataTypeDefinition arttribute, generate enum code
225
    """
226
    code = f"""
227
228
class {name}(IntEnum):
229
230
    '''
231
    {name} EnumInt autogenerated from EnumDefinition
232
    '''
233
234
"""
235
236
    for field in edef.Fields:
237
        name = field.Name
238
        value = field.Value
239
        code += f"    {name} = {value}\n"
240
241
    return code
242
243
244
async def load_enums(server, base_node=None):
245
    if base_node is None:
246
        base_node = server.nodes.enum_data_type
247
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
248
        if hasattr(ua, desc.BrowseName.Name):
249
            continue
250
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
251
        name = clean_name(desc.BrowseName.Name)
252
        edef = await _read_data_type_definition(server, desc)
253
        if not edef:
254
            continue
255
        env = await _generate_object(name, edef, enum=True)
256
        ua.register_enum(name, desc.NodeId, env[name])
257