Completed
Push — master ( cf2066...c217b8 )
by Olivier
04:12
created

XmlImporter.add_variable()   D

Complexity

Conditions 8

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 64.006

Importance

Changes 0
Metric Value
cc 8
dl 0
loc 24
ccs 1
cts 23
cp 0.0434
crap 64.006
rs 4.3478
c 0
b 0
f 0
1
"""
2
Add nodes defined in an XML file to the address space.
3
The XML format is the one from the OPC UA specification.
4
"""
5 1
import logging
6
import sys
7
8 1
9 1
from opcua import ua
10
from opcua.common import xmlparser
11
12 1
13
def ua_type_to_python(val, uatype):
    """
    Convert the string ``val`` to the Python value matching the type name ``uatype``.

    :param val: value as text
    :param uatype: type name; names starting with "Int"/"UInt" give an int,
        "String" gives ``val`` unchanged, "Bytes"/"ByteString"/"ByteArray"
        give bytes (utf8-encoded on Python 3, ``val`` unchanged on Python 2)
    :return: the converted value
    :raises Exception: if ``uatype`` is none of the names above
    """
    if uatype.startswith(("Int", "UInt")):
        return int(val)
    # Must be an equality test: ``uatype in ("String")`` is a substring test
    # against the string "String" and would accept e.g. "ring" or "".
    if uatype == "String":
        return val
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        return val
    raise Exception("uatype nopt handled", uatype, " for val ", val)
25 1
26
27 1
def to_python(val, obj, attname):
    """
    Convert the text ``val`` destined for attribute ``attname`` of ``obj``.

    The "Identifier" attribute of a ``ua.NodeId`` is parsed with
    ``ua.NodeId.from_string``; every other attribute is converted with
    ``ua_type_to_python`` using the type name found in ``obj.ua_types[attname]``.
    """
    targets_nodeid_identifier = isinstance(obj, ua.NodeId) and attname == "Identifier"
    if not targets_nodeid_identifier:
        declared_type = obj.ua_types[attname]
        return ua_type_to_python(val, declared_type)
    return ua.NodeId.from_string(val)
