Completed
Push — master ( 6c18d6...f8fb14 )
by Olivier
02:27
created

XmlImporter.add_reference()   B

Complexity

Conditions 5

Size

Total Lines 16

Duplication

Lines 16
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 5
dl 16
loc 16
ccs 0
cts 0
cp 0
crap 30
rs 8.5454
c 1
b 1
f 0
1
"""
2
Add nodes defined in an XML file to the address space.
3
The XML format is the one defined by the OPC-UA specification.
4
"""
5 1
import logging
6
import sys
7
8 1
9 1
from opcua import ua
10
from opcua.common import xmlparser
11
12 1
13
def ua_type_to_python(val, uatype):
    """
    Convert val, a string read from XML, to a Python value.

    uatype is the name of the OPC-UA type of the value:
      - a name starting with "Int" or "UInt" is converted with int()
      - "String" is returned unchanged
      - "Bytes", "ByteString" and "ByteArray" are encoded to UTF-8 bytes on
        Python 3 and returned unchanged on Python 2

    Raises Exception for any other type name.
    """
    if uatype.startswith(("Int", "UInt")):
        return int(val)
    # Exact comparison on purpose: `in ("String")` tests for a substring of
    # the plain string "String" and would accept names such as "ring" or "".
    if uatype == "String":
        return val
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        return val
    raise Exception("uatype not handled", uatype, " for val ", val)
25 1
26
27 1
def to_python(val, obj, attname):
    """
    Convert the string val destined for attribute attname of obj.

    The "Identifier" attribute of a ua.NodeId is parsed with
    ua.NodeId.from_string(); any other attribute is converted by
    ua_type_to_python() using the type name stored in obj.ua_types[attname].
    """
    is_nodeid_identifier = isinstance(obj, ua.NodeId) and attname == "Identifier"
    if not is_nodeid_identifier:
        return ua_type_to_python(val, obj.ua_types[attname])
    return ua.NodeId.from_string(val)
32
33
34
class XmlImporter(object):
35
36
    def __init__(self, server):
37
        self.logger = logging.getLogger(__name__)
38
        self.parser = None
39
        self.server = server
40 1
41 1
    def import_xml(self, xmlpath, act_server):
        """
        Parse the XML file at xmlpath and add each supported node.

        act_server is handed unchanged to xmlparser.XMLParser. Every parsed
        node is dispatched on its nodetype to the matching add_* method; node
        types without a handler are logged as a warning and skipped.

        Returns the list of values returned by the add_* methods, in the
        order the parser yielded the nodes.
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
        handlers = {
            'UAObject': self.add_object,
            'UAObjectType': self.add_object_type,
            'UAVariable': self.add_variable,
            'UAVariableType': self.add_variable_type,
            'UAReferenceType': self.add_reference_type,
            'UADataType': self.add_datatype,
            'UAMethod': self.add_method,
        }
        added = []
        for nodedata in self.parser:
            handler = handlers.get(nodedata.nodetype)
            if handler is None:
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
                continue
            added.append(handler(nodedata))
        return added
68 1
69
    def _get_node(self, obj):
        """
        Build the ua.AddNodesItem common to every node type.

        The requested node id and browse name are parsed from obj.nodeid and
        obj.browsename. Parent, reference type and type definition are only
        filled in when the corresponding field of obj is set.
        The caller still has to assign NodeAttributes.
        """
        item = ua.AddNodesItem()
        item.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
        item.BrowseName = ua.QualifiedName.from_string(obj.browsename)
        # the ua.NodeClass member name is nodetype without its first two characters
        class_name = obj.nodetype[2:]
        item.NodeClass = getattr(ua.NodeClass, class_name)
        if obj.parent:
            item.ParentNodeId = ua.NodeId.from_string(obj.parent)
        if obj.parentlink:
            item.ReferenceTypeId = self.to_nodeid(obj.parentlink)
        if obj.typedef:
            item.TypeDefinition = ua.NodeId.from_string(obj.typedef)
        return item
81 1
82
    def to_nodeid(self, nodeid):
        """
        Convert nodeid, a string taken from the XML file, to a ua.NodeId.

        The following forms are tried in this order:
          - empty value: the NodeId of ua.ObjectIds.String is returned
          - a string containing "=": parsed with ua.NodeId.from_string()
          - the name of an attribute of ua.ObjectIds
          - an alias found in self.parser.aliases, whose value is parsed with
            ua.NodeId.from_string()

        Raises AttributeError when nodeid matches none of these forms.
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.parser.aliases:
            return ua.NodeId.from_string(self.parser.aliases[nodeid])
        # Looking the name up on ua.ObjectIds again could only fail at this
        # point; raise the same exception type with the offending value named.
        raise AttributeError(
            "'{}' is not a NodeId string, a ua.ObjectIds name or a known alias".format(nodeid))
