Passed: Pull Request to master (#672) by Konstantin, created 02:16

ocrd_utils.str.generate_range()   A

Complexity:   Conditions 4
Size:         Total Lines 14, Code Lines 11
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 0

Metric   Value
eloc     11
dl       0
loc      14
rs       9.85
c        0
b        0
f        0
cc       4
nop      2
"""
Utility functions for strings, paths and URLs.
"""

import re
import json
from .constants import REGEX_FILE_ID
from .logging import getLogger

__all__ = [
    'assert_file_grp_cardinality',
    'concat_padded',
    'get_local_filename',
    'is_local_filename',
    'is_string',
    'make_file_id',
    'nth_url_segment',
    'parse_json_string_or_file',
    'parse_json_string_with_comments',
    'remove_non_path_from_url',
    'safe_filename',
]

def assert_file_grp_cardinality(grps, n, msg=None):
    """
    Assert that a string of comma-separated fileGrps contains exactly ``n`` entries.
    """
    if isinstance(grps, str):
        grps = grps.split(',')
    assert len(grps) == n, \
            "Expected exactly %d output file group%s%s, but '%s' has %d" % (
                n,
                '' if n == 1 else 's',
                ' (%s)' % msg if msg else '',
                grps,
                len(grps)
            )
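
# Illustrative usage of assert_file_grp_cardinality (the fileGrp names below are
# made-up examples, not part of the original module):
#   assert_file_grp_cardinality('OCR-D-IMG,OCR-D-SEG', 2)  # passes silently
#   assert_file_grp_cardinality('OCR-D-IMG', 2)            # raises AssertionError:
#   "Expected exactly 2 output file groups, but '['OCR-D-IMG']' has 1"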

def concat_padded(base, *args):
    """
    Concatenate a string with one or more zero-padded 4-digit numbers
    (string arguments are appended unpadded).
    """
    ret = base
    for n in args:
        if is_string(n):
            ret = "%s_%s" % (ret, n)
        else:
            ret = "%s_%04i" % (ret, n)
    return ret
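
# Illustrative usage of concat_padded (example values only):
#   concat_padded('FOO', 1)         -> 'FOO_0001'
#   concat_padded('FOO', 1, 'bar')  -> 'FOO_0001_bar'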

def remove_non_path_from_url(url):
    """
    Remove everything from the URL after the path (query, fragment, trailing slashes).
    """
    url = url.split('?', 1)[0]    # query
    url = url.split('#', 1)[0]    # fragment identifier
    url = re.sub(r"/+$", "", url) # trailing slashes
    return url
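
# Illustrative usage of remove_non_path_from_url (example URL only):
#   remove_non_path_from_url('http://example.org/dir/file?foo=bar#baz')  -> 'http://example.org/dir/file'
#   remove_non_path_from_url('http://example.org/dir/')                  -> 'http://example.org/dir'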

def make_file_id(ocrd_file, output_file_grp):
    """
    Derive a new file ID for an output file from an existing input file ``ocrd_file``
    and the name of the output file's ``fileGrp/@USE``, ``output_file_grp``.
    If ``ocrd_file``'s ID contains the input file's fileGrp name, then replace it by ``output_file_grp``.
    Otherwise use ``output_file_grp`` together with the position of ``ocrd_file`` within the input fileGrp
    (as a fallback counter). Increment the counter until there is no more ID conflict.
    """
    ret = ocrd_file.ID.replace(ocrd_file.fileGrp, output_file_grp)
    if ret == ocrd_file.ID:
        m = re.match(r'.*?(\d{3,}).*', ocrd_file.pageId or '')
        if m:
            n = int(m.group(1))
        else:
            ids = [f.ID for f in ocrd_file.mets.find_files(fileGrp=ocrd_file.fileGrp, mimetype=ocrd_file.mimetype)]
            try:
                n = ids.index(ocrd_file.ID) + 1
            except ValueError:
                n = len(ids)
        ret = concat_padded(output_file_grp, n)
        while next(ocrd_file.mets.find_files(ID=ret), None):
            n += 1
            ret = concat_padded(output_file_grp, n)
    if not REGEX_FILE_ID.fullmatch(ret):
        ret = ret.replace(':', '_')
        ret = re.sub(r'^([^a-zA-Z_])', r'id_\1', ret)
        ret = re.sub(r'[^\w.-]', r'', ret)
    return ret
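
# Illustrative behaviour of make_file_id (hypothetical OcrdFile with ID='OCR-D-IMG_0001'
# and fileGrp='OCR-D-IMG'; the names are assumptions for this example, and the result is
# assumed to already match REGEX_FILE_ID):
#   make_file_id(ocrd_file, 'OCR-D-BIN')  -> 'OCR-D-BIN_0001'
# i.e. the input fileGrp name inside the ID is replaced by the output fileGrp; only if
# the ID does not contain the fileGrp name is the positional fallback counter used.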

def nth_url_segment(url, n=-1):
    """
    Return the n-th /-delimited segment of a URL-like string (by default the last one).

    Arguments:
        url (string): URL-like string to split into segments
        n (integer): index of the segment, default: -1
    """
    segments = remove_non_path_from_url(url).split('/')
    try:
        return segments[n]
    except IndexError:
        return ''
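
# Illustrative usage of nth_url_segment (example URL only):
#   nth_url_segment('http://example.org/dir/file.xml')        -> 'file.xml'
#   nth_url_segment('http://example.org/dir/file.xml', n=-2)  -> 'dir'
#   nth_url_segment('foo', n=5)                               -> ''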

def get_local_filename(url, start=None):
    """
    Return local filename, optionally relative to ``start``

    Arguments:
        url (string): filename or URL
        start (string): Base path to remove from filename. Raise an exception if not a prefix of url
    """
    if url.startswith('https://') or url.startswith('http:'):
        raise Exception("Can't determine local filename of http(s) URL")
    if url.startswith('file://'):
        url = url[len('file://'):]
    # Goobi/Kitodo produces those, they are always absolute
    if url.startswith('file:/'):
        raise Exception("Invalid (java) URL: %s" % url)
    if start:
        if not url.startswith(start):
            raise Exception("Cannot remove prefix %s from url %s" % (start, url))
        if not start.endswith('/'):
            start += '/'
        url = url[len(start):]
    return url
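
# Illustrative usage of get_local_filename (example paths only):
#   get_local_filename('file:///tmp/workspace/mets.xml')                  -> '/tmp/workspace/mets.xml'
#   get_local_filename('/tmp/workspace/mets.xml', start='/tmp/workspace') -> 'mets.xml'
#   get_local_filename('https://example.org/mets.xml')                    # raises Exception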

def is_local_filename(url):
    """
    Whether a URL is a local filename.
    """
    return url.startswith('file://') or '://' not in url
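
# Illustrative usage of is_local_filename (example values only):
#   is_local_filename('file:///tmp/foo.xml')      -> True
#   is_local_filename('/tmp/foo.xml')             -> True
#   is_local_filename('https://example.org/foo')  -> False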

def is_string(val):
    """
    Return whether a value is a ``str``.
    """
    return isinstance(val, str)

def parse_json_string_with_comments(val):
    """
    Parse a string of JSON interspersed with #-prefixed full-line comments
    """
    jsonstr = re.sub(r'^\s*#.*$', '', val, flags=re.MULTILINE)
    return json.loads(jsonstr)
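
# Illustrative usage of parse_json_string_with_comments (example input only):
#   parse_json_string_with_comments('# defaults\n{"dpi": 300}')  -> {'dpi': 300}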

def parse_json_string_or_file(*values):    # pylint: disable=unused-argument
    """
    Parse each value as either the path to a JSON file or a literal JSON object,
    and merge the results (later values override earlier keys).

    Empty strings are equivalent to '{}'
    """
    ret = {}
    for value in values:
        err = None
        value_parsed = None
        if re.fullmatch(r"\s*", value):
            continue
        try:
            try:
                with open(value, 'r') as f:
                    value_parsed = parse_json_string_with_comments(f.read())
            except (FileNotFoundError, OSError):
                value_parsed = parse_json_string_with_comments(value.strip())
            if not isinstance(value_parsed, dict):
                err = ValueError("Not a valid JSON object: '%s' (parsed as '%s')" % (value, value_parsed))
        except json.decoder.JSONDecodeError as e:
            err = ValueError("Error parsing '%s': %s" % (value, e))
        if err:
            raise err       # pylint: disable=raising-bad-type
        ret = {**ret, **value_parsed}
    return ret
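
# Illustrative usage of parse_json_string_or_file (the file name below is a made-up example):
#   parse_json_string_or_file('{"a": 1}', '{"b": 2}')  -> {'a': 1, 'b': 2}
#   parse_json_string_or_file('')                      -> {}
#   parse_json_string_or_file('params.json')           -> contents of that JSON file, if it exists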

def safe_filename(url):
    """
    Sanitize input to be safely used as the basename of a local file.
    """
    ret = re.sub('[^A-Za-z0-9]+', '.', url)
    #  print('safe filename: %s -> %s' % (url, ret))
    return ret
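
# Illustrative usage of safe_filename (example URL only):
#   safe_filename('https://example.org/file.xml')  -> 'https.example.org.file.xml'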

def generate_range(start, end):
    """
    Generate a list of strings by incrementing the number part of ``start`` up to and including ``end``.
    """
    ret = []
    start_num, end_num = re.search(r'\d+', start), re.search(r'\d+', end)
    if not (start_num and end_num):
        getLogger('ocrd_utils.generate_range').error("Unable to parse generate range %s .. %s" % (start, end))
        return [start, end]
    start_num, end_num = start_num.group(0), end_num.group(0)
    start_num_len = len(start_num)
    for i in range(int(start_num), int(end_num) + 1):
        ret.append(start.replace(start_num, str(i).zfill(start_num_len)))
    return ret
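
# Illustrative usage of generate_range (example page IDs only, assuming the inclusive
# upper bound described in the docstring):
#   generate_range('PHYS_0001', 'PHYS_0003')  -> ['PHYS_0001', 'PHYS_0002', 'PHYS_0003']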