Passed
Pull Request — master (#238)
by Olivier
02:38
created

asyncua.common.structures104._generate_object()   B

Complexity

Conditions 7

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 18
nop 4
dl 0
loc 27
rs 8
c 0
b 0
f 0
1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
logger = logging.getLogger(__name__)
11
12
13
def clean_name(name):
    """
    Turn an OPC UA name into something usable as a Python class name.

    Every run of non-word characters is replaced by a single underscore and
    leading digits get an underscore put in front of them. A warning is
    logged whenever the returned name differs from the one passed in.
    """
    sanitized = re.sub(r'^[0-9]+', r'_\g<0>', re.sub(r'\W+', '_', name))
    if sanitized == name:
        return sanitized
    logger.warning("renamed %s to %s due to Python syntax", name, sanitized)
    return sanitized
24
25
26
def get_default_value(uatype, enums=None):
    """
    Return the default initialiser for a field of OPC UA type ``uatype``.

    The value is meant to be interpolated into generated source code: most
    branches return a string holding a Python expression, while the byte-like
    and numeric types return the literals ``b''`` and ``0`` directly.

    :param uatype: name of the OPC UA type, e.g. ``"String"`` or ``"Int32"``
    :param enums: optional mapping from enum type name to the value passed to
        its constructor in the generated expression
    """
    known_enums = {} if enums is None else enums

    if uatype == "String":
        return "None"
    if uatype == "Guid":
        return "uuid.uuid4()"
    if uatype in ("ByteString", "CharArray", "Char"):
        return b''
    if uatype == "Boolean":
        return "True"
    if uatype == "DateTime":
        return "datetime.utcnow()"
    if uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
        return 0
    if uatype in known_enums:
        return f"ua.{uatype}({known_enums[uatype]})"
    if hasattr(ua, uatype):
        candidate = getattr(ua, uatype)
        if issubclass(candidate, Enum):
            # an enum of the ua module: initialise it with its first member
            first_member = [*candidate.__members__][0]
            return f"ua.{uatype}.{first_member}"
    # anything else is assumed to be a class of the ua module with a no-arg constructor
    return f"ua.{uatype}()"
49
50
51
def make_structure_code(name, sdef):
    """
    Given a StructureDefinition object, generate Python code for a class.

    The generated class has a ``ua_types`` list of ``(field name, type name)``
    pairs, ``__str__``/``__repr__`` and an ``__init__`` assigning a default to
    every field. For structures with optional fields a ``ua_switches`` dict is
    generated as well.

    :param name: name of the class to generate, interpolated as is into the code
    :param sdef: StructureDefinition; its ``StructureType`` and ``Fields`` are read
    :return: the source code of the class as a string
    :raises NotImplementedError: if the structure type is neither ``Structure``
        nor ``StructureWithOptionalFields``
    :raises RuntimeError: if the data type of a field cannot be mapped to a type name
    """
    if sdef.StructureType not in (ua.StructureType.Structure, ua.StructureType.StructureWithOptionalFields):
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} with DataTypdeDefinition {sdef}")

    code = f"""

class {name}:

    '''
    {name} structure autogenerated from StructureDefinition object
    '''

"""
    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
        # every optional field is mapped to ('Encoding', n), n counting the
        # optional fields in declaration order
        code += '    ua_switches = {\n'
        counter = 0
        for field in sdef.Fields:
            if field.IsOptional:
                code += f"        '{field.Name}': ('Encoding', {counter}),\n"
                counter += 1
        code += "    }\n\n"

    code += '    ua_types = [\n'
    uatypes = []
    for field in sdef.Fields:
        prefix = 'ListOf' if field.ValueRank >= 1 else ''
        if field.DataType.NamespaceIndex == 0 and field.DataType.Identifier in ua.ObjectIdNames:
            uatype = ua.ObjectIdNames[field.DataType.Identifier]
        elif field.DataType in ua.extension_objects_by_datatype:
            uatype = ua.extension_objects_by_datatype[field.DataType].__name__
        elif field.DataType in ua.enums_by_datatype:
            uatype = ua.enums_by_datatype[field.DataType].__name__
        else:
            # FIXME: we are probably missing many custom types here based on builtin types
            # maybe we can use ua_utils.get_base_data_type()
            raise RuntimeError(f"Unknown datatype for field: {field} in structure:{name}, please report")
        if field.ValueRank >= 1 and uatype == 'Char':
            # an array of Char is handled as a String
            uatype = 'String'
        uatypes.append((field, uatype))
        code += f"        ('{field.Name}', '{prefix + uatype}'),\n"
    code += "    ]\n"
    code += f"""
    def __str__(self):
        vals = [f"{{name}}:{{val}}" for name, val in self.__dict__.items()]
        return f"{name}({{','.join(vals)}})"

    __repr__ = __str__

    def __init__(self):
"""
    if not sdef.Fields:
        # keep the generated __init__ syntactically valid when there is nothing to assign
        code += "        pass\n"
    for field, uatype in uatypes:
        if field.ValueRank >= 1:
            default_value = "[]"
        else:
            default_value = get_default_value(uatype)
        code += f"        self.{field.Name} = {default_value}\n"
    return code