95 1
96
    def add_object(self, obj):
        """
        Add the object node described by obj, then its references.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.ObjectAttributes()
        description = obj.desc
        if description:
            attributes.Description = ua.LocalizedText(description)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.EventNotifier = obj.eventnotifier
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
107
108 1
    def add_object_type(self, obj):
        """
        Add the object type node described by obj, then its references.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.ObjectTypeAttributes()
        description = obj.desc
        if description:
            attributes.Description = ua.LocalizedText(description)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.IsAbstract = obj.abstract
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
119
120
    def add_variable(self, obj):
        """
        Add the variable node described by obj, then its references.

        The value is built by _add_variable_value() whenever obj.value is not
        None. Rank, access levels, minimum sampling interval and array
        dimensions are copied only when they are truthy.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.VariableAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attributes.Value = self._add_variable_value(obj)
        optional = (
            ("ValueRank", obj.rank),
            ("AccessLevel", obj.accesslevel),
            ("UserAccessLevel", obj.useraccesslevel),
            ("MinimumSamplingInterval", obj.minsample),
            ("ArrayDimensions", obj.dimensions),
        )
        for name, value in optional:
            if value:
                setattr(attributes, name, value)
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
144
    
145
    def _make_ext_obj(self, obj):
        """
        Build the ua structure described by obj, an extension object from XML.

        obj.objname is the name of the class to instantiate from the ua module.
        obj.body maps each element name to a dict of attribute name -> value.
        A string value is converted with to_python() and set on the new
        structure; a dict value is treated as one nested level whose entries
        are converted and set on the attribute of the same name.

        Raises Exception when an entry of obj.body is a string instead of a dict.
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body.items():
            if isinstance(val, str):
                raise Exception("Error val should be a dict", name, val)
            for attname, v in val.items():
                if isinstance(v, str):
                    setattr(ext, attname, to_python(v, ext, attname))
                    continue
                for attname2, v2 in v.items():
                    # nested attribute: set the value on the sub-object held by ext
                    obj2 = getattr(ext, attname)
                    setattr(obj2, attname2, to_python(v2, obj2, attname2))
        return ext
159
160
    def _add_variable_value(self, obj):
        """
        Build the value of a variable from obj.value according to obj.valuetype.

        - 'ListOfExtensionObject': a list with one structure per entry, each
          built by _make_ext_obj()
        - any other 'ListOf<Name>': a list of ua.<Name>(v) for each entry
        - anything else: a ua.Variant of obj.value with the ua.VariantType
          member named by obj.valuetype
        """
        valuetype = obj.valuetype
        if valuetype == 'ListOfExtensionObject':
            return [self._make_ext_obj(ext) for ext in obj.value]
        if valuetype.startswith("ListOf"):
            element_type = valuetype[len("ListOf"):]
            return [getattr(ua, element_type)(v) for v in obj.value]
        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))
175
176
    def add_variable_type(self, obj):
        """
        Add the variable type node described by obj, then its references.

        A value is set only when obj.value holds exactly one element. Rank,
        abstract flag and array dimensions are copied only when truthy.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.VariableTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = self.to_nodeid(obj.datatype)
        values = obj.value
        if values and len(values) == 1:
            attributes.Value = values[0]
        optional = (
            ("ValueRank", obj.rank),
            ("IsAbstract", obj.abstract),
            ("ArrayDimensions", obj.dimensions),
        )
        for name, value in optional:
            if value:
                setattr(attributes, name, value)
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
195
196
    def add_method(self, obj):
        """
        Add the method node described by obj, then its references.

        Access levels, minimum sampling interval and array dimensions are
        copied only when truthy.
        NOTE(review): these four attributes mirror add_variable(); confirm
        that ua.MethodAttributes is meant to carry them.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.MethodAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        optional = (
            ("AccessLevel", obj.accesslevel),
            ("UserAccessLevel", obj.useraccesslevel),
            ("MinimumSamplingInterval", obj.minsample),
            ("ArrayDimensions", obj.dimensions),
        )
        for name, value in optional:
            if value:
                setattr(attributes, name, value)
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
214
215
    def add_reference_type(self, obj):
        """
        Add the reference type node described by obj, then its references.

        Inverse name, abstract flag and symmetric flag are copied only when
        truthy.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.ReferenceTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            attributes.InverseName = ua.LocalizedText(obj.inversename)
        for name, flag in (("IsAbstract", obj.abstract), ("Symmetric", obj.symmetric)):
            if flag:
                setattr(attributes, name, flag)
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
231
232
    def add_datatype(self, obj):
        """
        Add the data type node described by obj, then its references.

        The abstract flag is copied only when truthy.

        Returns the AddedNodeId of the first result of server.add_nodes().
        """
        item = self._get_node(obj)
        attributes = ua.DataTypeAttributes()
        description = obj.desc
        if description:
            attributes.Description = ua.LocalizedText(description)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        is_abstract = obj.abstract
        if is_abstract:
            attributes.IsAbstract = is_abstract
        item.NodeAttributes = attributes
        added = self.server.add_nodes([item])
        self._add_refs(obj)
        return added[0].AddedNodeId
244
245
    def _add_refs(self, obj):
        """
        Add one forward reference per entry of obj.refs, from obj to its target.

        Nothing is sent to the server when obj.refs is empty. Every reference
        is created with TargetNodeClass set to ua.NodeClass.DataType.
        """
        if not obj.refs:
            return

        def make_item(data):
            # one AddReferencesItem going from obj.nodeid to data.target
            item = ua.AddReferencesItem()
            item.IsForward = True
            item.ReferenceTypeId = self.to_nodeid(data.reftype)
            item.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
            item.TargetNodeClass = ua.NodeClass.DataType
            item.TargetNodeId = ua.NodeId.from_string(data.target)
            return item

        self.server.add_references([make_item(data) for data in obj.refs])
258