Passed
Pull Request — master (#673)
by Konstantin
02:29
created

ocrd_models.ocrd_mets.OcrdMets.add_file()   B

Complexity

Conditions 8

Size

Total Lines 35
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 35
rs 7.3333
c 0
b 0
f 0
cc 8
nop 10

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch
6
from lxml import etree as ET
7
8
from ocrd_utils import is_local_filename, getLogger, VERSION, REGEX_PREFIX, REGEX_FILE_ID
9
10
from .constants import (
11
    NAMESPACES as NS,
12
    TAG_METS_AGENT,
13
    TAG_METS_DIV,
14
    TAG_METS_FILE,
15
    TAG_METS_FILEGRP,
16
    TAG_METS_FILESEC,
17
    TAG_METS_FPTR,
18
    TAG_METS_METSHDR,
19
    TAG_METS_STRUCTMAP,
20
    IDENTIFIER_PRIORITY,
21
    TAG_MODS_IDENTIFIER,
22
    METS_XML_EMPTY,
23
)
24
25
from .ocrd_xml_base import OcrdXmlDocument, ET
26
from .ocrd_file import OcrdFile
27
from .ocrd_agent import OcrdAgent
28
29
# Number of leading characters to slice off a query string that starts with
# REGEX_PREFIX in order to obtain the regular expression itself.
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
30
31
class OcrdMets(OcrdXmlDocument):
32
    """
33
    API to a single METS file
34
    """
35
36
    @staticmethod
37
    def empty_mets(now=None):
        """
        Build a new :py:class:`OcrdMets` from the bundled empty METS template.

        Keyword Args:
            now (string): Value substituted for ``{{ NOW }}`` in the template.
                Any falsy value is replaced by ``datetime.now().isoformat()``.

        Returns:
            A new :py:class:`OcrdMets` whose content is the filled-in template.
        """
        timestamp = now if now else datetime.now().isoformat()
        filled = (
            METS_XML_EMPTY.decode('utf-8')
            .replace('{{ VERSION }}', VERSION)
            .replace('{{ NOW }}', '%s' % timestamp)
        )
        return OcrdMets(content=filled.encode('utf-8'))
47
48
    def __init__(self, **kwargs):
        """
        Initialize by handing all keyword arguments on to the parent class constructor.
        """
        super().__init__(**kwargs)
53
54
    def __str__(self):
        """
        Return a one-line summary listing the file groups and all files.
        """
        groups = self.file_groups
        files = list(self.find_files())
        return 'OcrdMets[fileGrps=%s,files=%s]' % (groups, files)
59
60
    @property
61
    def unique_identifier(self):
        """
        Return the text of the first ``mods:identifier`` found, trying each
        ``@type`` listed in ``IDENTIFIER_PRIORITY`` in turn.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.

        Returns:
            The identifier text, or ``None`` if no identifier of any of those types exists.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            match = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if match is not None:
                return match.text
        return None
71
72
    @unique_identifier.setter
73
    def unique_identifier(self, purl):
        """
        Store ``purl`` as the unique identifier.

        The first existing ``mods:identifier`` whose ``@type`` is listed in
        ``IDENTIFIER_PRIORITY`` gets its text replaced. If there is none, a new
        element with tag ``TAG_MODS_IDENTIFIER`` and ``@type="purl"`` is appended
        to the first ``mods:mods`` element and receives the text.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.

        Arguments:
            purl (string): Text to store in the identifier element
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            id_el = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if id_el is not None:
                break
        else:
            # No identifier of any known type yet: create one of type "purl"
            id_el = ET.SubElement(root.find('.//mods:mods', NS), TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl
89
90
    @property
91
    def agents(self):
        """
        Wrap every ``mets:agent`` below ``mets:metsHdr`` in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` and return them as a list.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
96
97
    def add_agent(self, *args, **kwargs):
        """
        Append a new agent element to the `metsHdr` and wrap it in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        If the document has no `mets:metsHdr` yet, one is created and inserted
        as the first child of the root element.

        All positional and keyword arguments are passed on to the
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` constructor.

        Returns:
            The new :py:class:`ocrd_models.ocrd_agent.OcrdAgent`
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        return OcrdAgent(ET.SubElement(header, TAG_METS_AGENT), *args, **kwargs)
109
110
    @property
111
    def file_groups(self):
        """
        Return the `@USE` of every `mets:fileGrp` in the document as a list.
        """
        groups = self._tree.getroot().findall('.//mets:fileGrp', NS)
        return [grp.get('USE') for grp in groups]
116
117
    def find_all_files(self, *args, **kwargs):
        """
        Run :py:meth:`find_files` with the given arguments and collect every
        result into a list.

        Equivalent to ``list(self.find_files(...))``
        """
        results = self.find_files(*args, **kwargs)
        return list(results)
124
125
    # pylint: disable=multiple-statements
126
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search `mets:file` entries in this METS document and yield results.

        The ``ID``, ``fileGrp``, ``url`` and ``mimetype`` parameters can each be
        either a literal string or a regular expression if the string starts
        with `//` (double slash). If it is a regex, the leading `//` is removed
        and candidates are matched against the regex with `re.fullmatch`; a
        candidate that lacks the attribute is matched as the empty string.
        If it is a literal string, comparison is done with string equality.

        ``pageId`` is a comma-separated list of literal page IDs; regular
        expressions are not supported for it.

        A criterion that is ``None`` or empty is not applied.

        Keyword Args:
            ID (string) : `@ID` of the `mets:file`
            fileGrp (string) : `@USE` of the `mets:fileGrp` to list files of
            pageId (string) : `@ID` of the corresponding physical `mets:structMap` entry (physical page)
            url (string) : `@xlink:href` (URL or path) of `mets:Flocat` of `mets:file`
            mimetype (string) : `@MIMETYPE` of `mets:file`
            local_only (boolean) : Whether to restrict results to local files in the filesystem

        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations

        Raises:
            Exception: if ``pageId`` starts with the regex prefix. As this is a
                generator, the exception surfaces when iteration starts.
        """
        def matches(query, value):
            # A query starting with REGEX_PREFIX is a regex for the whole value;
            # a missing attribute (None) is matched as '' instead of making
            # fullmatch raise TypeError.
            if query.startswith(REGEX_PREFIX):
                return fullmatch(query[REGEX_PREFIX_LEN:], value or '') is not None
            return query == value

        root = self._tree.getroot()

        # Resolve the requested pages to the IDs of the files linked from them.
        # None means "no page filter".
        file_ids_on_pages = None
        if pageId:
            if pageId.startswith(REGEX_PREFIX):
                raise Exception("find_files does not support regex search for pageId")
            wanted_pages = set(pageId.split(','))
            file_ids_on_pages = set()
            for page in root.xpath('//mets:div[@TYPE="page"]', namespaces=NS):
                if page.get('ID') in wanted_pages:
                    file_ids_on_pages.update(
                        fptr.get('FILEID') for fptr in page.findall('mets:fptr', NS))

        for cand in root.xpath('//mets:file', namespaces=NS):
            if ID and not matches(ID, cand.get('ID')):
                continue

            if file_ids_on_pages is not None and cand.get('ID') not in file_ids_on_pages:
                continue

            if fileGrp and not matches(fileGrp, cand.getparent().get('USE')):
                continue

            if mimetype and not matches(mimetype, cand.get('MIMETYPE')):
                continue

            if url:
                cand_locat = cand.find('mets:FLocat', namespaces=NS)
                if cand_locat is None:
                    continue
                if not matches(url, cand_locat.get('{%s}href' % NS['xlink'])):
                    continue

            f = OcrdFile(cand, mets=self)

            # If only local resources should be returned and f is not a file path: skip the file
            if local_only and not is_local_filename(f.url):
                continue
            yield f
196
197
    def add_file_group(self, fileGrp):
        """
        Return the `mets:fileGrp` with `@USE` ``fileGrp``, creating it if necessary.

        A missing `mets:fileSec` is created as well.

        Arguments:
            fileGrp (string): `@USE` of the new `mets:fileGrp`.

        Returns:
            The existing or newly created `mets:fileGrp` element.

        Raises:
            Exception: if ``fileGrp`` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        group = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if group is not None:
            return group
        group = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        group.set('USE', fileGrp)
        return group
214
215
    def rename_file_group(self, old, new):
        """
        Change the `@USE` of the `mets:fileGrp` currently named ``old`` to ``new``.

        Arguments:
            old (string): current `@USE` of the `mets:fileGrp`
            new (string): `@USE` to set instead

        Raises:
            FileNotFoundError: if the `mets:fileSec` has no `mets:fileGrp` with `@USE` ``old``.
        """
        group_path = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        group = self._tree.getroot().find(group_path, NS)
        if group is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        group.set('USE', new)
223
224
    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a `mets:fileGrp` (fixed `@USE`) or `mets:fileGrp`s (regex `@USE`)

        With a regex, every matching `mets:fileGrp` is removed in turn; a
        `mets:fileGrp` without `@USE` is matched as the empty string, and
        nothing is raised if no group matches.

        Arguments:
            USE (string): `@USE` of the `mets:fileGrp` to delete. Can be a regex if prefixed with `//`.
                Any non-string value is used as the `mets:fileGrp` element itself
                (this is how the regex branch calls back into this method).
            recursive (boolean): Whether to recursively delete all `mets:file`s in the group
            force (boolean): Do not raise an exception if `mets:fileGrp` does not exist

        Raises:
            Exception: if there is no `mets:fileSec`; if the group does not exist
                and ``force`` is not set; if the group still contains files and
                ``recursive`` is not set.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                pattern = USE[REGEX_PREFIX_LEN:]
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    # `or ''` keeps fullmatch from raising TypeError on a fileGrp without @USE
                    if fullmatch(pattern, cand.get('USE') or ''):
                        self.remove_file_group(cand, recursive=recursive)
                return
            el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
        if el_fileGrp is None:
            msg = "No such fileGrp: %s" % USE
            if force:
                log.warning(msg)
                return
            raise Exception(msg)
        files = el_fileGrp.findall('mets:file', NS)
        if files:
            if not recursive:
                # Report the group's @USE: USE itself is an element (not a name)
                # when this call comes from the regex branch above.
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % el_fileGrp.get('USE'))
            for f in files:
                self.remove_one_file(f.get('ID'))
        el_fileGrp.getparent().remove(el_fileGrp)
260
261
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        The `mets:fileGrp` is looked up (and created if missing) before the
        check for an existing file with the same `@ID` takes place.

        Arguments:
            fileGrp (string): `@USE` of `mets:fileGrp` to add to
        Keyword Args:
            mimetype (string): `@MIMETYPE` of the `mets:file` to use
            url (string): `@xlink:href` (URL or path) of the `mets:file` to use
            ID (string): `@ID` of the `mets:file` to use
            pageId (string): `@ID` in the physical `mets:structMap` to link to
            force (boolean): Whether to add the file even if a `mets:file` with the same `@ID` already exists.
                In that case the existing entry is reused and overwritten.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): value assigned to the ``local_filename`` of the resulting file object
            **kwargs: accepted but not used by this method

        Returns:
            The new or reused :py:class:`ocrd_models.ocrd_file.OcrdFile`

        Raises:
            Exception: if ``ID`` is missing or does not match ``REGEX_FILE_ID``, or
                if a file with this ``ID`` already exists and neither ``force``
                nor ``ignore`` is set.
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise Exception("Invalid syntax for mets:file/@ID %s" % ID)
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # Search for an existing file only once and reuse the hit when forcing
        mets_file = None if ignore else next(self.find_files(ID=ID), None)
        if mets_file:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)
        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        return mets_file
