Completed
Pull Request — master (#296)
by Olivier
04:45 queued 24s
created

XmlImporter._make_ext_obj()   B

Complexity

Conditions 6

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 34.317

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 14
ccs 1
cts 13
cp 0.0769
crap 34.317
rs 8
c 0
b 0
f 0
1
"""
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
7
8 1
from opcua import ua
9 1
from opcua.common import xmlparser
10
11
12 1
def ua_type_to_python(val, uatype):
13
    if uatype.startswith("Int") or uatype.startswith("UInt"):
14 1
        return int(val)
15 1
    elif uatype in ("String"):
16 1
        return val
17 1
    elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
18
        return bytes(val, 'utf8')
19 1
    else:
20 1
        raise Exception("uatype nopt handled", uatype, " for val ", val)
21 1
22 1
23 1
def to_python(val, obj, attname):
24 1
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
25 1
        return ua.NodeId.from_string(val)
26
    else:
27 1
        return ua_type_to_python(val, obj.ua_types[attname])
28 1
29
30
class XmlImporter(object):
31
32
    def __init__(self, server):
33
        self.logger = logging.getLogger(__name__)
34
        self.parser = None
35
        self.server = server
36
37
    def import_xml(self, xmlpath, act_server):
38
        """
39
        import xml and return added nodes
40 1
        """
41 1
        self.logger.info("Importing XML file %s", xmlpath)
42 1
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
43 1
        nodes = []
44 1
        for nodedata in self.parser:
45 1
            if nodedata.nodetype == 'UAObject':
46 1
                node = self.add_object(nodedata)
47 1
            elif nodedata.nodetype == 'UAObjectType':
48 1
                node = self.add_object_type(nodedata)
49 1
            elif nodedata.nodetype == 'UAVariable':
50 1
                node = self.add_variable(nodedata)
51 1
            elif nodedata.nodetype == 'UAVariableType':
52
                node = self.add_variable_type(nodedata)
53 1
            elif nodedata.nodetype == 'UAReferenceType':
54 1
                node = self.add_reference(nodedata)
55
            elif nodedata.nodetype == 'UADataType':
56 1
                node = self.add_datatype(nodedata)
57
            elif nodedata.nodetype == 'UAMethod':
58
                node = self.add_method(nodedata)
59 1
            else:
60
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
61 1
                continue
62 1
            nodes.append(node)
63 1
        return nodes
64
65
    def _get_node(self, obj):
66 1
        node = ua.AddNodesItem()
67
        node.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
68 1
        node.BrowseName = ua.QualifiedName.from_string(obj.browsename)
69
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
70 1
        if obj.parent:
71 1
            node.ParentNodeId = ua.NodeId.from_string(obj.parent)
72 1
        if obj.parentlink:
73 1 View Code Duplication
            node.ReferenceTypeId = self.to_nodeid(obj.parentlink)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
74 1
        if obj.typedef:
75 1
            node.TypeDefinition = ua.NodeId.from_string(obj.typedef)
76 1
        return node
77 1
78 1
    def to_nodeid(self, nodeid):
79 1
        if not nodeid:
80
            return ua.NodeId(ua.ObjectIds.String)
81 1
        elif "=" in nodeid:
82
            return ua.NodeId.from_string(nodeid)
83
        elif hasattr(ua.ObjectIds, nodeid):
84
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
85 View Code Duplication
        else:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
86
            if nodeid in self.parser.aliases:
87
                nodeid = self.parser.aliases[nodeid]
88
            else:
89
                nodeid = "i={}".format(getattr(ua.ObjectIds, nodeid))
90
            return ua.NodeId.from_string(nodeid)
91
92 1
    def add_object(self, obj):
93 1
        node = self._get_node(obj)
94 1
        attrs = ua.ObjectAttributes()
95 1
        if obj.desc:
96
            attrs.Description = ua.LocalizedText(obj.desc)
97 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
98 1
        attrs.EventNotifier = obj.eventnotifier
99
        node.NodeAttributes = attrs
100 1
        res = self.server.add_nodes([node])
101 1
        self._add_refs(obj)
102 1
        return res[0].AddedNodeId
103 1
104 1
    def add_object_type(self, obj):
105
        node = self._get_node(obj)
106 1
        attrs = ua.ObjectTypeAttributes()
107
        if obj.desc:
108 1
            attrs.Description = ua.LocalizedText(obj.desc)
109
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
110 1
        attrs.IsAbstract = obj.abstract
111
        node.NodeAttributes = attrs
112 1
        res = self.server.add_nodes([node])
113 1
        self._add_refs(obj)
114 1
        return res[0].AddedNodeId
115
116 1
    def add_variable(self, obj):
117
        node = self._get_node(obj)
118
        attrs = ua.VariableAttributes()
119
        if obj.desc:
120
            attrs.Description = ua.LocalizedText(obj.desc)
121
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
122
        attrs.DataType = self.to_nodeid(obj.datatype)
123
        # if obj.value and len(obj.value) == 1:
124
        if obj.value is not None:
125
            attrs.Value = self._add_variable_value(obj, )
126
        if obj.rank:
127
            attrs.ValueRank = obj.rank
128
        if obj.accesslevel:
129
            attrs.AccessLevel = obj.accesslevel
130
        if obj.useraccesslevel:
131
            attrs.UserAccessLevel = obj.useraccesslevel
132
        if obj.minsample:
133
            attrs.MinimumSamplingInterval = obj.minsample
134
        if obj.dimensions:
135 1
            attrs.ArrayDimensions = obj.dimensions
136
        node.NodeAttributes = attrs
137
        res = self.server.add_nodes([node])
138
        self._add_refs(obj)
139
        return res[0].AddedNodeId
140
    
141
    def _make_ext_obj(sefl, obj):
142
        ext = getattr(ua, obj.objname)()
143
        for name, val in obj.body.items():
144
            if type(val) is str:
145
                raise Exception("Error val should a dict", name, val)
146
            else:
147
                for attname, v in val.items():
148
                    if type(v) is str:
149
                        setattr(ext, attname, to_python(v, ext, attname))
150
                    else:
151
                        for attname2, v2 in v.items():
152
                            obj2 = getattr(ext, attname)
153 1
                            setattr(obj2, attname2, to_python(v2, obj2, attname2))
154
        return ext
155
156
    def _add_variable_value(self, obj):
157
        """
158
        Returns the value for a Variable based on the objects valuetype. 
159
        """
160
        if obj.valuetype == 'ListOfExtensionObject':
161
            values = []
162
            for ext in obj.value:
163
                extobj = self._make_ext_obj(ext)
164
                values.append(extobj)
165
            return values
166
        elif obj.valuetype.startswith("ListOf"):
167
            vtype = obj.valuetype[6:]
168
            return [getattr(ua, vtype)(v) for v in obj.value]
169 1
        else:
170
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
171
172
    def add_variable_type(self, obj):
173
        node = self._get_node(obj)
174
        attrs = ua.VariableTypeAttributes()
175
        if obj.desc:
176
            attrs.Description = ua.LocalizedText(obj.desc)
177
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
178
        attrs.DataType = self.to_nodeid(obj.datatype)
179
        if obj.value and len(obj.value) == 1:
180
            attrs.Value = obj.value[0]
181 1
        if obj.rank:
182 1
            attrs.ValueRank = obj.rank
183 1
        if obj.abstract:
184
            attrs.IsAbstract = obj.abstract
185
        if obj.dimensions:
186
            attrs.ArrayDimensions = obj.dimensions
187
        node.NodeAttributes = attrs
188
        res = self.server.add_nodes([node])
189
        self._add_refs(obj)
190
        return res[0].AddedNodeId
191
192
    def add_method(self, obj):
193
        node = self._get_node(obj)
194
        attrs = ua.MethodAttributes()
195
        if obj.desc:
196
            attrs.Description = ua.LocalizedText(obj.desc)
197
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
198
        if obj.accesslevel:
199
            attrs.AccessLevel = obj.accesslevel
200
        if obj.useraccesslevel:
201
            attrs.UserAccessLevel = obj.useraccesslevel
202
        if obj.minsample:
203
            attrs.MinimumSamplingInterval = obj.minsample
204
        if obj.dimensions:
205
            attrs.ArrayDimensions = obj.dimensions
206
        node.NodeAttributes = attrs
207 View Code Duplication
        res = self.server.add_nodes([node])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
208
        self._add_refs(obj)
209
        return res[0].AddedNodeId
210
211
    def add_reference(self, obj):
212
        node = self._get_node(obj)
213
        attrs = ua.ReferenceTypeAttributes()
214
        if obj.desc:
215
            attrs.Description = ua.LocalizedText(obj.desc)
216
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
217
        if obj. inversename:
218
            attrs.InverseName = ua.LocalizedText(obj.inversename)
219 View Code Duplication
        if obj.abstract:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
220
            attrs.IsAbstract = obj.abstract
221
        if obj.symmetric:
222
            attrs.Symmetric = obj.symmetric
223
        node.NodeAttributes = attrs
224
        res = self.server.add_nodes([node])
225
        self._add_refs(obj)
226
        return res[0].AddedNodeId
227
228
    def add_datatype(self, obj):
229
        node = self._get_node(obj)
230
        attrs = ua.DataTypeAttributes()
231
        if obj.desc:
232
            attrs.Description = ua.LocalizedText(obj.desc)
233
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
234
        if obj.abstract:
235
            attrs.IsAbstract = obj.abstract
236
        node.NodeAttributes = attrs
237
        res = self.server.add_nodes([node])
238 View Code Duplication
        self._add_refs(obj)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
239
        return res[0].AddedNodeId
240
241
    def _add_refs(self, obj):
242
        if not obj.refs:
243
            return
244
        refs = []
245
        for data in obj.refs:
246
            ref = ua.AddReferencesItem()
247
            ref.IsForward = True
248
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
249
            ref.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
250
            ref.TargetNodeClass = ua.NodeClass.DataType
251
            ref.TargetNodeId = ua.NodeId.from_string(data.target)
252
            refs.append(ref)
253
        self.server.add_references(refs)
254