116
117
118
async def _generate_object(name, sdef, env=None, enum=False):
    """
    Generate Python code for a data type and execute it in an environment.

    :param name: name of the class to generate
    :param sdef: definition passed to ``make_enum_code`` or ``make_structure_code``
    :param env: optional dict used as globals of the generated code; a new dict
        is created when None. Names already present in it are kept.
    :param enum: generate an IntEnum when true, a structure class otherwise
    :return: the environment dict; after execution it contains the generated class

    Rmq: Since the code is generated on the fly, in case of error the stack trace is
    not available and debugging is very hard...
    """
    if env is None:
        env = {}
    # Add the required libraries to the dict without overriding what the caller
    # put there. IntEnum is keyed by the name the generated code actually uses.
    env.setdefault("ua", ua)
    env.setdefault("datetime", datetime)
    env.setdefault("uuid", uuid)
    env.setdefault("IntEnum", IntEnum)
    # generate the class and add it to the env dict
    if enum:
        code = make_enum_code(name, sdef)
    else:
        code = make_structure_code(name, sdef)
    logger.debug("Executing code: %s", code)
    # NOTE(review): the executed source is assembled from `name` and `sdef`;
    # if those can come from an untrusted source this runs arbitrary code.
    exec(code, env)
    return env
145
146
147
class DataTypeSorter:
    """
    Holder for a data type to generate, comparable by dependency.

    ``deps`` lists the data type of every field of the definition, and
    ``a < b`` is true when the node id of ``a`` is one of the ``deps`` of ``b``.
    """

    def __init__(self, name, desc, sdef):
        self.name, self.desc, self.sdef = name, desc, sdef
        self.encoding_id = sdef.DefaultEncodingId
        deps = []
        for field in sdef.Fields:
            deps.append(field.DataType)
        self.deps = deps

    def __lt__(self, other):
        return self.desc.NodeId in other.deps

    def __str__(self):
        class_name = self.__class__.__name__
        details = (self.desc.NodeId, self.deps, self.encoding_id)
        return f"{class_name}({details})"

    __repr__ = __str__
164
165
166
async def _recursive_parse(server, base_node, dtypes):
    """
    Collect the data types found below ``base_node`` into ``dtypes``.

    The children of ``base_node`` reached through HasSubtype references are
    visited depth first. A child for which no definition could be read is
    skipped together with its own subtypes; for every other child a
    DataTypeSorter is appended to ``dtypes`` before descending into it.
    """
    children = await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype)
    for child in children:
        definition = await _read_data_type_definition(server, child)
        if definition:
            class_name = clean_name(child.BrowseName.Name)
            dtypes.append(DataTypeSorter(class_name, child, definition))
            await _recursive_parse(server, server.get_node(child.NodeId), dtypes)
