Passed
Pull Request — master (#251)
by Olivier
02:33
created

asyncua.common.structures104.clean_name()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 1
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
logger = logging.getLogger(__name__)
11
12
13
def clean_name(name):
    """
    Turn a name read from an OPC UA server into a usable Python identifier.

    Every run of non-word characters is replaced by a single ``_``, a leading
    run of digits gets a ``_`` prefix and reserved Python keywords get a ``_``
    suffix. A warning is logged whenever the returned name differs from the
    given one.

    :param name: the original name (str)
    :return: the cleaned name; equal to ``name`` when nothing had to change
    """
    # imported here so this function does not depend on the module's import block
    import keyword

    newname = re.sub(r'\W+', '_', name)
    newname = re.sub(r'^[0-9]+', r'_\g<0>', newname)
    if keyword.iskeyword(newname):
        # e.g. "class None:" would be a syntax error in the generated code
        newname += "_"

    if name != newname:
        logger.warning("renamed %s to %s due to Python syntax", name, newname)
    return newname
24
25
26
def get_default_value(uatype, enums=None):
    """
    Return the default value written into generated code for a field type.

    The result is meant to be interpolated into generated source: it is a
    string holding a Python expression, except for the byte-like types
    (``b''``) and the numeric types (``0``).

    :param uatype: name of the OPC UA type (str)
    :param enums: optional mapping of enum type name to the value passed to
        its constructor in the generated expression
    :return: default value (str, bytes or int) for that type
    """
    if enums is None:
        enums = {}
    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return b''
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
        return 0
    if uatype in enums:
        return f"ua.{uatype}({enums[uatype]})"
    known = getattr(ua, uatype, None)
    # issubclass() raises TypeError on non-classes, so check for a class first
    if isinstance(known, type) and issubclass(known, Enum):
        # We have an enum, initialize it with its first member
        val = list(known.__members__)[0]
        return f"ua.{uatype}.{val}"
    return f"ua.{uatype}()"
49
50
51
def make_structure_code(data_type, name, sdef):
    """
    Given a StructureDefinition object, generate Python code for a class.

    The generated class has a ``data_type`` attribute, a ``ua_types`` list of
    (field name, type name) pairs, ``__str__``/``__repr__`` and an ``__init__``
    assigning a default value to every field. For structures with optional
    fields it also gets a ``ua_switches`` dict and a leading ``Encoding``
    field.

    :param data_type: NodeId of the data type; its Identifier and
        NamespaceIndex are written into the generated code
    :param name: name of the generated class
    :param sdef: StructureDefinition (StructureType and Fields are read)
    :return: the generated source code as a string
    :raises NotImplementedError: for any StructureType other than Structure
        and StructureWithOptionalFields
    :raises RuntimeError: when the data type of a field cannot be resolved
    """
    with_optional = sdef.StructureType == ua.StructureType.StructureWithOptionalFields
    if not with_optional and sdef.StructureType != ua.StructureType.Structure:
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} with DataTypdeDefinition {sdef}")

    fields = sdef.Fields or []

    code = f"""

class {name}:

    '''
    {name} structure autogenerated from StructureDefinition object
    '''

    data_type = ua.NodeId({data_type.Identifier}, {data_type.NamespaceIndex})

"""
    # FIXME: with substructures we probably need to add all fields from parents
    # this requires network call etc...
    if with_optional:
        # each optional field gets its own bit position in the Encoding field
        code += '    ua_switches = {\n'
        counter = 0
        for field in fields:
            if field.IsOptional:
                code += f"        '{field.Name}': ('Encoding', {counter}),\n"
                counter += 1
        code += "    }\n\n"

    code += '    ua_types = [\n'
    if with_optional:
        code += "        ('Encoding', 'Byte'),\n"
    uatypes = []
    for field in fields:
        is_array = field.ValueRank >= 1
        prefix = 'ListOf' if is_array else ''
        if field.DataType.NamespaceIndex == 0 and field.DataType.Identifier in ua.ObjectIdNames:
            uatype = ua.ObjectIdNames[field.DataType.Identifier]
        elif field.DataType in ua.extension_objects_by_datatype:
            uatype = ua.extension_objects_by_datatype[field.DataType].__name__
        elif field.DataType in ua.enums_by_datatype:
            uatype = ua.enums_by_datatype[field.DataType].__name__
        else:
            # FIXME: we are probably missing many custom types here based on builtin types
            # maybe we can use ua_utils.get_base_data_type()
            raise RuntimeError(f"Unknown datatype for field: {field} in structure:{name}, please report")
        if is_array and uatype == 'Char':
            # an array of Char is handled as a String
            uatype = 'String'
        uatypes.append((field, uatype))
        code += f"        ('{field.Name}', '{prefix + uatype}'),\n"
    code += "    ]\n"
    code += f"""
    def __str__(self):
        vals = [f"{{name}}:{{val}}" for name, val in self.__dict__.items()]
        return f"{name}({{','.join(vals)}})"

    __repr__ = __str__

    def __init__(self):
"""
    init_lines = []
    if with_optional:
        init_lines.append("        self.Encoding = 0\n")
    for field, uatype in uatypes:
        default_value = "[]" if field.ValueRank >= 1 else get_default_value(uatype)
        init_lines.append(f"        self.{field.Name} = {default_value}\n")
    if not init_lines:
        # __init__ needs a body even when there is nothing to initialize
        init_lines.append("        pass\n")
    code += "".join(init_lines)
    return code