32
33
34
class XmlImporter(object):
    """
    Create address-space nodes on a server from the nodes described in an XML file.

    The file is parsed with ``xmlparser.XMLParser``. Each parsed node is turned
    into a ``ua.AddNodesItem`` that is passed to ``server.add_nodes``, and its
    references are passed to ``server.add_references``.
    """

    def __init__(self, server):
        """
        :param server: object providing ``add_nodes`` and ``add_references``
        """
        self.logger = logging.getLogger(__name__)
        # Set by import_xml(); to_nodeid() reads its ``aliases`` mapping.
        self.parser = None
        self.server = server

    def import_xml(self, xmlpath, act_server):
        """
        import xml and return added nodes

        :param xmlpath: path of the XML file, handed to ``xmlparser.XMLParser``
        :param act_server: second argument handed to ``xmlparser.XMLParser``
        :return: list with the value returned by the ``add_*`` method of every
            imported node; nodes of an unsupported type are logged and skipped
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
        handlers = {
            'UAObject': self.add_object,
            'UAObjectType': self.add_object_type,
            'UAVariable': self.add_variable,
            'UAVariableType': self.add_variable_type,
            'UAReferenceType': self.add_reference_type,
            'UADataType': self.add_datatype,
            'UAMethod': self.add_method,
        }
        nodes = []
        for nodedata in self.parser:
            handler = handlers.get(nodedata.nodetype)
            if handler is None:
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
                continue
            nodes.append(handler(nodedata))
        return nodes

    def _get_node(self, obj):
        """
        Build the ``ua.AddNodesItem`` skeleton shared by every node type.

        Node id and browse name are parsed from ``obj``; the node class is
        looked up on ``ua.NodeClass`` under ``obj.nodetype`` without its first
        two characters (e.g. 'UAObject' -> 'Object'). Parent, parent link and
        type definition are only set when present on ``obj``.
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
        node.BrowseName = ua.QualifiedName.from_string(obj.browsename)
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = ua.NodeId.from_string(obj.parent)
        if obj.parentlink:
            node.ReferenceTypeId = self.to_nodeid(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = ua.NodeId.from_string(obj.typedef)
        return node

    def to_nodeid(self, nodeid):
        """
        Resolve ``nodeid`` to a ``ua.NodeId``.

        Resolution order: an empty value gives the ``ua.ObjectIds.String`` id;
        a text containing "=" is parsed with ``ua.NodeId.from_string``; a name
        found on ``ua.ObjectIds`` gives that id; a key of the parser aliases
        gives the parsed alias target.

        :raises AttributeError: if ``nodeid`` matches none of the above
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.parser.aliases:
            return ua.NodeId.from_string(self.parser.aliases[nodeid])
        # The name is known not to be on ua.ObjectIds at this point, so a
        # further getattr() could only fail; raise the same exception type
        # with a message that names the offending id.
        raise AttributeError("Unknown node id or alias: {!r}".format(nodeid))

    def _set_names(self, attrs, obj):
        """Copy the optional description and the display name of ``obj`` onto ``attrs``."""
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)

    def _submit(self, obj, node, attrs):
        """Attach ``attrs`` to ``node``, add it to the server, add the references
        of ``obj`` and return the ``AddedNodeId`` of the first result."""
        node.NodeAttributes = attrs
        res = self.server.add_nodes([node])
        self._add_refs(obj)
        return res[0].AddedNodeId

    def add_object(self, obj):
        """Add the object node described by ``obj``; return its ``AddedNodeId``."""
        node = self._get_node(obj)
        attrs = ua.ObjectAttributes()
        self._set_names(attrs, obj)
        attrs.EventNotifier = obj.eventnotifier
        return self._submit(obj, node, attrs)

    def add_object_type(self, obj):
        """Add the object type node described by ``obj``; return its ``AddedNodeId``."""
        node = self._get_node(obj)
        attrs = ua.ObjectTypeAttributes()
        self._set_names(attrs, obj)
        attrs.IsAbstract = obj.abstract
        return self._submit(obj, node, attrs)

    def add_variable(self, obj):
        """
        Add the variable node described by ``obj``; return its ``AddedNodeId``.

        The value is set whenever ``obj.value`` is not None (so falsy values
        such as 0 or "" are kept); the remaining optional attributes are only
        set when truthy.
        """
        node = self._get_node(obj)
        attrs = ua.VariableAttributes()
        self._set_names(attrs, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit(obj, node, attrs)

    def _make_ext_obj(self, obj):
        """
        Instantiate the ``ua`` class named ``obj.objname`` and fill it from ``obj.body``.

        Every value of ``obj.body`` must be a dict of attribute name -> value.
        An attribute value is either a string (converted with ``to_python``)
        or a dict describing a nested object or a list.

        :raises Exception: if a value of ``obj.body`` is a string
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body.items():
            if isinstance(val, str):
                raise Exception("Error val should a dict", name, val)
            for attname, v in val.items():
                # two possible values:
                # either we get the value directly
                # or a dict if it is an object or a list
                if isinstance(v, str):
                    setattr(ext, attname, to_python(v, ext, attname))
                    continue
                # so we have either an object or a list...
                obj2 = getattr(ext, attname)
                if isinstance(obj2, ua.NodeId) or hasattr(obj2, "ua_types"):
                    for attname2, v2 in v.items():
                        setattr(obj2, attname2, to_python(v2, obj2, attname2))
                else:
                    # we probably have a list: keys are type names, values are texts
                    my_list = [ua_type_to_python(v2, vtype) for vtype, v2 in v.items()]
                    # Store on the object being built (the list was read from
                    # ``ext``), not on the parsed input ``obj``.
                    setattr(ext, attname, my_list)
        return ext

    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects valuetype.

        'ListOfExtensionObject' gives a list built with ``_make_ext_obj``;
        any other 'ListOf<T>' gives a list of ``ua.<T>(v)``; everything else
        gives a ``ua.Variant`` typed with ``ua.VariantType.<valuetype>``.
        """
        if obj.valuetype == 'ListOfExtensionObject':
            return [self._make_ext_obj(ext) for ext in obj.value]
        if obj.valuetype.startswith("ListOf"):
            vtype = obj.valuetype[6:]  # strip the "ListOf" prefix
            return [getattr(ua, vtype)(v) for v in obj.value]
        return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))

    def add_variable_type(self, obj):
        """
        Add the variable type node described by ``obj``; return its ``AddedNodeId``.

        The value is only set when ``obj.value`` holds exactly one element.
        """
        node = self._get_node(obj)
        attrs = ua.VariableTypeAttributes()
        self._set_names(attrs, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit(obj, node, attrs)

    def add_method(self, obj):
        """Add the method node described by ``obj``; return its ``AddedNodeId``."""
        node = self._get_node(obj)
        attrs = ua.MethodAttributes()
        self._set_names(attrs, obj)
        # NOTE(review): the attributes below mirror add_variable(); confirm
        # that ua.MethodAttributes is really meant to carry them.
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit(obj, node, attrs)

    def add_reference_type(self, obj):
        """Add the reference type node described by ``obj``; return its ``AddedNodeId``."""
        node = self._get_node(obj)
        attrs = ua.ReferenceTypeAttributes()
        self._set_names(attrs, obj)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.symmetric:
            attrs.Symmetric = obj.symmetric
        return self._submit(obj, node, attrs)

    def add_datatype(self, obj):
        """Add the data type node described by ``obj``; return its ``AddedNodeId``."""
        node = self._get_node(obj)
        attrs = ua.DataTypeAttributes()
        self._set_names(attrs, obj)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        return self._submit(obj, node, attrs)

    def _add_refs(self, obj):
        """
        Add every reference listed in ``obj.refs`` as a forward reference whose
        source is the node ``obj.nodeid``; do nothing when there are none.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
            # NOTE(review): the target class is always DataType here,
            # whatever the target node is; confirm this is intended.
            ref.TargetNodeClass = ua.NodeClass.DataType
            ref.TargetNodeId = ua.NodeId.from_string(data.target)
            refs.append(ref)
        self.server.add_references(refs)
269