296
297
    def remove_file(self, *args, **kwargs):
        """
        Delete every `mets:file` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The removed file if exactly one matched (for backwards-compatibility),
            otherwise the list of all removed files.

        Raises:
            FileNotFoundError: if nothing matches the query.
        """
        matched = list(self.find_files(*args, **kwargs))
        if not matched:
            raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
        for ocrd_file in matched:
            self.remove_one_file(ocrd_file)
        return matched if len(matched) > 1 else matched[0]
310
311
    def remove_one_file(self, ID):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Every `mets:fptr` referencing the file is removed too, and so is any
        parent of such a `mets:fptr` that is left without children.

        Arguments:
            ID (string): `@ID` of the `mets:file` to delete. An
                :py:class:`ocrd_models.ocrd_file.OcrdFile` is accepted as well.

        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.

        Raises:
            FileNotFoundError: if no file with this ``ID`` is found.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_one_file')
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages (len() counts child nodes; getchildren() is deprecated)
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
347
348
    @property
349
    def physical_pages(self):
        """
        Return the `@ID` of every page `mets:div` in the physical `mets:structMap`.
        """
        page_id_path = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        return self._tree.getroot().xpath(page_id_path, namespaces=NS)
356
357
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (the `@ID`s of all physical `mets:structMap` `mets:div`s),
        optionally for a subset of `mets:file` `@ID`s ``for_fileIds``.

        Keyword Args:
            for_fileIds (list): `@ID`s of the `mets:file`s to look up

        Returns:
            All page IDs if ``for_fileIds`` is ``None``. Otherwise a list parallel
            to ``for_fileIds`` that holds, at the position of the first occurrence
            of each file ID, the `@ID` of the page referencing that file, or
            ``None`` where no page references it.
        """
        if for_fileIds is None:
            return self.physical_pages
        ret = [None] * len(for_fileIds)
        if not ret:
            # Nothing to look up, no need to walk the structMap
            return ret
        # Map each file ID to its first position once, instead of scanning the
        # list (`in` plus `.index`) for every mets:fptr in the document.
        first_index = {}
        for idx, file_id in enumerate(for_fileIds):
            first_index.setdefault(file_id, idx)
        for page in self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                namespaces=NS):
            for fptr in page.findall('mets:fptr', NS):
                idx = first_index.get(fptr.get('FILEID'))
                if idx is not None:
                    ret[idx] = page.get('ID')
        return ret
372
373
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link the `mets:file` ``ocrd_file`` to the physical page ``pageId``
        (`@ID` of the physical `mets:structMap` `mets:div` entry), creating
        the physical `mets:structMap`, the sequence `mets:div` and the page
        `mets:div` where they do not exist yet.

        Any page link the file had before is removed first.

        Arguments:
            pageId (string): `@ID` of the physical `mets:structMap` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): `@ORDER` to use (only set when the page entry is newly created)
            orderlabel (string): `@ORDERLABEL` to use (only set when the page entry is newly created)
        """
        root = self._tree.getroot()

        # Drop whatever page mapping exists for this file
        stale_path = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % ocrd_file.ID
        for stale_fptr in root.findall(stale_path, namespaces=NS):
            stale_fptr.getparent().remove(stale_fptr)

        # Walk down structMap -> sequence div -> page div, creating what is missing
        structmap = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if structmap is None:
            structmap = ET.SubElement(root, TAG_METS_STRUCTMAP)
            structmap.set('TYPE', 'PHYSICAL')

        sequence = structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence is None:
            sequence = ET.SubElement(structmap, TAG_METS_DIV)
            sequence.set('TYPE', 'physSequence')

        page = sequence.find('mets:div[@ID="%s"]' % pageId, NS)
        if page is None:
            page = ET.SubElement(sequence, TAG_METS_DIV)
            page.set('TYPE', 'page')
            page.set('ID', pageId)
            if order:
                page.set('ORDER', order)
            if orderlabel:
                page.set('ORDERLABEL', orderlabel)

        ET.SubElement(page, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
412
413
    def get_physical_page_for_file(self, ocrd_file):
        """
        Look up the physical page ID (`@ID` of the physical `mets:structMap`
        `mets:div` entry) that the `mets:file` ``ocrd_file`` is linked to.

        Returns:
            The `@ID` of the first matching page, or ``None`` if no page links to the file.
        """
        page_ids = self._tree.getroot().xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID, namespaces=NS)
        return page_ids[0] if page_ids else None