124
125
126
async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
127
    """
128
    generate Python code and execute in a new environment
129
    return a dict of structures {name: class}
130
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
131
    not available and debugging is very hard...
132
    """
133
    if env is None:
134
        env = {}
135
    #  Add the required libraries to dict
136
    if "ua" not in env:
137
        env['ua'] = ua
138
    if "datetime" not in env:
139
        env['datetime'] = datetime
140
    if "uuid" not in env:
141
        env['uuid'] = uuid
142
    if "enum" not in env:
143
        env['IntEnum'] = IntEnum
144
    # generate classe add it to env dict
145
    if enum:
146
        code = make_enum_code(name, sdef)
147
    else:
148
        code = make_structure_code(data_type, name, sdef)
149
    logger.debug("Executing code: %s", code)
150
    print("CODE", code)
151
    exec(code, env)
152
    return env
153
154
155
class DataTypeSorter:
    """
    Bundle what is known about a data type and make it comparable by dependency.

    An instance is "less than" another one when the other one has a field
    whose DataType is the NodeId of this instance.
    """

    def __init__(self, data_type, name, desc, sdef):
        self.data_type = data_type
        self.name = name
        self.desc = desc
        self.sdef = sdef
        self.encoding_id = self.sdef.DefaultEncodingId
        # data types of all the fields of this structure
        self.deps = [field.DataType for field in self.sdef.Fields]

    def __lt__(self, other):
        return self.desc.NodeId in other.deps

    def __str__(self):
        return f"{self.__class__.__name__}({self.desc.NodeId, self.deps, self.encoding_id})"

    __repr__ = __str__
173
174
175
async def _recursive_parse(server, base_node, dtypes, parent_sdef=None):
    """
    Walk the HasSubtype children of base_node, depth first, and append a
    DataTypeSorter to dtypes for every child whose definition could be read.

    The fields of parent_sdef, when given, are put in front of the fields of
    each child definition. Children without a definition are skipped together
    with everything below them.
    """
    children = await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype)
    for child in children:
        definition = await _read_data_type_definition(server, child)
        if not definition:
            continue
        cleaned = clean_name(child.BrowseName.Name)
        if parent_sdef:
            # inherited fields come first, in their original order
            definition.Fields[0:0] = list(parent_sdef.Fields)
        dtypes.append(DataTypeSorter(child.NodeId, cleaned, child, definition))
        child_node = server.get_node(child.NodeId)
        await _recursive_parse(server, child_node, dtypes, parent_sdef=definition)
