LocatingWrapper   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Importance

Changes 4
Bugs 1 Features 0
Metric Value
c 4
b 1
f 0
dl 0
loc 33
rs 10
wmc 5

4 Methods

Rating   Name   Duplication   Size   Complexity  
A read() 0 7 2
A where() 0 10 1
A close() 0 3 1
A __init__() 0 6 1
1
#
2
# Copyright (c) 2014-2015 SUSE Linux GmbH
3
#
4
# This program is free software; you can redistribute it and/or
5
# modify it under the terms of version 3 of the GNU General Public License as
6
# published by the Free Software Foundation.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, contact SUSE LLC.
15
#
16
# To contact SUSE about this file by physical or electronic mail,
17
# you may find current contact information at www.suse.com
18
19
import re
20
import sys
21
import xml.sax
22
from collections import namedtuple
23
from docmanager.core import NS, ReturnCodes, VALIDROOTS
24
from docmanager.exceptions import DMInvalidXMLRootElement, \
25
                                  DMFileNotFoundError
26
from docmanager.logmanager import log, logmgr_flog
27
from io import StringIO
28
from itertools import accumulate
29
30
# -------------------------------------------------------------------
31
# Regular Expressions
32
33
# Matches an entity reference such as "&foo;"; group 2 holds the entity name.
ENTS = re.compile(r"(&([\w_\.-]+);)")
# Matches a preserved entity such as "[[[foo]]]"; group 2 holds the name,
# optionally prefixed with "#".
STEN = re.compile(r"(\[\[\[(\#?[\w_\.-]+)\]\]\])")
# Splits a tag of the form "{namespace}local" into the named groups
# "ns" and "local".
NAMESPACE_REGEX = re.compile(r"\{(?P<ns>.*)\}(?P<local>[-a-zA-Z0-9._]+)")
36
37
38
def ent2txt(match, start="[[[", end="]]]"):
    """Turn a matched entity ``&text;`` into its preserved form ``[[[text]]]``

    Meant to be used as replacement callback for ``ENTS.sub()``.

    :param _sre.SRE_Match match: match object from re; group 2 is expected
                                 to hold the entity name
    :param str start: string that is put in front of the entity name
    :param str end:   string that is put behind the entity name
    :return: replaced string, or None if ``match`` is falsy
    :rtype: str
    """
    logmgr_flog()

    if not match:
        return None

    entityname = match.group(2)
    return "{}{}{}".format(start, entityname, end)
53
54
55
def txt2ent(match):
    """Turn a matched preserved entity ``[[[text]]]`` back into ``&text;``

    Meant to be used as replacement callback for ``STEN.sub()``.

    :param _sre.SRE_Match match: match object from re; group 2 is expected
                                 to hold the entity name
    :return: replaced string, or None if ``match`` is falsy
    :rtype: str
    """
    logmgr_flog()

    if not match:
        return None

    entityname = match.group(2)
    return "&{};".format(entityname)
66
67
68
def preserve_entities(text):
    """Replace every entity reference in text by its preserved form

    Each match of ``ENTS`` is rewritten through :func:`ent2txt`.

    :param str text: the text whose entities should be preserved
    :return: the text with all entities preserved
    :rtype: str
    """
    logmgr_flog()

    preserved = ENTS.sub(ent2txt, text)
    return preserved
78
79
80
def recover_entities(text):
    """Turn every preserved entity in text back into an entity reference

    Each match of ``STEN`` is rewritten through :func:`txt2ent`.

    :param str text: the text whose preserved entities should be recovered
    :return: the text with all entities recovered
    :rtype: str
    """
    logmgr_flog()

    recovered = STEN.sub(txt2ent, text)
    return recovered
90
91
92
def replaceinstream(stream, func):
    """Apply `func` to every line of a stream and collect the outcome

    Depending on `func`, this preserves or restores the entities of a
    stream or file-like object.

    :param stream: iterable stream or file-like object
    :param func: replacement function, signature: func(text)
    :return: a new stream, rewound to its start, holding the replaced lines
    :rtype: StringIO
    """
    logmgr_flog()

    replaced = StringIO()
    replaced.writelines(func(line) for line in stream)

    # Rewind, so the caller can read the stream from the beginning
    replaced.seek(0)
    return replaced
110
111
def check_root_element(rootelem, etree):
    """Make sure the local name of a root element is listed in VALIDROOTS

    :param rootelem: root element; only its ``tag`` attribute is used
    :param etree: object which provides ``QName`` (etree module)
    :raises DMInvalidXMLRootElement: if the local name of the root element
                                     is not part of VALIDROOTS
    """
    logmgr_flog()

    rootname = etree.QName(rootelem.tag).localname
    if rootname in VALIDROOTS:
        return

    raise DMInvalidXMLRootElement("Cannot add info element to %s. "
                                  "Not a valid root element." % rootname,
                                  ReturnCodes.E_INVALID_ROOT_ELEMENT)
