Completed
Push — master ( 779079...b9b1be )
by Olivier
04:29
created

XMLParser._get_node_id()   A

Complexity

Conditions 3

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3.4098

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 15
ccs 9
cts 14
cp 0.6429
crap 3.4098
rs 9.4285
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
class NodeData(object):
12
13 1
    def __init__(self):
14 1
        self.nodetype = None
15 1
        self.nodeid = None
16 1
        self.browsename = None
17 1
        self.displayname = None
18 1
        self.symname = None  # FIXME: this param is never used, why?
19 1
        self.parent = None
20 1
        self.parentlink = None
21 1
        self.desc = ""
22 1
        self.typedef = None
23 1
        self.refs = []
24 1
        self.nodeclass = None
25 1
        self.eventnotifier = 0
26
27
        # variable
28 1
        self.datatype = None
29 1
        self.rank = -1  # check default value
30 1
        self.value = None
31 1
        self.valuetype = None
32 1
        self.dimensions = None
33 1
        self.accesslevel = None
34 1
        self.useraccesslevel = None
35 1
        self.minsample = None
36
37
        # referencetype
38 1
        self.inversename = ""
39 1
        self.abstract = "false"
40 1
        self.symmetric = "false"
41
42
        # datatype
43 1
        self.definition = []
44
45
46 1
class RefStruct(object):
47
48 1
    def __init__(self):
49
        self.reftype = None
50
        self.forward = True
51
        self.target = None
52
53
54 1
class XMLParser(object):
55
56 1
    def __init__(self, xmlpath, server):
57 1
        self.server = server  # POC
58 1
        self.logger = logging.getLogger(__name__)
59 1
        self._retag = re.compile(r"(\{.*\})(.*)")
60 1
        self.path = xmlpath
61
        self.aliases = {}
62 1
63 1
        self.tree = ET.parse(xmlpath)
64 1
        self.root = self.tree.getroot()
65
        self.it = None
66 1
67 1
        self.namespaces = {}
68 1
        self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")
69
70 1
    def __iter__(self):
71 1
        self.it = iter(self.root)
72 1
        return self
73
74
    def __next__(self):
75 1
        while True:
76 1
            if sys.version_info[0] < 3:
77 1
                child = self.it.next()
78
            else:
79
                child = self.it.__next__()
80
            name = self._retag.match(child.tag).groups()[1]
81 1
            if name == "Aliases":
82 1
                for el in child:
83
                    self.aliases[el.attrib["Alias"]] = el.text
84 1
            elif name == 'NamespaceUris':
85
                for ns_index, ns_element in enumerate(child):
86
                    ns_uri = ns_element.text
87 1
                    ns_server_index = self.server.register_namespace(ns_uri)
88 1
                    self.namespaces[ns_index + 1] = (ns_server_index, ns_uri)
89 1
            else:
90 1
                node = self._parse_node(name, child)
91 1
                return node
92 1
93 1
    def next(self):  # support for python2
94 1
        return self.__next__()
95 1
96
    def _get_node_id(self, value):
97 1
        """
98 1
        Check if the nodeid given in the xml model file must be converted
99 1
        to a already existing namespace id based on the files namespace uri
100 1
101 1
        :returns: NodeId (str)
102 1
        """
103
        result = value
104 1
        r_match = self._re_nodeid.search(value)
105
        if r_match:
106 1
            node_ns, node_id = r_match.groups()
107 1
            ns_server = self.namespaces.get(int(node_ns), None)
108
            if ns_server:
109
                result = "ns={};i={}".format(ns_server[0], node_id)
110
        return result
111
112
    def _parse_node(self, name, child):
113
        obj = NodeData()
114
        obj.nodetype = name
115
        for key, val in child.attrib.items():
116
            self._set_attr(key, val, obj)
117
        obj.displayname = obj.browsename  # give a default value to display name
118
        for el in child:
119
            self._parse_tag(el, obj)
120
        return obj
121
122
    def _set_attr(self, key, val, obj):
123
        if key == "NodeId":
124
            obj.nodeid = self._get_node_id(val)
125
        elif key == "BrowseName":
126
            obj.browsename = val
127
        elif key == "SymbolicName":
128 1
            obj.symname = val
129 1
        elif key == "ParentNodeId":
130
            obj.parent = self._get_node_id(val)
131 1
        elif key == "DataType":
132
            obj.datatype = val
133 1
        elif key == "IsAbstract":
134 1
            obj.abstract = val
135 1
        elif key == "EventNotifier":
136 1
            obj.eventnotifier = 1 if val == "1" else 0
137 1
        elif key == "ValueRank":
138 1
            obj.rank = int(val)
139
        elif key == "ArrayDimensions":
140
            obj.dimensions = [int(i) for i in val.split(",")]