423
424
    def remove_physical_page(self, ID):
        """
        Remove the first page (physical `mets:structMap` `mets:div` entry) whose
        `@ID` is ``ID``. Does nothing if there is no such page.
        """
        pages = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
            namespaces=NS)
        if not pages:
            return
        page = pages[0]
        page.getparent().remove(page)
433
434
    def merge(self, other_mets, fileGrp_mapping=None, after_add_cb=None, **kwargs):
        """
        Copy files from ``other_mets`` into this METS with :py:meth:`add_file`.

        The files to copy are selected with ``other_mets.find_files(**kwargs)``,
        so the same kwargs as for :py:meth:`find_files` are accepted. Each file
        is added with its ``mimetype``, ``url``, ``ID`` and ``pageId``.

        Keyword Args:
            fileGrp_mapping (dict): Map ``other_mets`` fileGrp to fileGrp in this METS.
                File groups without an entry keep their name.
            after_add_cb (function): Callback called with each file right after it was added to the METS
        """
        mapping = fileGrp_mapping or {}
        for src in other_mets.find_files(**kwargs):
            added = self.add_file(
                mapping.get(src.fileGrp, src.fileGrp),
                mimetype=src.mimetype,
                url=src.url,
                ID=src.ID,
                pageId=src.pageId)
            if after_add_cb:
                after_add_cb(added)
455
456