Completed
Pull Request — master (#267)
by
unknown
04:13
created

XmlImporter._add_refs()   A

Complexity

Conditions 3

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 13
ccs 0
cts 0
cp 0
crap 12
rs 9.4285
c 0
b 0
f 0
1
"""
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
7
8 1
from opcua import ua
9 1
from opcua.common import xmlparser
10
11
12 1
class XmlImporter(object):
13
14 1
    def __init__(self, server):
15 1
        self.logger = logging.getLogger(__name__)
16 1
        self.parser = None
17 1
        self.server = server
18
19 1
    def import_xml(self, xmlpath, act_server):
        """
        Parse the XML file at xmlpath and add every node it defines.

        A new xmlparser.XMLParser is created from xmlpath and act_server and
        kept in self.parser (to_nodeid() reads its aliases). Each parsed node
        is dispatched on its nodetype to the matching add_* method; node
        types without a handler are logged and skipped.
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)
        # nodetype string -> method that knows how to add that kind of node
        handlers = {
            'UAObject': self.add_object,
            'UAObjectType': self.add_object_type,
            'UAVariable': self.add_variable,
            'UAVariableType': self.add_variable_type,
            'UAReferenceType': self.add_reference,
            'UADataType': self.add_datatype,
            'UAMethod': self.add_method,
        }
        for node in self.parser:
            # trace through the logger instead of printing to stdout, so
            # library users control whether this output appears
            self.logger.debug("Importing node %s (parent: %s)", node, node.parent)
            handler = handlers.get(node.nodetype)
            if handler is None:
                self.logger.info("Not implemented node type: %s ", node.nodetype)
                continue
            handler(node)
42 1
43 1
    def _get_node(self, obj):
        """
        Build a ua.AddNodesItem from the parsed XML node obj.

        The requested node id, browse name and node class are always set.
        The parent id, the reference type to the parent and the type
        definition are only set when obj holds a truthy value for them.
        """
        item = ua.AddNodesItem()
        item.RequestedNewNodeId = ua.NodeId.from_string(obj.nodeid)
        item.BrowseName = ua.QualifiedName.from_string(obj.browsename)
        # nodetype is e.g. "UAObject"; dropping the first two characters
        # leaves the name looked up on ua.NodeClass
        class_name = obj.nodetype[2:]
        item.NodeClass = getattr(ua.NodeClass, class_name)
        if obj.parent:
            item.ParentNodeId = ua.NodeId.from_string(obj.parent)
        if obj.parentlink:
            # unlike the other ids, the parent link goes through to_nodeid()
            item.ReferenceTypeId = self.to_nodeid(obj.parentlink)
        if obj.typedef:
            item.TypeDefinition = ua.NodeId.from_string(obj.typedef)
        return item