123
124
# -------------------------------------------------------------------
125
126
def is_xml(text):
    """Checks if a text contains a typical XML construct

       The text is considered XML as soon as one of the following is
       found: an XML declaration, a DOCTYPE declaration, an XML comment,
       or the beginning of a start tag (with or without prefix).

       NOTE(review): the patterns are searched for anywhere in the text;
       they are not anchored to the start of the text.

       :param str text: The text to observe
       :return: True, if text can be considered as XML, otherwise False
       :rtype: bool
    """
    logmgr_flog()

    # Raw strings, so the backslashes reach the regex engine untouched
    possiblestartstrings = (re.compile(r"<\?xml"),
                            re.compile(r"<!DOCTYPE"),
                            re.compile(r"<!--"),
                            re.compile(r'<(?P<tag>(?:(?P<prefix>\w+):)?'
                                       r'(?P<name>[a-zA-Z0-9_]+))\s*'),
                           )
    return any(matcher.search(text) for matcher in possiblestartstrings)
148
149
150
def findinfo_pos(root):
    """Find the position where to insert the <info> element

    The position is the one directly behind the node selected by the
    XPath ``(d:title|d:subtitle|d:titleabbrev)[last()]``.

    :param root: element which is searched for title elements
    :return: position where to insert <info>; 0 if nothing was selected
    :rtype: int
    """
    logmgr_flog()

    found = root.xpath("(d:title|d:subtitle|d:titleabbrev)[last()]",
                       namespaces=NS)

    # Without any title-like element, <info> goes to the very beginning
    return root.index(found[0]) + 1 if found else 0
165
166
167
# -------------------------------------------------------------------
168
169
def ensurefileobj(source):
    """Return a file(-like) object, regardless if it's a StringIO-like
       object, a filename, or a string containing XML

       :param source: filename, StringIO-like object, or string
       :return: StringIO or the given StringIO-like object; None if
                source is neither a string nor a StringIO-like object
       :raises DMFileNotFoundError: if source is treated as filename and
                                    the file does not exist
    """
    logmgr_flog()

    # StringIO support: such objects are returned unchanged
    if hasattr(source, 'getvalue') and hasattr(source, 'tell'):
        return source

    if isinstance(source, (str, bytes)):
        if is_xml(source):
            return StringIO(source)

        # source isn't a file-like object nor contains an XML structure
        # so it has to be a filename
        try:
            # The context manager closes the file right after reading
            with open(source, 'r') as fileobj:
                return StringIO(fileobj.read())
        except FileNotFoundError as err: # pylint:disable=undefined-variable
            raise DMFileNotFoundError("Could not find file {!r}.".format(err.filename),
                                      err.filename, ReturnCodes.E_FILE_NOT_FOUND) from err
        # pylint: enable=undefined-variable

    # TODO: Check if source is an URL; should we allow this?
    return None
197
198
199
# -------------------------------------------------------------------
200
# Helper functions
201
202
def localname(tag):
    """Returns the local name of an element

    :param str tag: Usually in the form of {http://docbook.org/ns/docbook}article
    :return:  the part behind the ``{namespace}``; the unchanged tag, if
              it does not match ``NAMESPACE_REGEX``
    :rtype:  str
    """
    logmgr_flog()

    found = NAMESPACE_REGEX.search(tag)
    return found.group('local') if found else tag
216
217
def get_namespace(tag):
    """Returns the namespace of an element

    :param str tag: Usually in the form of {http://docbook.org/ns/docbook}article
    :return:        the part between the curly braces; an empty string,
                    if the tag does not match ``NAMESPACE_REGEX``
    :rtype:         str
    """
    logmgr_flog()

    found = NAMESPACE_REGEX.search(tag)
    return found.group('ns') if found else ''
231
232
def compilestarttag(roottag=None):
    """Compile a regular expression for start tags like <article> or
       <d:book> with or without any  attributes

       The pattern provides the named groups ``tagname``, ``attrs``,
       and ``slash`` (the latter is non-empty for an empty element tag).

       :param str roottag: Name of roottag or None, for a general tag;
                           it is inserted into the pattern as is
       :return: a pattern object
       :rtype: _sre.SRE_Pattern
    """
    logmgr_flog()

    # Taken from the xmllib.py
    # http://code.metager.de/source/xref/python/jython/lib-python/2.7/xmllib.py
    _S = '[ \t\r\n]+'                       # white space
    _opS = '[ \t\r\n]*'                     # optional white space
    _Name = '[a-zA-Z_:][-a-zA-Z0-9._:]*'    # valid XML name
    _QStr = "(?:'[^']*'|\"[^\"]*\")"        # quoted XML string

    # One attribute: name, optionally followed by "=" and a quoted or
    # unquoted value. The raw string keeps "\(" and "\)" intact.
    attrfind = (_S + '(?P<name>' + _Name + ')' +
                '(' + _opS + '=' + _opS +
                '(?P<value>' + _QStr + r'|[-a-zA-Z0-9.:+*%?!\(\)_#=~]+))?')
    # End of the start tag, with an optional slash
    starttagend = _opS + '(?P<slash>/?)>'

    tagname = roottag if roottag else _Name
    root = '<(?P<tagname>' + tagname + ')'

    return re.compile(root + '(?P<attrs>(?:' + attrfind + ')*)' +
                      starttagend)