186
187
188
def _order_by_dependencies(dtypes):
    """Return the entries of dtypes ordered so that each one comes after the entries its fields refer to."""
    ordered = []
    pending = list(dtypes)
    while pending:
        pending_ids = [dts.desc.NodeId for dts in pending]
        # ready: no field refers to another type that is still waiting
        ready = [
            dts for dts in pending
            if not any(dep != dts.desc.NodeId and dep in pending_ids for dep in dts.deps)
        ]
        if not ready:
            # circular dependency: keep what is left in its current order
            ordered.extend(pending)
            break
        ordered.extend(ready)
        done = {id(dts) for dts in ready}
        pending = [dts for dts in pending if id(dts) not in done]
    return ordered


async def load_data_type_definitions(server, base_node=None):
    """
    Read the structure definitions found below base_node, generate a Python
    class for each of them and register it in the ua module.

    Enums are loaded first. Structures are generated in dependency order, so
    that a structure used as a field type is registered before its users.
    Structures whose generation raises NotImplementedError are logged and
    skipped.

    :param server: object giving access to the nodes
    :param base_node: node to start from; defaults to
        server.nodes.base_structure_type
    """
    await load_enums(server)  # we need all enums to generate structure code
    if base_node is None:
        base_node = server.nodes.base_structure_type
    dtypes = []
    await _recursive_parse(server, base_node, dtypes)
    # list.sort() is not used here: "is a dependency of" is not a total
    # order, so sorting on it does not guarantee dependencies come first
    for dts in _order_by_dependencies(dtypes):
        try:
            env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
            ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
        except NotImplementedError:
            logger.exception("Structure type %s not implemented", dts.sdef)
201
202
203
async def _read_data_type_definition(server, desc):
    """
    Read the DataTypeDefinition of the node described by desc.

    Returns None, without reading anything, when the browse name is
    "FilterOperand" or is already an attribute of the ua module. Also returns
    None, after logging, when reading the definition fails.
    """
    browse_name = desc.BrowseName.Name
    # FIXME: find out why FilterOperand is not in ua namespace...
    # FIXME: the hasattr test is fishy, we may have same name in different Namespaces
    if browse_name == "FilterOperand" or hasattr(ua, browse_name):
        return None
    logger.warning("Registring data type %s %s", desc.NodeId, desc.BrowseName)
    node = server.get_node(desc.NodeId)
    try:
        return await node.read_data_type_definition()
    except ua.uaerrors.BadAttributeIdInvalid:
        logger.warning("%s has no DataTypeDefinition atttribute", node)
    except Exception:
        logger.exception("Error getting datatype for node %s", node)
    return None
221
222
223
def make_enum_code(name, edef):
    """
    Generate the Python code of an IntEnum class from an EnumDefinition.

    :param name: name of the generated class
    :param edef: EnumDefinition; each entry of its Fields provides the Name
        and Value of one member
    :return: the generated source code as a string
    """
    header = f"""

class {name}(IntEnum):

    '''
    {name} EnumInt autogenerated from EnumDefinition
    '''

"""
    # member names come from the server and may not be valid identifiers
    members = [f"    {clean_name(field.Name)} = {field.Value}\n" for field in edef.Fields]
    return header + "".join(members)
243
244
245
async def load_enums(server, base_node=None):
    """
    Generate and register an IntEnum class for every enum found directly
    below base_node whose name is not already an attribute of the ua module.

    Enums whose definition cannot be read are skipped.

    :param server: object giving access to the nodes
    :param base_node: node whose HasSubtype children are examined; defaults
        to server.nodes.enum_data_type
    """
    if base_node is None:
        base_node = server.nodes.enum_data_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        if hasattr(ua, desc.BrowseName.Name):
            continue
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
        name = clean_name(desc.BrowseName.Name)
        sdef = await _read_data_type_definition(server, desc)
        if not sdef:
            continue
        env = await _generate_object(name, sdef, enum=True)
        ua.register_enum(name, desc.NodeId, env[name])
259