Completed
Pull Request — master (#296)
by Olivier
04:18
created

XmlImporter._add_variable_value()   B

Complexity

Conditions 5

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 25.0182

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 15
ccs 1
cts 14
cp 0.0714
crap 25.0182
rs 8.5454
c 0
b 0
f 0
1
"""
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
7
8 1
from opcua import ua
9 1
from opcua.common import xmlparser
10
11
12 1
def ua_type_to_python(val, uatype):
13
    if uatype.startswith("Int") or uatype.startswith("UInt"):
14 1
        return int(val)
15 1
    elif uatype in ("String"):
16 1
        return val
17 1
    elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
18
        return bytes(val, 'utf8')
19 1
    else:
20 1
        raise Exception("uatype nopt handled", uatype, " for val ", val)
21 1
22 1
23 1
def to_python(val, obj, attname):
24 1
    print(val, obj, attname)
25 1
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
26
        return ua.NodeId.from_string(val)
27 1
    else:
28 1
        return ua_type_to_python(val, obj.ua_types[attname])
29
30
31
class XmlImporter(object):
32
33
    def __init__(self, server):
34
        self.logger = logging.getLogger(__name__)
35
        self.parser = None
36
        self.server = server
37
38
    def import_xml(self, xmlpath, act_server):
39
        """
40 1
        import xml and return added nodes
41 1
        """
42 1
        self.logger.info("Importing XML file %s", xmlpath)
43 1
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
44 1
        nodes = []
45 1
        for nodedata in self.parser:
46 1
            if nodedata.nodetype == 'UAObject':
47 1
                node = self.add_object(nodedata)
48 1
            elif nodedata.nodetype == 'UAObjectType':
49 1
                node = self.add_object_type(nodedata)
50 1
            elif nodedata.nodetype == 'UAVariable':
51 1
                node = self.add_variable(nodedata)
52
            elif nodedata.nodetype == 'UAVariableType':
53 1
                node = self.add_variable_type(nodedata)
54 1
            elif nodedata.nodetype == 'UAReferenceType':
55
                node = self.add_reference(nodedata)
56 1
            elif nodedata.nodetype == 'UADataType':
57
                node = self.add_datatype(nodedata)
58
            elif nodedata.nodetype == 'UAMethod':
59 1
                node = self.add_method(nodedata)
60
            else:
61 1
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
62 1
                continue
63 1
            nodes.append(node)
64
        return nodes
65
66 1
    def _get_node(self, obj):
67
        node = ua.AddNodesItem()
68 1
        node.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
69
        node.BrowseName = ua.QualifiedName.from_string(obj.browsename)
70 1
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
71 1
        if obj.parent:
72 1
            node.ParentNodeId = ua.NodeId.from_string(obj.parent)
73 1 View Code Duplication
        if obj.parentlink:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
74 1
            node.ReferenceTypeId = self.to_nodeid(obj.parentlink)
75 1
        if obj.typedef:
76 1
            node.TypeDefinition = ua.NodeId.from_string(obj.typedef)
77 1
        return node
78 1
79 1
    def to_nodeid(self, nodeid):
80
        if not nodeid:
81 1
            return ua.NodeId(ua.ObjectIds.String)
82
        elif "=" in nodeid:
83
            return ua.NodeId.from_string(nodeid)
84
        elif hasattr(ua.ObjectIds, nodeid):
85 View Code Duplication
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
86
        else:
87
            if nodeid in self.parser.aliases:
88
                nodeid = self.parser.aliases[nodeid]
89
            else:
90
                nodeid = "i={}".format(getattr(ua.ObjectIds, nodeid))
91
            return ua.NodeId.from_string(nodeid)
92 1
93 1
    def add_object(self, obj):
94 1
        node = self._get_node(obj)
95 1
        attrs = ua.ObjectAttributes()
96
        if obj.desc:
97 1
            attrs.Description = ua.LocalizedText(obj.desc)
98 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
99
        attrs.EventNotifier = obj.eventnotifier
100 1
        node.NodeAttributes = attrs
101 1
        res = self.server.add_nodes([node])
102 1
        self._add_refs(obj)
103 1
        return res[0].AddedNodeId
104 1
105
    def add_object_type(self, obj):
106 1
        node = self._get_node(obj)
107
        attrs = ua.ObjectTypeAttributes()
108 1
        if obj.desc:
109
            attrs.Description = ua.LocalizedText(obj.desc)
110 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
111
        attrs.IsAbstract = obj.abstract
112 1
        node.NodeAttributes = attrs
113 1
        res = self.server.add_nodes([node])
114 1
        self._add_refs(obj)
115
        return res[0].AddedNodeId
116 1
117
    def add_variable(self, obj):
118
        node = self._get_node(obj)
119
        attrs = ua.VariableAttributes()
120
        if obj.desc:
121
            attrs.Description = ua.LocalizedText(obj.desc)
122
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
123
        attrs.DataType = self.to_nodeid(obj.datatype)
124
        # if obj.value and len(obj.value) == 1:
125
        if obj.value is not None:
126
            attrs.Value = self._add_variable_value(obj, )
127
        if obj.rank:
128
            attrs.ValueRank = obj.rank
129
        if obj.accesslevel:
130
            attrs.AccessLevel = obj.accesslevel
131
        if obj.useraccesslevel:
132
            attrs.UserAccessLevel = obj.useraccesslevel
133
        if obj.minsample:
134
            attrs.MinimumSamplingInterval = obj.minsample
135 1
        if obj.dimensions:
136
            attrs.ArrayDimensions = obj.dimensions
137
        node.NodeAttributes = attrs
138
        res = self.server.add_nodes([node])
139
        self._add_refs(obj)
140
        return res[0].AddedNodeId
141
    
142
    def _make_ext_obj(sefl, obj):
143
        ext = getattr(ua, obj.objname)()
144
        for name, val in obj.body.items():
145
            if type(val) is str:
146
                raise Exception("Error val should a dict", name, val)
147
            else:
148
                for attname, v in val.items():
149
                    if type(v) is str:
150
                        setattr(ext, attname, to_python(v, ext, attname))
151
                    else:
152
                        print("v is not string, it is", v, type(v))
153 1
                        for attname2, v2 in v.items():
154
                            obj2 = getattr(ext, attname)
155
                            setattr(obj2, attname2, to_python(v2, obj2, attname2))
156
        return ext
157
158
    def _add_variable_value(self, obj):
159
        """
160
        Returns the value for a Variable based on the objects valuetype. 
161
        """
162
        if obj.valuetype == 'ListOfExtensionObject':
163
            values = []
164
            for ext in obj.value:
165
                extobj = self._make_ext_obj(ext)
166
                values.append(extobj)
167
            return values
168
        elif obj.valuetype.startswith("ListOf"):
169 1
            vtype = obj.valuetype[6:]
170
            return [getattr(ua, vtype)(v) for v in obj.value]
171
        else:
172
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
173
        #if obj.valuetype == 'ListOfLocalizedText':
174
            #return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)
175
        #elif obj.valuetype == 'ListOfExtension':
176
        #elif obj.valuetype == 'EnumValueType':
177
            #values = []
178
            #for ev in obj.value:
179
                #enum_value = ua.EnumValueType()
180
                #enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
181 1
                #enum_value.Description = ua.LocalizedText(ev['Description'])
182 1
                #enum_value.Value = int(ev['Value'])
183 1
                #values.append(enum_value)
184
            #return values
185
        #elif obj.valuetype == 'Argument':
186
            #values = []
187
            #for arg in obj.value:
188
                #argument = ua.Argument()
189
                #argument.Name = arg['Name']
190
                #argument.Description = ua.LocalizedText(arg['Description'])
191
                #argument.DataType = self.to_nodeid(arg['DataType'])
192
                #argument.ValueRank = int(arg['ValueRank'])
193
                #argument.ArrayDimensions = arg['ArrayDimensions']
194
                #values.append(argument)
195
            #return values
196
197
        #return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
198
199
    def add_variable_type(self, obj):
200
        node = self._get_node(obj)
201
        attrs = ua.VariableTypeAttributes()
202
        if obj.desc:
203
            attrs.Description = ua.LocalizedText(obj.desc)
204
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
205
        attrs.DataType = self.to_nodeid(obj.datatype)
206
        if obj.value and len(obj.value) == 1:
207 View Code Duplication
            attrs.Value = obj.value[0]
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
208
        if obj.rank:
209
            attrs.ValueRank = obj.rank
210
        if obj.abstract:
211
            attrs.IsAbstract = obj.abstract
212
        if obj.dimensions:
213
            attrs.ArrayDimensions = obj.dimensions
214
        node.NodeAttributes = attrs
215
        res = self.server.add_nodes([node])
216
        self._add_refs(obj)
217
        return res[0].AddedNodeId
218
219 View Code Duplication
    def add_method(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
220
        node = self._get_node(obj)
221
        attrs = ua.MethodAttributes()
222
        if obj.desc:
223
            attrs.Description = ua.LocalizedText(obj.desc)
224
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
225
        if obj.accesslevel:
226
            attrs.AccessLevel = obj.accesslevel
227
        if obj.useraccesslevel:
228
            attrs.UserAccessLevel = obj.useraccesslevel
229
        if obj.minsample:
230
            attrs.MinimumSamplingInterval = obj.minsample
231
        if obj.dimensions:
232
            attrs.ArrayDimensions = obj.dimensions
233
        node.NodeAttributes = attrs
234
        res = self.server.add_nodes([node])
235
        self._add_refs(obj)
236
        return res[0].AddedNodeId
237
238 View Code Duplication
    def add_reference(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
239
        node = self._get_node(obj)
240
        attrs = ua.ReferenceTypeAttributes()
241
        if obj.desc:
242
            attrs.Description = ua.LocalizedText(obj.desc)
243
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
244
        if obj. inversename:
245
            attrs.InverseName = ua.LocalizedText(obj.inversename)
246
        if obj.abstract:
247
            attrs.IsAbstract = obj.abstract
248
        if obj.symmetric:
249
            attrs.Symmetric = obj.symmetric
250
        node.NodeAttributes = attrs
251
        res = self.server.add_nodes([node])
252
        self._add_refs(obj)
253
        return res[0].AddedNodeId
254
255
    def add_datatype(self, obj):
256
        node = self._get_node(obj)
257
        attrs = ua.DataTypeAttributes()
258
        if obj.desc:
259
            attrs.Description = ua.LocalizedText(obj.desc)
260
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
261
        if obj.abstract:
262
            attrs.IsAbstract = obj.abstract
263
        node.NodeAttributes = attrs
264
        res = self.server.add_nodes([node])
265
        self._add_refs(obj)
266
        return res[0].AddedNodeId
267
268
    def _add_refs(self, obj):
269
        if not obj.refs:
270
            return
271
        refs = []
272
        for data in obj.refs:
273
            ref = ua.AddReferencesItem()
274
            ref.IsForward = True
275
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
276
            ref.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
277
            ref.TargetNodeClass = ua.NodeClass.DataType
278
            ref.TargetNodeId = ua.NodeId.from_string(data.target)
279
            refs.append(ref)
280
        self.server.add_references(refs)
281