259
260
261
# -------------
262
263
class LocatingWrapper(object):
    """Wraps a file(-like) object and holds a table which is used to
       transform line and column position into offset

       The table is filled while the data passes through :meth:`read`.
       The wrapped object may be read in several chunks and a read may
       return no data at all; the offsets stay absolute in both cases.
    """
    def __init__(self, f):
        """Constructor

        :param f: file(-like) object to read from
        """
        logmgr_flog()

        self.f = f
        # offset[i] is the absolute offset of the first character of
        # line i+1; the first line always starts at offset 0
        self.offset = [0]
        # Amount of data which has been handed out by read() so far
        self.curoffs = 0

    def read(self, *a):
        """Read data and record the start offset of each new line

        :param a: arguments which are passed to the read method of the
                  wrapped object
        :return: the data as returned from the wrapped object
        """
        logmgr_flog()

        data = self.f.read(*a)
        newline = b'\n' if isinstance(data, bytes) else '\n'

        # Every newline starts a new line right behind it. The offsets
        # are relative to all data read so far, not to this chunk.
        base = self.curoffs
        pos = data.find(newline)
        while pos != -1:
            self.offset.append(base + pos + 1)
            pos = data.find(newline, pos + 1)

        self.curoffs = base + len(data)
        return data

    def where(self, locator):
        """Returns the offset from line and column

        :param locator: locator object; has to provide getLineNumber()
                        (counted from 1) and getColumnNumber()
        :return: offset
        :rtype:  int
        """
        logmgr_flog()

        return self.offset[locator.getLineNumber() - 1] + locator.getColumnNumber()

    def close(self):
        """Close the locator"""
        logmgr_flog()
        # Normally, we would close our file(-alike) object and call
        #   self.f.close()
        # However, we do nothing
299
300
301
302
class Handler(xml.sax.handler.ContentHandler):
    """ContentHandler to watch for start and end elements. Needed to
       get the location of all the elements

       Every recorded event is appended to ``context`` as a list
       ``[name, position]``. The name is the element name for a start
       tag, ``/name`` for an end tag, ``?target`` for a processing
       instruction, and ``-- comment`` for a comment.
    """
    def __init__(self, context, locator):
        """Constructor

        :param list context: list which receives the recorded events
        :param locator: object with a ``where(locator)`` method which
                        returns the offset of a position
        """
        logmgr_flog()
        super().__init__()# pylint:disable=super-on-old-class
        self.context = context
        self.locstm = locator
        # Set by the parser through setDocumentLocator()
        self.loc = None
        self.pos = namedtuple('Position', ['line', 'col', 'offset'])

    def _position(self):
        """Return the current position (line, col, offset) of the parser"""
        return self.pos(line=self.loc.getLineNumber(),
                        col=self.loc.getColumnNumber(),
                        offset=self.locstm.where(self.loc))

    def _record(self, name):
        """Append an entry with the given name and the current position"""
        self.context.append([name, self._position()])

    def setDocumentLocator(self, locator):
        """Called by the parser to give the application a locator for
           locating the origin of document events.

        :param locator: locator object which provides getLineNumber()
                        and getColumnNumber()
        """
        logmgr_flog()

        self.loc = locator

    def startElement(self, name, attrs):
        """Signals the start of an element in non-namespace mode

        :param str name:  XML 1.0 Name of the element
        :param Attributes attrs: attributes of the current element
        """
        logmgr_flog()

        # Start tags are only recorded while the context holds less
        # than two entries
        if len(self.context) < 2:
            self._record("%s" % name)

    def endElement(self, name):
        """Signals the end of an element in non-namespace mode

        :param str name:  XML 1.0 Name of the element
        """
        logmgr_flog()

        # save the position of an end tag and add '/' in front of the
        # name to distinguish it from a start tag
        self._record("/%s" % name)

    def processingInstruction(self, target, data):
        """Receive notification of a processing instruction (PI)

        :param str target: the target of the PI
        :param str data:   the data of the PI
        """
        logmgr_flog()

        # Only append PIs when it's NOT before start-tag
        if self.context:
            self._record("?%s" % target)

    def comment(self, text): # pylint: disable=unused-argument
        """Signals an XML comment

        :param str text: text content of the XML comment
        """
        logmgr_flog()

        # Only append comments when it's NOT before start-tag
        if self.context:
            self._record("-- comment")

    # From LexicalParser
    def startCDATA(self):
        """Signals a CDATA section"""
        logmgr_flog()

    endCDATA = startCDATA

    def startDTD(self, doctype, publicID, systemID): # pylint:disable=unused-argument
        """Signals the start of an DTD declaration

        :param  doctype: name of the root element
        :param publicID: public identifier (or empty)
        :param systemID: system identifier (or empty)
        """
        logmgr_flog()

    def endDTD(self):
        """Reports the end of a DTD declaration"""
        logmgr_flog()

    def startEntity(self, name):  # pylint: disable=unused-argument
        """Reports the start of an entity"""
        logmgr_flog()
