Passed
Pull Request — master (#238)
by Olivier
02:17
created

asyncua.common.structures104._generate_object()   B

Complexity

Conditions 7

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 18
nop 4
dl 0
loc 27
rs 8
c 0
b 0
f 0
1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
logger = logging.getLogger(__name__)
11
12
13
def clean_name(name):
14
    """
15
    Remove characters that might be present in  OPC UA structures
16
    but cannot be part of of Python class names
17
    """
18
    newname = re.sub(r'\W+', '_', name)
19
    newname = re.sub(r'^[0-9]+', r'_\g<0>', newname)
20
21
    if name != newname:
22
        logger.warning("renamed %s to %s due to Python syntax", name, newname)
23
    return newname
24
25
26
def get_default_value(uatype, enums=None):
27
    if enums is None:
28
        enums = {}
29
    if uatype == "String":
30
        return "None"
31
    elif uatype == "Guid":
32
        return "uuid.uuid4()"
33
    elif uatype in ("ByteString", "CharArray", "Char"):
34
        return b''
35
    elif uatype == "Boolean":
36
        return "True"
37
    elif uatype == "DateTime":
38
        return "datetime.utcnow()"
39
    elif uatype in ("Int16", "Int32", "Int64", "UInt16", "UInt32", "UInt64", "Double", "Float", "Byte", "SByte"):
40
        return 0
41
    elif uatype in enums:
42
        return f"ua.{uatype}({enums[uatype]})"
43
    elif hasattr(ua, uatype) and issubclass(getattr(ua, uatype), Enum):
44
        # We have an enum, try to initilize it correctly
45
        val = list(getattr(ua, uatype).__members__)[0]
46
        return f"ua.{uatype}.{val}"
47
    else:
48
        return f"ua.{uatype}()"
49
50
51
def make_structure_code(name, sdef):
52
    """
53
    given a StructureDefinition object, generate Python code
54
    """
55
    if sdef.StructureType not in (ua.StructureType.Structure, ua.StructureType.StructureWithOptionalFields):
56
        #if sdef.StructureType != ua.StructureType.Structure:
57
        raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {name} with DataTypdeDefinition {sdef}")
58
59
    code = f"""
60
61
class {name}:
62
63
    '''
64
    {name} structure autogenerated from StructureDefinition object
65
    '''
66
67
"""
68
    counter = 0
69
    if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
70
        code += '    ua_switches = {\n'
71
        for field in sdef.Fields:
72
73
            if field.IsOptional:
74
                code += f"        '{field.Name}': ('Encoding', {counter}),\n"
75
                counter += 1
76
        code += "    }\n\n"
77
78
    code += '    ua_types = [\n'
79
    uatypes = []
80
    for field in sdef.Fields:
81
        prefix = 'ListOf' if field.ValueRank >= 1 else ''
82
        if field.DataType.NamespaceIndex == 0 and field.DataType.Identifier in ua.ObjectIdNames:
83
            uatype = ua.ObjectIdNames[field.DataType.Identifier]
84
        elif field.DataType in ua.extension_objects_by_datatype:
85
            uatype = ua.extension_objects_by_datatype[field.DataType].__name__
86
        elif field.DataType in ua.enums_by_datatype:
87
            uatype = ua.enums_by_datatype[field.DataType].__name__
88
        else:
89
            raise RuntimeError(f"Unknown datatype for field: {field} in structure:{name}")
90
        if field.ValueRank >= 1 and uatype == 'Char':
91
            uatype = 'String'
92
        uatypes.append((field, uatype))
93
        code += f"        ('{field.Name}', '{prefix + uatype}'),\n"
94
    code += "    ]\n"
95
    code += f"""
96
    def __str__(self):
97
        vals = [f"{{name}}:{{val}}" for name, val in self.__dict__.items()]
98
        return f"{name}({{','.join(vals)}})"
99
100
    __repr__ = __str__
101
102
    def __init__(self):
103
"""
104
    if not sdef.Fields:
105
        code += "      pass"
106
    for field, uatype in uatypes:
107
        if field.ValueRank >= 1:
108
            default_value = "[]"
109
        else:
110
            default_value = get_default_value(uatype)
111
        code += f"        self.{field.Name} = {default_value}\n"
112
    print("CODE", code)
113
    return code
114
115
116
async def _generate_object(name, sdef, env=None, enum=False):
117
    """
118
    generate Python code and execute in a new environment
119
    return a dict of structures {name: class}
120
    Rmw: Since the code is generated on the fly, in case of error the stack trace is
121
    not available and debugging is very hard...
122
    """
123
    if env is None:
124
        env = {}
125
    #  Add the required libraries to dict
126
    if "ua" not in env:
127
        env['ua'] = ua
128
    if "datetime" not in env:
129
        env['datetime'] = datetime
130
    if "uuid" not in env:
131
        env['uuid'] = uuid
132
    if "enum" not in env:
133
        env['IntEnum'] = IntEnum
134
    # generate classe add it to env dict
135
    if enum:
136
        code = make_enum_code(name, sdef)
137
    else:
138
        code = make_structure_code(name, sdef)
139
    logger.debug("Executing code: %s", code)
140
    print("CODE", code)
141
    exec(code, env)
142
    return env
143
144
145
class DataTypeSorter:
146
    def __init__(self, name, desc, sdef):
147
        self.name = name
148
        self.desc = desc
149
        self.sdef = sdef
150
        self.encoding_id = self.sdef.DefaultEncodingId
151
        self.deps = [field.DataType for field in self.sdef.Fields]
152
153
    def __lt__(self, other):
154
        if self.desc.NodeId in other.deps:
155
            return True
156
        return False
157
158
    def __str__(self):
159
        return f"{self.__class__.__name__}({self.desc.NodeId, self.deps, self.encoding_id})"
160
161
    __repr__ = __str__
162
163
164
async def _recursive_parse(server, base_node, dtypes):
165
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
166
        sdef = await _read_data_type_definition(server, desc)
167
        if not sdef:
168
            continue
169
        name = clean_name(desc.BrowseName.Name)
170
        dtypes.append(DataTypeSorter(name, desc, sdef))
171
        await _recursive_parse(server, server.get_node(desc.NodeId), dtypes)
172
173
174
async def load_data_type_definitions(server, base_node=None):
175
    await load_enums(server)  # we need all enums to generate structure code
176
    if base_node is None:
177
        base_node = server.nodes.base_structure_type
178
    dtypes = []
179
    await _recursive_parse(server, base_node, dtypes)
180
    for dts in dtypes:
181
        try:
182
            env = await _generate_object(dts.name, dts.sdef)
183
            ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
184
        except NotImplementedError:
185
            logger.exception("Structure type %s not implemented", dts.sdef)
186
187
188
async def _read_data_type_definition(server, desc):
189
    if desc.BrowseName.Name == "FilterOperand":
190
        #FIXME: find out why that one is not in ua namespace...
191
        return None
192
    # FIXME: this is fishy, we may have same name in different Namespaces
193
    if hasattr(ua, desc.BrowseName.Name):
194
        return None
195
    logger.warning("Registring data type %s %s", desc.NodeId, desc.BrowseName)
196
    node = server.get_node(desc.NodeId)
197
    try:
198
        sdef = await node.read_data_type_definition()
199
    except ua.uaerrors.BadAttributeIdInvalid:
200
        logger.warning("%s has no DataTypeDefinition atttribute", node)
201
        return None
202
    except Exception:
203
        logger.exception("Error getting datatype for node %s", node)
204
        return None
205
    return sdef
206
207
208
def make_enum_code(name, edef):
209
    """
210
    if node has a DataTypeDefinition arttribute, generate enum code
211
    """
212
    code = f"""
213
214
class {name}(IntEnum):
215
216
    '''
217
    {name} EnumInt autogenerated from EnumDefinition
218
    '''
219
220
"""
221
222
    for field in edef.Fields:
223
        name = field.Name
224
        value = field.Value
225
        code += f"    {name} = {value}\n"
226
227
    return code
228
229
230
async def load_enums(server, base_node=None):
231
    if base_node is None:
232
        base_node = server.nodes.enum_data_type
233
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
234
        if hasattr(ua, desc.BrowseName.Name):
235
            continue
236
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
237
        name = clean_name(desc.BrowseName.Name)
238
        sdef = await _read_data_type_definition(server, desc)
239
        if not sdef:
240
            continue
241
        node = server.get_node(desc.NodeId)
242
        env = await _generate_object(name, sdef, enum=True)
243
        ua.register_enum(name, desc.NodeId, env[name])
244