Completed
Pull Request — master (#257)
by
unknown
07:13
created

XmlImporter.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 4
ccs 4
cts 4
cp 1
crap 1
rs 10
1
"""
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
7
8 1
from opcua import ua
9 1
from opcua.common import xmlparser
10
11
12 1
class XmlImporter(object):
    """
    Import the nodes described in an XML file into a server address space.

    Nodes are read through ``xmlparser.XMLParser``.  Each supported node is
    converted to an ``ua.AddNodesItem`` and passed to ``server.add_nodes``;
    the references listed on the node are then passed to
    ``server.add_references``.
    """

    # XML node type -> name of the method importing it.  Method *names* are
    # stored (not bound methods) so a handler is only looked up on the
    # instance when a node of that type is actually met.
    _HANDLERS = {
        'UAObject': 'add_object',
        'UAObjectType': 'add_object_type',
        'UAVariable': 'add_variable',
        'UAVariableType': 'add_variable_type',
        'UAReferenceType': 'add_reference',
        'UADataType': 'add_datatype',
        'UAMethod': 'add_method',
    }

    def __init__(self, server):
        """
        Create an importer bound to *server*.

        :param server: object exposing ``add_nodes(list)`` and
            ``add_references(list)``; both are called by the ``add_*``
            methods of this class.
        """
        self.logger = logging.getLogger(__name__)
        # Set by import_xml(); to_nodeid() reads ``parser.aliases`` from it.
        self.parser = None
        self.server = server

    def import_xml(self, xmlpath, act_server):
        """
        Parse *xmlpath* and add every supported node to the server.

        :param xmlpath: path of the XML file, forwarded to ``XMLParser``.
        :param act_server: forwarded unchanged to ``XMLParser`` as its second
            argument (its exact role is defined by the parser, not here).

        Nodes whose type has no handler are only logged and skipped.
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
        for node in self.parser:
            handler_name = self._HANDLERS.get(node.nodetype)
            if handler_name is None:
                self.logger.info("Not implemented node type: %s ", node.nodetype)
                continue
            getattr(self, handler_name)(node)

    def _get_node(self, obj):
        """
        Build the ``ua.AddNodesItem`` shared by all node kinds.

        The node class is looked up on ``ua.NodeClass`` using the node type
        without its first two characters (the ``UA`` prefix of the types
        handled by import_xml).  Parent, parent reference type and type
        definition are only set when the parsed object provides them.
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
        node.BrowseName = ua.QualifiedName.from_string(obj.browsename)
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = ua.NodeId.from_string(obj.parent)
        if obj.parentlink:
            node.ReferenceTypeId = self.to_nodeid(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = ua.NodeId.from_string(obj.typedef)
        return node

    def to_nodeid(self, nodeid):
        """
        Convert a node id as found in the XML into an ``ua.NodeId``.

        Resolution order:

        1. empty/None -> ``ua.NodeId(ua.ObjectIds.String)``
        2. contains ``=`` -> parsed with ``ua.NodeId.from_string``
        3. name of an ``ua.ObjectIds`` attribute -> that identifier
        4. alias declared in the parsed file (``self.parser.aliases``)

        :raises AttributeError: if *nodeid* matches none of the above.  The
            exception type is the one the previous fallback raised, so
            existing handlers keep working; only the message is clearer.
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.parser.aliases:
            return ua.NodeId.from_string(self.parser.aliases[nodeid])
        # The former fallback called getattr(ua.ObjectIds, nodeid) here, which
        # could only fail because hasattr() had just returned False.
        raise AttributeError(
            "Cannot resolve node id %r: it is neither a node id string, "
            "an ObjectIds name nor an alias of the imported file" % (nodeid,))

    @staticmethod
    def _set_names(attrs, obj):
        """Set the optional description and the display name on *attrs*."""
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)

    @staticmethod
    def _copy_if_set(attrs, obj, pairs):
        """
        Copy truthy attributes of *obj* onto *attrs*.

        *pairs* is an iterable of ``(attrs_name, obj_name)`` tuples processed
        in order; falsy source values are skipped.
        """
        for target, source in pairs:
            value = getattr(obj, source)
            if value:
                setattr(attrs, target, value)

    def _commit(self, node, attrs, obj):
        """Attach *attrs* to *node*, add it to the server, then its references."""
        node.NodeAttributes = attrs
        self.server.add_nodes([node])
        self._add_refs(obj)

    def add_object(self, obj):
        """Add a parsed ``UAObject`` node, including its event notifier."""
        node = self._get_node(obj)
        attrs = ua.ObjectAttributes()
        self._set_names(attrs, obj)
        attrs.EventNotifier = obj.eventnotifier
        self._commit(node, attrs, obj)

    def add_object_type(self, obj):
        """Add a parsed ``UAObjectType`` node, including its abstract flag."""
        node = self._get_node(obj)
        attrs = ua.ObjectTypeAttributes()
        self._set_names(attrs, obj)
        attrs.IsAbstract = obj.abstract
        self._commit(node, attrs, obj)

    def add_variable(self, obj):
        """
        Add a parsed ``UAVariable`` node.

        The value is set whenever ``obj.value`` is not None (so empty values
        are kept); the remaining attributes are only set when truthy.
        """
        node = self._get_node(obj)
        attrs = ua.VariableAttributes()
        self._set_names(attrs, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            # TODO: if non-variant based types grow, move it to a separate function
            attrs.Value = self._add_variable_value(obj)
        self._copy_if_set(attrs, obj, (
            ("ValueRank", "rank"),
            ("AccessLevel", "accesslevel"),
            ("UserAccessLevel", "useraccesslevel"),
            ("MinimumSamplingInterval", "minsample"),
            ("ArrayDimensions", "dimensions"),
        ))
        self._commit(node, attrs, obj)

    def _add_variable_value(self, obj):
        """
        Return the value for a variable based on ``obj.valuetype``.

        * ``ListOfLocalizedText``: a ``ua.Variant`` wrapping a list of
          ``ua.LocalizedText``.
        * ``EnumValueType``: a list of ``ua.EnumValueType`` built from the
          ``DisplayName``/``Description``/``Value`` keys of each entry.
        * ``Argument``: a list of ``ua.Argument`` built from the ``Name``,
          ``Description``, ``DataType``, ``ValueRank`` and
          ``ArrayDimensions`` keys of each entry.
        * anything else: ``ua.Variant(obj.value, ua.VariantType.<valuetype>)``.

        NOTE(review): the two list branches return plain lists while the
        others return a ``ua.Variant`` -- confirm the consumer of
        ``attrs.Value`` accepts both.
        """
        if obj.valuetype == 'ListOfLocalizedText':
            return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)
        elif obj.valuetype == 'EnumValueType':
            values = []
            for ev in obj.value:
                enum_value = ua.EnumValueType()
                enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
                enum_value.Description = ua.LocalizedText(ev['Description'])
                enum_value.Value = int(ev['Value'])
                values.append(enum_value)
            return values
        elif obj.valuetype == 'Argument':
            values = []
            for arg in obj.value:
                argument = ua.Argument()
                argument.Name = arg['Name']
                argument.Description = ua.LocalizedText(arg['Description'])
                argument.DataType = self.to_nodeid(arg['DataType'])
                argument.ValueRank = int(arg['ValueRank'])
                argument.ArrayDimensions = arg['ArrayDimensions']
                values.append(argument)
            return values

        return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))

    def add_variable_type(self, obj):
        """
        Add a parsed ``UAVariableType`` node.

        Unlike add_variable, the value is only set when ``obj.value`` holds
        exactly one element, and that element is used as-is.
        """
        node = self._get_node(obj)
        attrs = ua.VariableTypeAttributes()
        self._set_names(attrs, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        self._copy_if_set(attrs, obj, (
            ("ValueRank", "rank"),
            ("IsAbstract", "abstract"),
            ("ArrayDimensions", "dimensions"),
        ))
        self._commit(node, attrs, obj)

    def add_method(self, obj):
        """
        Add a parsed ``UAMethod`` node.

        NOTE(review): the optional attributes copied below are the same ones
        add_variable sets on ``VariableAttributes`` -- confirm that
        ``MethodAttributes`` is really meant to carry them.
        """
        node = self._get_node(obj)
        attrs = ua.MethodAttributes()
        self._set_names(attrs, obj)
        self._copy_if_set(attrs, obj, (
            ("AccessLevel", "accesslevel"),
            ("UserAccessLevel", "useraccesslevel"),
            ("MinimumSamplingInterval", "minsample"),
            ("ArrayDimensions", "dimensions"),
        ))
        self._commit(node, attrs, obj)

    def add_reference(self, obj):
        """Add a parsed ``UAReferenceType`` node with its optional flags."""
        node = self._get_node(obj)
        attrs = ua.ReferenceTypeAttributes()
        self._set_names(attrs, obj)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        self._copy_if_set(attrs, obj, (
            ("IsAbstract", "abstract"),
            ("Symmetric", "symmetric"),
        ))
        self._commit(node, attrs, obj)

    def add_datatype(self, obj):
        """Add a parsed ``UADataType`` node with its optional abstract flag."""
        node = self._get_node(obj)
        attrs = ua.DataTypeAttributes()
        self._set_names(attrs, obj)
        self._copy_if_set(attrs, obj, (("IsAbstract", "abstract"),))
        self._commit(node, attrs, obj)

    def _add_refs(self, obj):
        """
        Add the references listed in ``obj.refs`` to the server.

        Does nothing when ``obj.refs`` is empty or None.  Every reference is
        added as a forward reference from ``obj.nodeid`` to ``data.target``.

        NOTE(review): ``TargetNodeClass`` is set to ``ua.NodeClass.DataType``
        for every reference regardless of the target -- confirm this is
        intended.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
            ref.TargetNodeClass = ua.NodeClass.DataType
            ref.TargetNodeId = ua.NodeId.from_string(data.target)
            refs.append(ref)
        self.server.add_references(refs)
222