412
413
414
def findprolog(source, maxsize=-1):
    """Returns a dictionary with essential information about the prolog

    :param source:
    :type source: source, file object, or file-like object
                  expected to be well-formed
    :param int maxsize: Maximum size to read into the XML buffer from
                        which header and root are cut out
    :return: { 'header':  '...', # str: everything before the start tag
               'root':    '...', # str: start tag from '<' til '>'
               'offset':  1,     # int: length of the header
               'roottag': '...', # str: name of the root element
             }
    :rtype: dict
    """
    logmgr_flog()

    # context is used to save our locations; the handler appends
    # entries of the form [name, position]
    context = []

    buf = ensurefileobj(source)
    # We read in maxsize and hope this is enough...
    xmlbuf = buf.read(maxsize)
    # Rewind, the parser has to see the source from the beginning
    buf.seek(0)
    locstm = LocatingWrapper(buf)
    parser = xml.sax.make_parser()

    # Disable certain features:
    # no validation, no external general and parameter entities
    parser.setFeature(xml.sax.handler.feature_validation, False)
    parser.setFeature(xml.sax.handler.feature_external_ges, False)
    parser.setFeature(xml.sax.handler.feature_external_pes, False)

    # The same handler receives the content and the lexical events
    handler = Handler(context, locstm)
    parser.setProperty(xml.sax.handler.property_lexical_handler, handler)
    parser.setContentHandler(handler)
    parser.parse(locstm)

    first = context[0]
    soffset = first[1].offset
    doctype = xmlbuf[:soffset]

    # Check if we have reached the "end tag" (symbolized with '/' in
    # its first character).
    # If yes, start and end tag is on the same line and we can use the
    # last entry.
    # If not (a start tag, comment, or PI follows), the second entry
    # marks the end of the start tag.
    if context[1][0].startswith('/'):
        last = context[-1]
    else:
        last = context[1]

    eoffset = last[1].offset
    starttag = xmlbuf[soffset:eoffset].rstrip(' ')

    return {'header': doctype,
            'root': starttag,
            'offset': len(doctype),
            'roottag': first[0],
           }
478
479
def xml_indent(elem, level=0):
    """Indent XML elements in place

    Text and tail of an element are only overwritten if they are empty
    or consist of whitespace only.

    :param lxml.etree._Element elem: XML Element to indent
    :param int level: indentation level
    """
    indent = "\n" + level*"  "

    def blank(text):
        """Return True if text is None, empty, or whitespace only"""
        return not text or not text.strip()

    children = list(elem)
    if not children:
        # A leaf only gets a tail below the top level
        if level and blank(elem.tail):
            elem.tail = indent
        return

    if blank(elem.text):
        elem.text = indent + "  "
    if blank(elem.tail):
        elem.tail = indent

    for child in children:
        xml_indent(child, level+1)

    # The tail of the last child puts the end tag of elem back to the
    # indentation of elem itself
    lastchild = children[-1]
    if blank(lastchild.tail):
        lastchild.tail = indent
499
500
def get_property_xpath(elem):
    """Gets the xpath of an lxml.etree._Element

    The path consists of the local names of all ancestors which belong
    to the namespace ``NS['dm']``, without the outermost of them,
    followed by the local name of the element itself.

    :param lxml.etree._Element elem: An etree element
    :return str: XPath of the given element
    """
    # iterancestors() starts with the parent, so the outermost matching
    # ancestor is the last entry
    ancestors = [localname(anc.tag)
                 for anc in elem.iterancestors()
                 if get_namespace(anc.tag) == NS['dm']]

    # Drop the outermost ancestor and order the rest from outside to inside
    steps = ancestors[:-1][::-1]
    steps.append(localname(elem.tag))

    return "/".join(steps)
513