141
        elif key == "MinimumSamplingInterval":
142
            obj.minsample = int(val)
143
        elif key == "AccessLevel":
144
            obj.accesslevel = int(val)
145
        elif key == "UserAccessLevel":
146
            obj.useraccesslevel = int(val)
147 1
        elif key == "Symmetric":
148 1
            obj.symmetric = True if val == "true" else False
149 1
        else:
150 1
            self.logger.info("Attribute not implemented: %s:%s", key, val)
151 1
152 1
    def _parse_tag(self, el, obj):
153 1
        tag = self._retag.match(el.tag).groups()[1]
154
155 1
        if tag == "DisplayName":
156
            obj.displayname = el.text
157
        elif tag == "Description":
158
            obj.desc = el.text
159
        elif tag == "References":
160 1
            self._parse_refs(el, obj)
161 1
        elif tag == "Value":
162 1
            self._parse_value(el, obj)
163
        elif tag == "InverseName":
164 1
            obj.inversename = el.text
165
        elif tag == "Definition":
166 1
            for field in el:
167
                obj.definition.append(field)
168
        else:
169
            self.logger.info("Not implemented tag: %s", el)
170
171
    def _parse_value(self, el, obj):
172
        for val in el:
173
            ntag = self._retag.match(val.tag).groups()[1]
174 1
            obj.valuetype = ntag
175 1
            if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
176 1
                obj.value = int(val.text)
177 1
            elif ntag in ("Float", "Double"):
178 1
                obj.value = float(val.text)
179
            elif ntag in ("Boolean"):
180
                if val.text in ("True", "true", "1", "on", "On"):
181 1
                    obj.value = bool(1)
182 1
                else:
183
                    obj.value = bool(0)
184
            elif ntag in ("ByteString", "String"):
185
                mytext = val.text
186
                if mytext is None:  # support importing null strings
187
                    mytext = ""
188
                mytext = mytext.replace('\n', '').replace('\r', '')
189
                # obj.value.append('b"{}"'.format(mytext))
190
                obj.value = mytext
191
            elif ntag == "ListOfExtensionObject":
192
                obj.value, obj.valuetype = self._parse_list_of_extension_object(el)
193
            elif ntag == "ListOfLocalizedText":
194
                obj.value = self._parse_list_of_localized_text(el)
195
            else:
196
                self.logger.info("Value type not implemented: %s", ntag)
197
198
    def _get_text(self, el):
199
        txt = ""
200
        for text in el.itertext():
201
            txt += text
202
        return txt
203
204
    def _parse_list_of_localized_text(self, el):
205
        value = []
206
        for localized_text_list in el:
207
            for localized_text in localized_text_list:
208
                ntag = self._retag.match(localized_text.tag).groups()[1]
209
                for child in localized_text:
210
                    ntag = self._retag.match(child.tag).groups()[1]
211
                    if ntag == 'Text':
212
                        value.append(self._get_text(child))
213
        return value
214
215
    def _parse_list_of_extension_object(self, el):
216
        '''
217
        Parse a uax:ListOfExtensionObject Value
218
        
219
        Return an array with a value of each uax:ExtensionObject/*/* (each element is convert to a netry in a dict.
220
               also the valuetype is returned. The valuetype is  uax:ExtensionObject/*/tag()
221
        '''
222
        value = []
223
        valuetype = None
224
        for extension_object_list in el:
225
            for extension_object in extension_object_list:
226
                extension_object.find('Body')
227
                for extension_object_part in extension_object:
228
                    ntag = self._retag.match(extension_object_part.tag).groups()[1]
229
                    if ntag == 'Body':
230
                        data = {}
231
                        ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
232
                        valuetype = ntag
233
                        for body_item in extension_object_part.findall('*/*'):
234
                            ntag = self._retag.match(body_item.tag).groups()[1]
235
236
                            child = body_item.find('*')
237
                            if child is not None:
238
                                data[ntag] = self._get_text(child)
239
                            else:
240
                                data[ntag] = self._get_text(body_item)
241
                        value.append(data)
242
        return value, valuetype
243
244
    def _parse_refs(self, el, obj):
245
        for ref in el:
246
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
247
                obj.typedef = self._get_node_id(ref.text)
248
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] == "false":
249
                # if obj.parent:
250
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
251
                obj.parent = self._get_node_id(ref.text)
252
                obj.parentlink = ref.attrib["ReferenceType"]
253
            else:
254
                struct = RefStruct()
255
                if "IsForward" in ref.attrib:
256
                    struct.forward = ref.attrib["IsForward"]
257
                struct.target = self._get_node_id(ref.text)
258
                struct.reftype = ref.attrib["ReferenceType"]
259
                obj.refs.append(struct)
260