174
175
176
def _sort_by_dependency(dtypes):
    """
    Return the data types ordered so that each one comes after the ones its fields use.

    ``DataTypeSorter.__lt__`` only defines a partial order, which ``list.sort``
    cannot be relied on to honour (it does not compare every pair), so the
    order is computed explicitly. Discovery order is kept between data types
    that do not depend on each other.
    """
    ordered = []
    remaining = list(dtypes)
    while remaining:
        # only data types still waiting to be generated can block another one;
        # dependencies on anything else are not constrained by this ordering
        pending_ids = [dts.desc.NodeId for dts in remaining]
        ready, waiting = [], []
        for dts in remaining:
            own_id = dts.desc.NodeId
            blocked = any(dep != own_id and dep in pending_ids for dep in dts.deps)
            (waiting if blocked else ready).append(dts)
        if not ready:
            # circular dependency: no valid order exists, keep discovery order
            logger.warning("Circular dependency between data types: %s", remaining)
            ordered.extend(remaining)
            break
        ordered.extend(ready)
        remaining = waiting
    return ordered


async def load_data_type_definitions(server, base_node=None):
    """
    Generate and register a Python class for the structures found on the server.

    :param server: object giving access to the nodes (``get_node`` and ``nodes`` are used)
    :param base_node: node whose subtypes are parsed; defaults to
        ``server.nodes.base_structure_type``

    Structures whose type is not supported by the code generator are logged and skipped.
    """
    await load_enums(server)  # we need all enums to generate structure code
    if base_node is None:
        base_node = server.nodes.base_structure_type
    dtypes = []
    await _recursive_parse(server, base_node, dtypes)
    # a structure can only be generated once the types of its fields are registered
    for dts in _sort_by_dependency(dtypes):
        try:
            env = await _generate_object(dts.name, dts.sdef)
            ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
        except NotImplementedError:
            logger.exception("Structure type %s not implemented", dts.sdef)
189
190
191
async def _read_data_type_definition(server, desc):
    """
    Read the DataTypeDefinition of the node described by ``desc``.

    Returns None when the browse name is "FilterOperand", when the ``ua``
    module already has an attribute with that name, or when reading the
    definition fails (the failure is logged, not raised).
    """
    browse_name = desc.BrowseName.Name
    # FIXME: find out why FilterOperand is not in ua namespace...
    # FIXME: the hasattr test is fishy, we may have same name in different Namespaces
    if browse_name == "FilterOperand" or hasattr(ua, browse_name):
        return None
    logger.warning("Registring data type %s %s", desc.NodeId, desc.BrowseName)
    node = server.get_node(desc.NodeId)
    try:
        return await node.read_data_type_definition()
    except ua.uaerrors.BadAttributeIdInvalid:
        logger.warning("%s has no DataTypeDefinition atttribute", node)
    except Exception:
        logger.exception("Error getting datatype for node %s", node)
    return None
209
210
211
def make_enum_code(name, edef):
    """
    Generate the source code of an IntEnum class from an EnumDefinition.

    :param name: name of the generated class
    :param edef: definition whose ``Fields`` each provide a ``Name`` and a ``Value``
    :return: the source code as a string, one ``Name = Value`` line per field
    """
    header = f"""

class {name}(IntEnum):

    '''
    {name} EnumInt autogenerated from EnumDefinition
    '''

"""
    members = [f"    {field.Name} = {field.Value}\n" for field in edef.Fields]
    return header + "".join(members)
231
232
233
async def load_enums(server, base_node=None):
    """
    Generate and register an IntEnum for the enum data types found on the server.

    :param server: object giving access to the nodes (``nodes`` is used here)
    :param base_node: node whose direct subtypes are inspected; defaults to
        ``server.nodes.enum_data_type``

    Enums whose name already exists in the ``ua`` module, or whose definition
    cannot be read, are skipped.
    """
    if base_node is None:
        base_node = server.nodes.enum_data_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        if hasattr(ua, desc.BrowseName.Name):
            continue
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
        name = clean_name(desc.BrowseName.Name)
        sdef = await _read_data_type_definition(server, desc)
        if not sdef:
            continue
        env = await _generate_object(name, sdef, enum=True)
        ua.register_enum(name, desc.NodeId, env[name])
247