55
56 1
    def to_nodeid(self, nodeid):
        """
        Convert the string nodeid into a ua.NodeId.

        Resolution order:
        - falsy value (empty string, None): ua.ObjectIds.String is used
        - contains "=": parsed with ua.NodeId.from_string()
        - name of a ua.ObjectIds attribute: that attribute's value is used
        - key of self.parser.aliases: the aliased string is parsed with
          ua.NodeId.from_string()

        Raises AttributeError when nodeid matches none of the above.
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.parser.aliases:
            return ua.NodeId.from_string(self.parser.aliases[nodeid])
        # The previous fallback looked the name up on ua.ObjectIds a second
        # time, which can only fail once hasattr() returned False. Raise the
        # same exception type, but with a message that names the bad id.
        raise AttributeError("Unknown node id or alias: {!r}".format(nodeid))
69
70 1
    def add_object(self, obj):
        """
        Add the parsed object node obj to the server, then its references.

        The description is only set when obj.desc is truthy; display name
        and event notifier are always copied from obj.
        """
        item = self._get_node(obj)
        attributes = ua.ObjectAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.EventNotifier = obj.eventnotifier
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
80
81 1
    def add_object_type(self, obj):
        """
        Add the parsed object type node obj to the server, then its references.

        The description is only set when obj.desc is truthy; display name
        and the abstract flag are always copied from obj.
        """
        item = self._get_node(obj)
        attributes = ua.ObjectTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.IsAbstract = obj.abstract
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
91
92 1
    def add_variable(self, obj):
        """
        Add the parsed variable node obj to the server, then its references.

        Display name and data type are always set. The value is set whenever
        obj.value is not None; description, value rank, access levels,
        minimum sampling interval and array dimensions are only set when
        obj holds a truthy value for them.
        """
        item = self._get_node(obj)
        attributes = ua.VariableAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attributes.Value = self._add_variable_value(obj)
        # (attribute on the ua object, field on the parsed node); each one is
        # copied only when the parsed value is truthy
        optional_fields = (
            ("ValueRank", "rank"),
            ("AccessLevel", "accesslevel"),
            ("UserAccessLevel", "useraccesslevel"),
            ("MinimumSamplingInterval", "minsample"),
            ("ArrayDimensions", "dimensions"),
        )
        for attr_name, obj_field in optional_fields:
            parsed = getattr(obj, obj_field)
            if parsed:
                setattr(attributes, attr_name, parsed)
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
115
116 1
    def _add_variable_value(self, obj):
        """
        Return the value for a Variable based on obj.valuetype.

        'ListOfLocalizedText' gives a ua.Variant holding ua.LocalizedText
        items. 'EnumValueType' and 'Argument' give a plain list of
        ua.EnumValueType / ua.Argument objects (not wrapped in a ua.Variant).
        Any other valuetype gives a ua.Variant typed with the ua.VariantType
        member of that name.
        """
        kind = obj.valuetype

        if kind == 'ListOfLocalizedText':
            texts = []
            for txt in obj.value:
                texts.append(ua.LocalizedText(txt))
            return ua.Variant(texts, None)

        if kind == 'EnumValueType':
            enum_values = []
            for entry in obj.value:
                enum_item = ua.EnumValueType()
                enum_item.DisplayName = ua.LocalizedText(entry['DisplayName'])
                enum_item.Description = ua.LocalizedText(entry['Description'])
                enum_item.Value = int(entry['Value'])
                enum_values.append(enum_item)
            return enum_values

        if kind == 'Argument':
            arguments = []
            for entry in obj.value:
                arg_item = ua.Argument()
                arg_item.Name = entry['Name']
                arg_item.Description = ua.LocalizedText(entry['Description'])
                arg_item.DataType = self.to_nodeid(entry['DataType'])
                arg_item.ValueRank = int(entry['ValueRank'])
                arg_item.ArrayDimensions = entry['ArrayDimensions']
                arguments.append(arg_item)
            return arguments

        # every other valuetype is taken to be the name of a ua.VariantType member
        variant_type = getattr(ua.VariantType, kind)
        return ua.Variant(obj.value, variant_type)
144
145
146
    def add_variable_type(self, obj):
        """
        Add the parsed variable type node obj to the server, then its references.

        Display name and data type are always set. The value is only set
        when obj.value holds exactly one element, and that element is used
        directly. Description, value rank, abstract flag and array
        dimensions are only set when obj holds a truthy value for them.
        """
        item = self._get_node(obj)
        attributes = ua.VariableTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attributes.Value = obj.value[0]
        # (attribute on the ua object, field on the parsed node); each one is
        # copied only when the parsed value is truthy
        optional_fields = (
            ("ValueRank", "rank"),
            ("IsAbstract", "abstract"),
            ("ArrayDimensions", "dimensions"),
        )
        for attr_name, obj_field in optional_fields:
            parsed = getattr(obj, obj_field)
            if parsed:
                setattr(attributes, attr_name, parsed)
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
164
165
    def add_method(self, obj):
        """
        Add the parsed method node obj to the server, then its references.

        The display name is always set; description, access levels, minimum
        sampling interval and array dimensions are only set when obj holds a
        truthy value for them.
        """
        item = self._get_node(obj)
        attributes = ua.MethodAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        # NOTE(review): these are the same attribute names add_variable()
        # sets; confirm they are meaningful on ua.MethodAttributes.
        optional_fields = (
            ("AccessLevel", "accesslevel"),
            ("UserAccessLevel", "useraccesslevel"),
            ("MinimumSamplingInterval", "minsample"),
            ("ArrayDimensions", "dimensions"),
        )
        for attr_name, obj_field in optional_fields:
            parsed = getattr(obj, obj_field)
            if parsed:
                setattr(attributes, attr_name, parsed)
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
182 1
183 1
    def add_reference(self, obj):
        """
        Add the parsed reference type node obj to the server, then its references.

        The display name is always set; description, inverse name, abstract
        flag and symmetric flag are only set when obj holds a truthy value
        for them.
        """
        item = self._get_node(obj)
        attributes = ua.ReferenceTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            attributes.InverseName = ua.LocalizedText(obj.inversename)
        # plain flags are copied as-is, and only when truthy
        for attr_name, obj_field in (("IsAbstract", "abstract"), ("Symmetric", "symmetric")):
            parsed = getattr(obj, obj_field)
            if parsed:
                setattr(attributes, attr_name, parsed)
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
198
199
    def add_datatype(self, obj):
        """
        Add the parsed data type node obj to the server, then its references.

        The display name is always set; the description and the abstract
        flag are only set when obj holds a truthy value for them.
        """
        item = self._get_node(obj)
        attributes = ua.DataTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            attributes.IsAbstract = obj.abstract
        item.NodeAttributes = attributes
        self.server.add_nodes([item])
        self._add_refs(obj)
210
211
    def _add_refs(self, obj):
212
        if not obj.refs:
213
            return
214
        refs = []
215
        for data in obj.refs:
216
            ref = ua.AddReferencesItem()
217
            ref.IsForward = True
218
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
219
            ref.SourceNodeId = ua.NodeId.from_string(obj.nodeid)
220
            ref.TargetNodeClass = ua.NodeClass.DataType
221
            ref.TargetNodeId = ua.NodeId.from_string(data.target)
222
            refs.append(ref)
223
        self.server.add_references(refs)
224