Passed
Pull Request — master (#238)
by Olivier
02:10
created

asyncua.common.structures104   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 173
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 106
dl 0
loc 173
rs 9.36
c 0
b 0
f 0
wmc 38

7 Functions

Rating   Name   Duplication   Size   Complexity  
A load_enums() 0 9 4
C get_default_value() 0 23 11
A make_enum_code() 0 22 2
A load_data_type_definitions() 0 12 5
A clean_name() 0 9 1
B _generate_object() 0 26 7
B make_structure_code() 0 45 8
1
from enum import Enum
2
from datetime import datetime
3
import uuid
4
from enum import IntEnum
5
import logging
6
import re
7
8
from asyncua import ua
9
10
11
logger = logging.getLogger(__name__)
12
13
def clean_name(name):
    """
    Sanitise an OPC UA name so it can be used as a Python class name.

    Each run of non-word characters is replaced by a single underscore, and
    a name that begins with digits gets an underscore inserted in front of
    those digits.
    """
    without_specials = re.sub(r'\W+', '_', name)
    return re.sub(r'^[0-9]+', r'_\g<0>', without_specials)
22
23
24
25
def get_default_value(uatype, enums=None):
    """
    Return the default initialiser for a field whose type name is ``uatype``.

    For most types the result is a piece of Python source text meant to be
    pasted into generated code (e.g. ``"uuid.uuid4()"``).
    NOTE(review): byte-like and numeric types yield the objects ``b''`` and
    ``0`` rather than source text; callers that interpolate the result into
    an f-string still get usable code.

    :param uatype: name of the OPC UA type
    :param enums: optional mapping of custom enum name to the value used to
        construct its default; consulted only after the built-in names
    """
    custom_enums = {} if enums is None else enums

    # Built-in type names with a fixed default.
    fixed_defaults = {
        "String": "None",
        "Guid": "uuid.uuid4()",
        "ByteString": b'',
        "CharArray": b'',
        "Char": b'',
        "Boolean": "True",
        "DateTime": "datetime.utcnow()",
    }
    numeric_names = (
        "Int16", "Int32", "Int64",
        "UInt16", "UInt32", "UInt64",
        "Double", "Float", "Byte", "SByte",
    )
    fixed_defaults.update(dict.fromkeys(numeric_names, 0))

    if uatype in fixed_defaults:
        return fixed_defaults[uatype]

    if uatype in custom_enums:
        return f"ua.{uatype}({custom_enums[uatype]})"

    if hasattr(ua, uatype):
        candidate = getattr(ua, uatype)
        if issubclass(candidate, Enum):
            # An enum known to the ua module: initialise with its first member.
            first_member = list(candidate.__members__)[0]
            return f"ua.{uatype}.{first_member}"

    # Anything else is assumed to be constructible without arguments.
    return f"ua.{uatype}()"
48
49
50
async def make_structure_code(data_type_node):
    """
    Generate the Python source of a class mirroring a structure data type.

    The node's data type definition and browse name are read. The generated
    class carries a ``ua_types`` list of ``(field name, type name)`` pairs,
    a ``__str__``/``__repr__`` and an ``__init__`` that assigns a default
    value to every field.

    :param data_type_node: node object offering the coroutines
        ``read_data_type_definition`` and ``read_browse_name``
    :return: the generated source code as a string
    :raises NotImplementedError: if the definition's StructureType is not
        ``ua.StructureType.Structure``
    :raises RuntimeError: if a field's data type identifier is missing from
        ``ua.ObjectIdNames``
    """
    sdef = await data_type_node.read_data_type_definition()
    name = clean_name((await data_type_node.read_browse_name()).Name)
    if sdef.StructureType != ua.StructureType.Structure:
        raise NotImplementedError("Only StructureType implemented")

    # (python attribute name, ua type name) for every field, in order
    members = []
    for field in sdef.Fields:
        if field.DataType.Identifier not in ua.ObjectIdNames:
            # Report the browse name: the definition object is only known to
            # provide StructureType and Fields here, not a Name.
            raise RuntimeError(f"Unknown  field datatype for field: {field} in structure:{name}")
        uatype = ua.ObjectIdNames[field.DataType.Identifier]
        if field.ValueRank >= 1:
            uatype = 'ListOf' + uatype
        if uatype == 'ListOfChar':
            uatype = 'String'
        # Field names end up as attribute names in the generated code, so
        # they need the same sanitising as the class name.
        members.append((clean_name(field.Name), uatype))

    parts = [f"""

class {name}:

    '''
    {name} structure autogenerated from StructureDefinition object
    '''

"""]
    parts.append('    ua_types = [\n')
    parts.extend(f"        ('{fname}', '{uatype}'),\n" for fname, uatype in members)
    parts.append("    ]")
    parts.append("""
    def __str__(self):
        vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
        return self.__class__.__name__ + "(" + ", ".join(vals) + ")"

    __repr__ = __str__

    def __init__(self):
""")
    if not members:
        parts.append("        pass\n")
    for fname, uatype in members:
        if uatype.startswith('ListOf'):
            # Array fields start out empty; 'ListOfX' is only a type label,
            # not a constructor to call.
            default_value = "[]"
        else:
            default_value = get_default_value(uatype)
        parts.append(f"        self.{fname} = {default_value}\n")
    return "".join(parts)
95
96
97
async def _generate_object(data_type_node, env=None, enum=False):
    """
    Generate Python code for a data type node and execute it.

    :param data_type_node: node handed to ``make_enum_code`` or
        ``make_structure_code``
    :param env: optional dict used as the global namespace for ``exec``;
        a fresh dict is created when omitted
    :param enum: generate an IntEnum when true, a structure class otherwise
    :return: the environment dict; after execution it holds the generated
        class next to the helper names (``ua``, ``datetime``, ``uuid``,
        ``IntEnum``) made available to the generated code

    Remark: since the code is generated on the fly, in case of error the
    stack trace is not available and debugging is very hard...
    """
    if env is None:
        env = {}
    # Names the generated code relies on; entries supplied by the caller win.
    env.setdefault('ua', ua)
    env.setdefault('datetime', datetime)
    env.setdefault('uuid', uuid)
    env.setdefault('IntEnum', IntEnum)
    # generate the class and add it to the env dict
    if enum:
        code = await make_enum_code(data_type_node)
    else:
        code = await make_structure_code(data_type_node)
    logger.debug("Executing code: %s", code)
    # NOTE(security): the executed text is assembled from names and values
    # read from the node (browse name, field names); only use this against
    # servers you trust.
    exec(code, env)
    return env
123
124
125
async def load_data_type_definitions(server, base_node=None):
    """
    Generate Python classes for the structure types found below a node.

    The HasSubtype children of ``base_node`` (default:
    ``server.nodes.base_structure_type``) are walked; every type whose
    browse name is not already an attribute of the ``ua`` module is passed
    to ``_generate_object``.
    NOTE(review): the environment returned by ``_generate_object`` is not
    used here.
    """
    root = server.nodes.base_structure_type if base_node is None else base_node
    descriptions = await root.get_children_descriptions(refs=ua.ObjectIds.HasSubtype)
    for description in descriptions:
        type_name = description.BrowseName.Name
        # FIXME: find out why FilterOperand is not in ua namespace...
        if type_name == "FilterOperand" or hasattr(ua, type_name):
            continue
        logger.warning("Registring structure %s %s", description.NodeId, description.BrowseName)
        await _generate_object(server.get_node(description.NodeId))
137
138
139
async def make_enum_code(data_type_node):
    """
    Generate the Python source of an IntEnum mirroring an enum data type.

    The node's data type definition and browse name are read; each field of
    the definition becomes one enum member ``<name> = <value>``.

    :param data_type_node: node object offering the coroutines
        ``read_data_type_definition`` and ``read_browse_name``
    :return: the generated source code as a string
    """
    import keyword

    edef = await data_type_node.read_data_type_definition()
    name = clean_name((await data_type_node.read_browse_name()).Name)
    parts = [f"""

class {name}(IntEnum):

    '''
    {name} EnumInt autogenerated from xml
    '''

"""]
    for field in edef.Fields:
        # Member names become Python identifiers in the generated class, so
        # sanitise them like the class name.
        member = clean_name(field.Name)
        if keyword.iskeyword(member):
            # e.g. a member called "None" cannot be assigned to; follow the
            # PEP 8 convention of a trailing underscore.
            member += "_"
        parts.append(f"    {member} = {field.Value}\n")
    return "".join(parts)
161
162
163
164
async def load_enums(server, base_node=None):
    """
    Generate Python IntEnum classes for the enum types found below a node.

    The HasSubtype children of ``base_node`` (default:
    ``server.nodes.enum_data_type``) are walked; every type whose browse
    name is not already an attribute of the ``ua`` module is passed to
    ``_generate_object`` in enum mode.
    NOTE(review): the environment returned by ``_generate_object`` is not
    used here.
    """
    if base_node is None:
        base_node = server.nodes.enum_data_type
    for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
        if hasattr(ua, desc.BrowseName.Name):
            continue
        logger.warning("Registring Enum %s %s", desc.NodeId, desc.BrowseName)
        node = server.get_node(desc.NodeId)
        # enum=True selects make_enum_code; without it the node would be
        # handled as a structure definition.
        await _generate_object(node, enum=True)
173
174
175