Completed
Push — master ( b9b1be...f17f79 )
by Olivier
07:27
created

XmlImporter.add_object()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 10
ccs 8
cts 8
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
"""
Add nodes defined in an XML file to the address space.
The expected format is the one from the OPC UA specification.
"""
5 1
import logging
6
7
8 1
from opcua import ua
9 1
from opcua.common import xmlparser
10
11
12 1
class XmlImporter(object):
    """
    Import nodes described in an XML file into a server address space.

    Nodes are read through ``xmlparser.XMLParser`` and pushed to the
    ``server`` object given at construction time, using its ``add_nodes``
    and ``add_references`` methods.
    """

    # Node type reported by the parser -> name of the method handling it.
    # Method *names* are stored (not functions) so that a method overridden
    # in a subclass is still the one that gets called.
    _HANDLERS = {
        'UAObject': 'add_object',
        'UAObjectType': 'add_object_type',
        'UAVariable': 'add_variable',
        'UAVariableType': 'add_variable_type',
        'UAReferenceType': 'add_reference',
        'UADataType': 'add_datatype',
        'UAMethod': 'add_method',
    }

    def __init__(self, server):
        """
        Keep a reference to ``server``; no parser exists until
        ``import_xml`` is called.
        """
        self.logger = logging.getLogger(__name__)
        self.parser = None
        self.server = server

    def import_xml(self, xmlpath, act_server):
        """
        Parse ``xmlpath`` and add every supported node to the server.

        ``act_server`` is handed to the XML parser unchanged. Node types
        without a handler are logged and skipped.
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
        for node in self.parser:
            handler = self._HANDLERS.get(node.nodetype)
            if handler is None:
                self.logger.info("Not implemented node type: %s ", node.nodetype)
            else:
                getattr(self, handler)(node)

    def _get_node(self, obj):
        """
        Build the ``ua.AddNodesItem`` shared by all node classes.

        The node class is looked up on ``ua.NodeClass`` using the node type
        without its first two characters (e.g. 'UAObject' -> 'Object').
        Parent, parent reference type and type definition are only set when
        the parsed object provides them.
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
        node.BrowseName = ua.QualifiedName.from_string(obj.browsename)
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = ua.NodeId.from_string(obj.parent)
        if obj.parentlink:
            node.ReferenceTypeId = self.to_nodeid(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = ua.NodeId.from_string(obj.typedef)
        return node

    def to_nodeid(self, nodeid):
        """
        Convert a textual node identifier to a ``ua.NodeId``.

        Resolution order:
          1. empty value      -> ``ua.ObjectIds.String``
          2. contains "="     -> parsed with ``ua.NodeId.from_string``
          3. ``ua.ObjectIds`` attribute name -> that numeric id
          4. alias known to the current parser -> alias target, parsed
             with ``ua.NodeId.from_string``

        Raises AttributeError when none of the above applies.
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.parser.aliases:
            return ua.NodeId.from_string(self.parser.aliases[nodeid])
        # The previous fallback called getattr(ua.ObjectIds, nodeid) here,
        # which could only fail since hasattr() was already False. Keep the
        # exception type but say what is actually wrong.
        raise AttributeError(
            "Unknown node id, ObjectIds name or alias: {!r}".format(nodeid))

    def _new_attrs(self, obj, attrs):
        """
        Fill the description and display name on ``attrs`` and return it.

        The description is only set when the parsed object has one; the
        display name is always set.
        """
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        return attrs

    @staticmethod
    def _copy_truthy(obj, attrs, pairs):
        """
        For each ``(source, target)`` pair, copy ``obj.<source>`` to
        ``attrs.<target>`` when the source value is truthy.
        """
        for source, target in pairs:
            value = getattr(obj, source)
            if value:
                setattr(attrs, target, value)

    def _submit(self, obj, node, attrs):
        """
        Attach ``attrs`` to ``node``, send it to the server and then add the
        references listed on the parsed object.
        """
        node.NodeAttributes = attrs
        self.server.add_nodes([node])
        self._add_refs(obj)

    def add_object(self, obj):
        """
        Add an Object node built from the parsed object ``obj``.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.ObjectAttributes())
        attrs.EventNotifier = obj.eventnotifier
        self._submit(obj, node, attrs)

    def add_object_type(self, obj):
        """
        Add an ObjectType node built from the parsed object ``obj``.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.ObjectTypeAttributes())
        attrs.IsAbstract = obj.abstract
        self._submit(obj, node, attrs)

    def add_variable(self, obj):
        """
        Add a Variable node built from the parsed object ``obj``.

        The value is converted by ``_add_variable_value`` whenever it is
        not None; the remaining optional attributes are copied only when
        they are truthy.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.VariableAttributes())
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        self._copy_truthy(obj, attrs, (
            ('rank', 'ValueRank'),
            ('accesslevel', 'AccessLevel'),
            ('useraccesslevel', 'UserAccessLevel'),
            ('minsample', 'MinimumSamplingInterval'),
            ('dimensions', 'ArrayDimensions'),
        ))
        self._submit(obj, node, attrs)

    def _add_variable_value(self, obj):
        """
        Return the value for a Variable based on the object's valuetype.

        'ListOfLocalizedText' and the generic case return a ``ua.Variant``.
        'EnumValueType' and 'Argument' return a plain list of the
        corresponding ``ua`` structures.
        NOTE(review): the two list branches are not wrapped in a Variant,
        unlike the others -- confirm the server side accepts that.
        """
        valuetype = obj.valuetype
        if valuetype == 'ListOfLocalizedText':
            return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)

        if valuetype == 'EnumValueType':
            enum_values = []
            for ev in obj.value:
                enum_value = ua.EnumValueType()
                enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
                enum_value.Description = ua.LocalizedText(ev['Description'])
                enum_value.Value = int(ev['Value'])
                enum_values.append(enum_value)
            return enum_values

        if valuetype == 'Argument':
            arguments = []
            for arg in obj.value:
                argument = ua.Argument()
                argument.Name = arg['Name']
                argument.Description = ua.LocalizedText(arg['Description'])
                argument.DataType = self.to_nodeid(arg['DataType'])
                argument.ValueRank = int(arg['ValueRank'])
                argument.ArrayDimensions = arg['ArrayDimensions']
                arguments.append(argument)
            return arguments

        # Any other value type must be the name of a ua.VariantType member.
        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))

    def add_variable_type(self, obj):
        """
        Add a VariableType node built from the parsed object ``obj``.

        A value is only set when ``obj.value`` holds exactly one element,
        in which case that element is used as is.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.VariableTypeAttributes())
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        self._copy_truthy(obj, attrs, (
            ('rank', 'ValueRank'),
            ('abstract', 'IsAbstract'),
            ('dimensions', 'ArrayDimensions'),
        ))
        self._submit(obj, node, attrs)

    def add_method(self, obj):
        """
        Add a Method node built from the parsed object ``obj``.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.MethodAttributes())
        # NOTE(review): these are the same optional attributes that
        # add_variable copies; whether ua.MethodAttributes makes use of
        # them is not visible from this file -- confirm before removing.
        self._copy_truthy(obj, attrs, (
            ('accesslevel', 'AccessLevel'),
            ('useraccesslevel', 'UserAccessLevel'),
            ('minsample', 'MinimumSamplingInterval'),
            ('dimensions', 'ArrayDimensions'),
        ))
        self._submit(obj, node, attrs)

    def add_reference(self, obj):
        """
        Add a ReferenceType node built from the parsed object ``obj``.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.ReferenceTypeAttributes())
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        self._copy_truthy(obj, attrs, (
            ('abstract', 'IsAbstract'),
            ('symmetric', 'Symmetric'),
        ))
        self._submit(obj, node, attrs)

    def add_datatype(self, obj):
        """
        Add a DataType node built from the parsed object ``obj``.
        """
        node = self._get_node(obj)
        attrs = self._new_attrs(obj, ua.DataTypeAttributes())
        self._copy_truthy(obj, attrs, (('abstract', 'IsAbstract'),))
        self._submit(obj, node, attrs)

    def _add_refs(self, obj):
        """
        Add one forward reference per entry of ``obj.refs``.

        Does nothing (and does not contact the server) when ``obj.refs`` is
        empty. All references are sent in a single ``add_references`` call.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
            # NOTE(review): the target node class is always DataType here,
            # whatever the target actually is -- confirm this is intended.
            ref.TargetNodeClass = ua.NodeClass.DataType
            ref.TargetNodeId = ua.NodeId.from_string(data.target)
            refs.append(ref)
        self.server.add_references(refs)
221