Total Complexity | 101 |
Total Lines | 581 |
Duplicated Lines | 0 % |
Complex classes like src.docmanager.XmlHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # |
||
class XmlHandler(object):
    """An XmlHandler instance represents an XML tree of a file.

    The file is parsed on construction.  Afterwards the handler gives
    access to the ``dm:docmanager`` element of the document: properties
    and their attributes can be read, set and deleted, and the modified
    tree can be written back to the original file.

    Parse problems are recorded in ``invalidfile`` and ``fileerror``; they
    are additionally raised as exceptions when ``stoponerror`` is true.
    """

    def __init__(self, filename, stoponerror=True):
        """Initializes the XmlHandler class and parses ``filename``

        :param str filename: filename of XML file
        :param bool stoponerror: raise an exception on parse errors instead
                                 of only recording them on the instance
        """
        logmgr_flog()
        log.debug("Initialized a new XML Handler for file %r.", filename)

        # general
        self._filename = filename

        # file util
        self._fileutil = FileUtil(filename)

        # prolog
        self._offset = 0
        self._header = ""
        self._root = ""
        # parse() and write() work on ``_roottag``; give it a default so
        # that write() does not fail with AttributeError when the prolog
        # could not be parsed.
        self._roottag = ""
        # public attribute kept for backward compatibility
        self.roottag = ""

        # parser
        self.__xmlparser = None
        self.invalidfile = False
        self.fileerror = ""
        self.xmlerrorstring = ""
        self.stoponerror = stoponerror

        # lxml
        self.__tree = None
        self.__root = None
        self.__docmanager = None

        # load the file into a StringIO buffer
        self._buffer = ensurefileobj(self._filename)

        # log
        self.xmllogerrorstring = ""

        # parse the given file with lxml
        self.parse()

    def parse(self):
        """This function parses the whole XML file

        The steps are: find the prolog, replace entities in the buffer,
        parse the buffer with lxml, validate the root element, check the
        DocBook 5 namespace and finally look up (or create) the
        ``dm:docmanager`` element.  Each step is skipped once the file has
        been marked invalid.

        :raises DMXmlParseError: on a prolog, syntax or root element error
                                 (only if ``stoponerror`` is true)
        :raises DMNotDocBook5File: if the root element is not in the
                                   DocBook 5 namespace (only if
                                   ``stoponerror`` is true)
        """
        logmgr_flog()

        # find the prolog of the XML file (everything before the start tag)
        try:
            prolog = findprolog(self._buffer)
        except SAXParseException as err:
            self.invalidfile = True
            self.fileerror = "<{}:{}> {} in {!r}.".format(
                err.getLineNumber(),
                err.getColumnNumber(),
                err.getMessage(),
                self.filename,
            )

            if self.stoponerror:
                raise DMXmlParseError(self.fileerror, ReturnCodes.E_XML_PARSE_ERROR)

        if self.invalidfile:
            return

        # save prolog details
        self._offset = prolog['offset']
        self._header = prolog['header']
        self._root = prolog['root']
        self._roottag = prolog['roottag']

        # replace any entities
        self.replace_entities()

        self.__xmlparser = etree.XMLParser(remove_blank_text=False,
                                           resolve_entities=False,
                                           dtd_validation=False)

        # load the file and set a reference to the dm group
        try:
            self.__tree = etree.parse(self._buffer, self.__xmlparser)
        except etree.XMLSyntaxError as err:
            self.invalidfile = True
            self.fileerror = err.msg

            if self.stoponerror:
                raise DMXmlParseError(err, ReturnCodes.E_XML_PARSE_ERROR)

        if self.invalidfile:
            return

        self.__root = self.__tree.getroot()

        try:
            # module level helper, not the method of the same name
            check_root_element(self.__root, etree)
        except ValueError as err:
            self.invalidfile = True
            # store the message text, like every other error branch does
            self.fileerror = str(err)

            if self.stoponerror:
                raise DMXmlParseError(err, ReturnCodes.E_XML_PARSE_ERROR)

        if self.invalidfile:
            return

        # check for DocBook 5 namespace in start tag
        try:
            self.check_docbook5_ns()

            # check for docmanager element
            self.__docmanager = self.__tree.find("//dm:docmanager", namespaces=NS)

            if self.__docmanager is None:
                log.info("No docmanager element found")
                self.create_group()
            else:
                log.debug("Found docmanager element %s", self.__docmanager.getparent())
        except DMNotDocBook5File as err:
            if self.stoponerror:
                raise DMNotDocBook5File(err.errorstr, err.error)

    def check_docbook5_ns(self):
        """Checks if the current file is a valid DocBook 5 file.

        :raises DMNotDocBook5File: if the namespace of the root element is
                                   not the DocBook 5 namespace
        """
        if get_namespace(self.__root.tag) == NS['d']:
            return

        self.invalidfile = True
        self.fileerror = "The document is not a valid DocBook 5 document."
        raise DMNotDocBook5File(self.fileerror, ReturnCodes.E_NOT_DOCBOOK5_FILE)

    def replace_entities(self):
        """This function replaces entities in the StringIO buffer

        The buffer is positioned at the prolog offset first and is then
        replaced by the stream returned from ``replaceinstream``.
        """
        logmgr_flog()

        self._buffer.seek(self._offset)
        self._buffer = replaceinstream(self._buffer, preserve_entities)

    def init_default_props(self, force=False, bugtracker=False):
        """Initializes the default properties for the given XML files

        :param bool force: Ignore if there are already properties in an
                           XML - just overwrite them
        :param bool bugtracker: also initialize the bugtracker properties
        :return: number of properties which were left untouched
        :rtype: int
        """
        logmgr_flog()

        props = list(DEFAULT_DM_PROPERTIES)
        if bugtracker:
            props.extend(BT_ELEMENTLIST)

        untouched = 0
        for prop in props:
            # one lookup per property; a missing key and a None value are
            # treated the same way
            current = self.get(prop).get(prop)
            if current is None or force:
                self.set({prop: ""})
            else:
                untouched += 1

        return untouched

    def check_root_element(self):
        """Checks if root element is valid

        :raises DMInvalidXMLRootElement: if the local name of the root
                                         element is not in VALIDROOTS
        """
        logmgr_flog()

        tag = etree.QName(self.__root.tag)
        if tag.localname not in VALIDROOTS:
            # Exceptions do not interpolate printf-style arguments, so the
            # message has to be formatted before it is raised.
            message = ("Cannot add info element to file %r. "
                       "This file does not contain a valid "
                       "DocBook 5 root element. Found %s"
                       % (self._filename, tag.localname))
            raise DMInvalidXMLRootElement(message,
                                          ReturnCodes.E_INVALID_ROOT_ELEMENT)

    def create_group(self):
        """Creates the docmanager group element

        A missing ``info`` element is created and inserted into the root
        element first.
        """
        logmgr_flog()

        # search the info element; create it if it does not exist
        info = self.__tree.find("//d:info", namespaces=NS)
        # TODO: We need to check for a --force option
        if info is None:
            log.debug("No <info> element found!")
            pos = findinfo_pos(self.__root)
            log.debug("Using position %d", pos)
            info = etree.Element("{%s}info" % NS["d"])
            info.tail = '\n'
            info.text = '\n'
            self.__root.insert(pos, info)

            log.debug("Adding <info> element in '%s'", self.filename)

        log.debug("Adding <dm:docmanager> to <info>")
        self.__docmanager = etree.SubElement(info,
                                             "{{{dm}}}docmanager".format(**NS),
                                             nsmap={'dm': NS['dm']},
                                             )

    def set(self, pairs):
        """Sets the key as element and value as content

        :param dict pairs: maps property paths (nested levels separated by
                           ``/``) to the text the property should contain

        If key="foo" and value="bar" you will get:
        <foo>bar</foo>
        whereas foo belongs to the DocManager namespace
        """
        logmgr_flog()

        dm = self.__docmanager

        for key in pairs:
            # Every key is resolved from the docmanager element again;
            # sharing the path between keys would nest later keys below
            # earlier ones.
            dmelem = []
            lastnode = dm

            for e in key.split("/"):
                dmelem.append("dm:" + e)
                node = dm.find("/".join(dmelem), namespaces=NS)

                if node is None:
                    node = etree.SubElement(lastnode, "{{{dm}}}{key}".format(key=e, **NS))

                lastnode = node
                node.text = ""

            node.text = pairs[key]

    def is_set(self, key, values):
        """Checks if element 'key' exists with 'values'

        :param str key: the element to search for
        :param str values: the value inside the element

        :return: if conditions are met
        :rtype: bool
        """
        logmgr_flog()

        # check if the key has one of the given values
        element = self.__docmanager.find("./dm:" + key, namespaces=NS)
        return element is not None and element.text in values

    def is_prop_set(self, prop):
        """
        Checks if a property is set in an XML element

        :param str prop: the property

        :return: if property is set
        :rtype: bool
        """
        logmgr_flog()

        element = self.__docmanager.find("./dm:{}".format(prop), namespaces=NS)
        return element is not None

    def set_attr(self, prop, data):
        """Sets an attribute for a property

        :param str prop: The property
        :param dict data: A dictionary of attributes and values
                          example: {"attr1": "val1", "attr2": "val2"}
        :raises DMPropertyNotFound: if the property does not exist
        """
        node = self.find_elem(prop)

        if node is None:
            raise DMPropertyNotFound(self.filename, prop)

        for name in data:
            node.set(name, data[name])

    def del_attr(self, prop, data):
        """Deletes one or more attributes of a property

        :param str prop: The property
        :param list data: A list of all attributes
        :return: the attributes which could not be deleted because they
                 do not exist
        :rtype: list
        :raises DMPropertyNotFound: if the property does not exist
        """
        node = self.find_elem(prop)

        if node is None:
            raise DMPropertyNotFound(self.filename, prop)

        errors = []
        for name in data:
            try:
                del node.attrib[name]
            except KeyError:
                errors.append(name)

        return errors

    def get_attr(self, props, data):
        """Gets one or more attributes of a property

        :param list props: The properties; if empty, all properties of the
                           docmanager element are used
        :param list data: A list of all attributes; if empty, all
                          attributes of a property are returned
        :return: maps each property to a mapping of attribute name to value
        :rtype: OrderedDict
        """
        attrs = OrderedDict()
        nodes = []

        if props:
            for prop in props:
                attrs[prop] = OrderedDict()
                node = self.find_elem(prop)

                if node is not None:
                    # Register the node under the requested path.  The
                    # local tag name differs from the path for nested
                    # properties and would miss the entry in ``attrs``.
                    nodes.append((prop, node))
        else:
            for idx, elem in enumerate(self.__docmanager.iter()):
                # this is needed because otherwise we also get the
                # "docmanager" element
                if idx:
                    xpath = get_property_xpath(elem)

                    attrs[xpath] = OrderedDict()
                    nodes.append((xpath, elem))

        for prop, elem in nodes:
            if data:
                for name in data:
                    if name in elem.attrib:
                        attrs[prop][name] = elem.attrib[name]
            else:
                for name, value in elem.attrib.items():
                    attrs[prop][name] = value

        return attrs

    def get(self, keys=None):
        """Returns all matching values for a key in docmanager element

        :param key: localname of element to search for; nested levels are
                    separated by ``/``.  If empty or None, all properties
                    are returned.
        :type key: string, list, tuple, or None
        :return: the values; a property which does not exist maps to None
        :rtype: dict
        """
        logmgr_flog()

        # covers None as well as an empty string/list/tuple
        if not keys:
            return self.get_all()

        if not isinstance(keys, (list, tuple)):
            keys = [keys]

        dm = self.__docmanager
        values = OrderedDict()

        for key in keys:
            dmelem = []

            for e in key.split("/"):
                dmelem.append("dm:" + e)
                node = dm.find("/".join(dmelem), namespaces=NS)

                if node is None:
                    break

            values[key] = None if node is None else node.text

        return values

    def get_all(self):
        """Returns all keys and values in a docmanager xml file

        :return: maps the xpath of each property to its text
        :rtype: OrderedDict
        """
        logmgr_flog()

        ret = OrderedDict()
        for idx, elem in enumerate(self.__docmanager.iter()):
            # we want to skip the "docmanager" element here
            if idx:
                ret[get_property_xpath(elem)] = elem.text

        return ret

    def delete(self, key, condition=None):
        """Deletes an element inside docmanager element

        :param str key: element name to delete
        :param str condition: the condition for the deletion (the var condition has to be equal with the property value)
        :return boolean: True = success | False = no property has been deleted
        """
        logmgr_flog()

        node = self.find_elem(key)
        if node is None:
            return False

        if condition is not None and condition != node.text:
            return False

        node.getparent().remove(node)
        return True

    def find_elem(self, prop):
        """Searches for an XML element below the docmanager element

        :param str prop: The property; nested levels are separated by ``/``
        :return lxml.etree._Element: the element or None if any level of
                                     the path does not exist
        """
        node = self.__docmanager

        for name in prop.split("/"):
            node = node.find("dm:{}".format(name), namespaces=NS)

            if node is None:
                return None

        return node

    def get_indentation(self, node, indentation=""):
        """Calculates indentation level

        The tails of all ancestors of ``node`` are concatenated with
        their newlines removed.

        :param lxml.etree._Element node: node where to start
        :param str indentation: Additional indentation
        :return: the calculated indentation followed by ``indentation``
        :rtype: str
        """
        logmgr_flog()

        indent = ""
        if node is not None:
            indent = "".join(n.tail.replace("\n", "")
                             for n in node.iterancestors()
                             if n.tail is not None)
        return indent + indentation

    def indent_dm(self):
        """Indents only dm:docmanager element and its children"""
        logmgr_flog()

        dmindent = ' '
        dm = self.__tree.find("//dm:docmanager",
                              namespaces=NS)
        log.debug("dm is %s", dm)
        if dm is None:
            return
        log.debug("-----")
        info = dm.getparent()
        log.info("info: %s", info)
        log.info("prev: %s", info.getprevious())
        log.info("parent of info: %s", info.getparent())
        log.info("child of info: %s", list(info))

        if info.tail is None:
            info.tail = ""

        infoindent = "".join(info.tail.split('\n'))
        prev = dm.getprevious()
        if prev is not None:
            log.info("prev: %s", prev)
            prev.tail = '\n' + infoindent
        indent = self.get_indentation(dm.getprevious())
        dm.text = '\n' + indent + ' '
        dm.tail = '\n' + infoindent
        for node in dm.iterchildren():
            # the last child gets no extra indentation
            extra = dmindent if node.getnext() is not None else ''
            node.tail = '\n' + indent + extra

    def write(self):
        """Write XML tree to original filename"""
        logmgr_flog()

        # Only indent docmanager child elements
        self.indent_dm()

        log.debug("root: %s", repr(self._root))

        # Build the complete content before the file is opened: opening
        # with 'w' truncates it, so an error during serialization must
        # not happen while the file is already open.
        info = self.__root.find("d:info", namespaces=NS)
        xml_indent(info, 2)
        content = recover_entities(etree.tostring(self.__tree,
                                                  encoding='unicode'))
        # put the original start tag back
        starttag = compilestarttag(self._roottag)
        content = starttag.sub(lambda _: self._root.rstrip(), content, 1)

        with open(self._filename, 'w') as f:
            f.write(self._header.rstrip() + "\n" + content)

    @property
    def filename(self):
        """Returns filename of the input source

        :return: filename
        :rtype: str
        """
        return self._filename

    @filename.setter
    def filename(self, _):
        raise ValueError("filename is only readable")

    @filename.deleter
    def filename(self):
        raise ValueError("filename cannot be deleted")

    @property
    def tree(self):
        """Return our parsed tree object

        :return: tree object
        :rtype: lxml.etree._ElementTree
        """
        return self.__tree

    @tree.setter
    def tree(self, _):
        raise ValueError("tree is only readable")

    @tree.deleter
    def tree(self):
        raise ValueError("tree cannot be deleted")

    @property
    def root(self):
        """Returns the root element of the XML tree

        :return: root element
        :rtype: lxml.etree._Element
        """
        return self.__root

    @root.setter
    def root(self, _):
        raise ValueError("root is only readable")

    @root.deleter
    def root(self):
        raise ValueError("root cannot be deleted")

    @property
    def dm(self):
        """Returns the docmanager element (None if it was never set)"""
        return self.__docmanager

    @property
    def fileutil(self):
        """Returns the FileUtil object created for the file"""
        return self._fileutil
||
614 |