Passed
Pull Request — master (#889)
by Konstantin
02:28
created

ocrd_models.ocrd_mets.OcrdMets.merge()   B

Complexity

Conditions 6

Size

Total Lines 29
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 29
rs 8.6666
c 0
b 0
f 0
cc 6
nop 7
1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
import re
6
from lxml import etree as ET
7
8
from ocrd_utils import (
9
    is_local_filename,
10
    getLogger,
11
    generate_range,
12
    VERSION,
13
    REGEX_PREFIX,
14
    REGEX_FILE_ID
15
)
16
17
from .constants import (
18
    NAMESPACES as NS,
19
    TAG_METS_AGENT,
20
    TAG_METS_DIV,
21
    TAG_METS_FILE,
22
    TAG_METS_FILEGRP,
23
    TAG_METS_FILESEC,
24
    TAG_METS_FPTR,
25
    TAG_METS_METSHDR,
26
    TAG_METS_STRUCTMAP,
27
    IDENTIFIER_PRIORITY,
28
    TAG_MODS_IDENTIFIER,
29
    METS_XML_EMPTY,
30
)
31
32
from .ocrd_xml_base import OcrdXmlDocument, ET
33
from .ocrd_file import OcrdFile
34
from .ocrd_agent import OcrdAgent
35
36
# Number of characters in the regex marker prefix; used below to slice the
# marker off a query string before compiling the rest as a regular expression.
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
37
38
class OcrdMets(OcrdXmlDocument):
39
    """
40
    API to a single METS file
41
    """
42
43
    @staticmethod
    def empty_mets(now=None):
        """
        Build a new :py:class:`OcrdMets` from the bundled empty METS template.

        Keyword Args:
            now: Value substituted for the ``{{ NOW }}`` placeholder of the template.
                Any falsy value is replaced by ``datetime.now().isoformat()``.

        Returns:
            :py:class:`OcrdMets` created from the filled-in template.
        """
        timestamp = now if now else datetime.now().isoformat()
        template = METS_XML_EMPTY.decode('utf-8')
        substitutions = (
            ('{{ VERSION }}', VERSION),
            ('{{ NOW }}', '%s' % timestamp),
        )
        for placeholder, value in substitutions:
            template = template.replace(placeholder, value)
        return OcrdMets(content=template.encode('utf-8'))
54
55
    def __init__(self, **kwargs):
        """
        Initialise the document by forwarding all keyword arguments unchanged
        to the :py:class:`OcrdXmlDocument` base class.
        """
        super().__init__(**kwargs)
60
61
    def __str__(self):
        """
        Human-readable summary listing the file groups and all files of this METS.
        """
        group_names = self.file_groups
        all_files = list(self.find_files())
        return 'OcrdMets[fileGrps=%s,files=%s]' % (group_names, all_files)
66
67
    @property
    def unique_identifier(self):
        """
        Text of the first ``mods:identifier`` found when trying the ``@type``
        values of ``IDENTIFIER_PRIORITY`` in order, or ``None`` if none matches.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            match = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if match is None:
                continue
            return match.text
        return None
78
79
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Store :py:attr:`purl` as the unique identifier of the document.

        The first ``mods:identifier`` found when trying the ``@type`` values of
        ``IDENTIFIER_PRIORITY`` in order gets its text replaced. If there is none,
        a new ``mods:identifier`` with ``@type="purl"`` is appended to the first
        ``mods:mods`` element.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        # Lazy generator: the lookups stop as soon as one identifier is found
        candidates = (
            root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY
        )
        id_el = next((el for el in candidates if el is not None), None)
        if id_el is None:
            mods = root.find('.//mods:mods', NS)
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl
96
97
    @property
    def agents(self):
        """
        All ``mets:agent`` entries of the ``mets:metsHdr``, each wrapped in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
103
104
    def add_agent(self, *args, **kwargs):
        """
        Append a new ``mets:agent`` to the ``mets:metsHdr`` and return it as an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        A missing ``mets:metsHdr`` is created as the first child of the root element.
        All positional and keyword arguments are forwarded to the
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` constructor.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        return OcrdAgent(ET.SubElement(header, TAG_METS_AGENT), *args, **kwargs)
116
117
    @property
    def file_groups(self):
        """
        The ``@USE`` value of every ``mets:fileGrp`` in the document, in document order.
        """
        groups = self._tree.getroot().findall('.//mets:fileGrp', NS)
        return [group.get('USE') for group in groups]
123
124
    def find_all_files(self, *args, **kwargs):
        """
        Eager variant of :py:meth:`find_files`: collect every match into a list.

        Takes the same arguments as :py:meth:`find_files`.
        """
        matches = self.find_files(*args, **kwargs)
        return list(matches)
131
132
    # pylint: disable=multiple-statements
133
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search ``mets:file`` entries in this METS document and yield results.

        The :py:attr:`ID`, :py:attr:`pageId`, :py:attr:`fileGrp`,
        :py:attr:`url` and :py:attr:`mimetype` parameters can each be either a
        literal string, or a regular expression if the string starts with
        ``//`` (double slash).

        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. If it is a literal string, comparison
        is done with string equality. For regex queries, a missing attribute on a
        candidate is treated as the empty string.

        A literal :py:attr:`pageId` may be a comma-separated list and supports the
        numeric range operator ``..``. For example, to find all files in pages
        ``PHYS_0001`` to ``PHYS_0003``, ``PHYS_0001..PHYS_0003`` will be expanded
        to ``PHYS_0001,PHYS_0002,PHYS_0003``.

        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` (URL or path) of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem

        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations
        """
        def _compile(query):
            # A query carrying the regex prefix becomes a compiled pattern;
            # anything else (including None) is passed through unchanged.
            if query and query.startswith(REGEX_PREFIX):
                return re.compile(query[REGEX_PREFIX_LEN:])
            return query

        def _matches(query, actual):
            # Literal queries compare by equality. Regex queries must match fully;
            # a missing attribute is matched as '' instead of raising TypeError.
            if isinstance(query, str):
                return query == actual
            return query.fullmatch(actual or '') is not None

        # IDs of all files referenced from the selected pages. None means
        # "no page filter"; an empty pageId string selects no file at all.
        page_file_ids = None
        if pageId is not None:
            page_file_ids = set()
        if pageId:
            page_regex = None
            wanted_pages = set()
            if pageId.startswith(REGEX_PREFIX):
                page_regex = re.compile(pageId[REGEX_PREFIX_LEN:])
            else:
                for token in pageId.split(','):
                    wanted_pages.add(token)
                    if '..' in token:
                        wanted_pages.update(generate_range(*token.split('..', 1)))
            for page in self._tree.getroot().xpath(
                    '//mets:div[@TYPE="page"]', namespaces=NS):
                page_id = page.get('ID')
                if page_regex is not None:
                    selected = page_regex.fullmatch(page_id or '') is not None
                else:
                    selected = page_id in wanted_pages
                if selected:
                    page_file_ids.update(
                        fptr.get('FILEID') for fptr in page.findall('mets:fptr', NS))

        ID = _compile(ID)
        fileGrp = _compile(fileGrp)
        mimetype = _compile(mimetype)
        url = _compile(url)

        for cand in self._tree.getroot().xpath('//mets:file', namespaces=NS):
            cand_id = cand.get('ID')
            if ID and not _matches(ID, cand_id):
                continue

            if page_file_ids is not None and cand_id not in page_file_ids:
                continue

            if fileGrp and not _matches(fileGrp, cand.getparent().get('USE')):
                continue

            if mimetype and not _matches(mimetype, cand.get('MIMETYPE')):
                continue

            if url:
                cand_locat = cand.find('mets:FLocat', namespaces=NS)
                if cand_locat is None:
                    continue
                if not _matches(url, cand_locat.get('{%s}href' % NS['xlink'])):
                    continue

            f = OcrdFile(cand, mets=self)

            # If only local resources should be returned and f is not a file path: skip the file
            if local_only and not is_local_filename(f.url):
                continue
            yield f
224
225
    def add_file_group(self, fileGrp):
        """
        Return the ``mets:fileGrp`` with ``@USE`` :py:attr:`fileGrp`, creating it if necessary.

        A missing ``mets:fileSec`` is appended to the root element first.

        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.

        Raises:
            Exception: if :py:attr:`fileGrp` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
242
243
    def rename_file_group(self, old, new):
        """
        Change the ``@USE`` of a ``mets:fileGrp`` from :py:attr:`old` to :py:attr:`new`.

        Raises:
            FileNotFoundError: if the ``mets:fileSec`` has no ``mets:fileGrp`` with ``@USE`` :py:attr:`old`.
        """
        query = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        target = self._tree.getroot().find(query, NS)
        if target is not None:
            target.set('USE', new)
            return
        raise FileNotFoundError("No such fileGrp '%s'" % old)
251
252
    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a ``mets:fileGrp`` (single fixed ``@USE`` or multiple regex ``@USE``)

        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``.
                A ``mets:fileGrp`` element may be passed instead of a string
                (this is what the regex branch does for every matching group).
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist

        Raises:
            Exception: if there is no ``mets:fileSec``, if the group does not exist
                (unless :py:attr:`force`), or if the group still contains files
                and :py:attr:`recursive` is not set.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                use_pattern = re.compile(USE[REGEX_PREFIX_LEN:])
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    # A fileGrp without @USE is matched as '' instead of raising TypeError
                    if use_pattern.fullmatch(cand.get('USE') or ''):
                        self.remove_file_group(cand, recursive=recursive)
                return
            el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
            use_name = USE
        else:
            el_fileGrp = USE
            # Report the group's @USE in messages rather than the element repr
            use_name = el_fileGrp.get('USE') if el_fileGrp is not None else USE
        if el_fileGrp is None:
            msg = "No such fileGrp: %s" % use_name
            if force:
                log.warning(msg)
                return
            raise Exception(msg)
        files = el_fileGrp.findall('mets:file', NS)
        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % use_name)
            for f in files:
                self.remove_one_file(f.get('ID'))
        el_fileGrp.getparent().remove(el_fileGrp)
289
290
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
                In that case the existing entry is updated in place.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): passed on to the :py:class:`ocrd_models.ocrd_file.OcrdFile`
            **kwargs: accepted for backwards compatibility but not used.

        Returns:
            The new (or, with :py:attr:`force`, the updated) :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Raises:
            ValueError: if :py:attr:`ID` or :py:attr:`fileGrp` is missing or does not match ``REGEX_FILE_ID``.
            Exception: if a file with this ID exists and neither :py:attr:`force` nor :py:attr:`ignore` is set.
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # With ignore=True the duplicate lookup (a scan over every mets:file) is
        # skipped entirely, as documented; the outcome is the same as before.
        mets_file = None if ignore else next(self.find_files(ID=ID), None)
        if mets_file:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file.url = url
            mets_file.mimetype = mimetype
            mets_file.ID = ID
            mets_file.pageId = pageId
            mets_file.local_filename = local_filename
        else:
            # Only pass on the attributes that were actually given (truthy)
            file_attrs = {
                name: value
                for name, value in (
                    ('mimetype', mimetype),
                    ('url', url),
                    ('ID', ID),
                    ('pageId', pageId),
                    ('local_filename', local_filename),
                )
                if value
            }
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self, **file_attrs)

        return mets_file
330
331
    def remove_file(self, *args, **kwargs):
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The single removed :py:class:`ocrd_models.ocrd_file.OcrdFile` if exactly
            one file matched (for backwards-compatibility), a list of them if several
            matched, or an empty list if nothing matched and at least one filter
            criterion was a regex.

        Raises:
            FileNotFoundError: if nothing matched and no filter criterion was a regex.
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            return files[0] # for backwards-compatibility
        # Inspect the criteria *values* (positional and keyword); iterating over
        # kwargs directly would only look at the parameter names.
        criteria = list(args) + list(kwargs.values())
        if any(isinstance(criterion, str) and criterion.startswith(REGEX_PREFIX)
               for criterion in criteria):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
348
349
    def remove_one_file(self, ID):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        All ``mets:fptr`` elements referencing the file are removed as well, and
        any parent element left without children by that is removed too.

        Arguments:
            ID (string): ``@ID`` of the ``mets:file`` to delete. An
                :py:class:`ocrd_models.ocrd_file.OcrdFile` may be passed instead.

        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.

        Raises:
            FileNotFoundError: if no ``mets:file`` with this ID exists.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_one_file')
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages (len() of an element is its number of children;
            # it replaces the deprecated getchildren())
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
385
386
    @property
    def physical_pages(self):
        """
        The ``@ID`` of every page ``mets:div`` in the physical ``mets:structMap``,
        as returned by the XPath query.
        """
        page_id_query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        return self._tree.getroot().xpath(page_id_query, namespaces=NS)
394
395
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`.

        Keyword Args:
            for_fileIds (list): ``@ID`` of the ``mets:file`` entries to look up.

        Returns:
            All page IDs if :py:attr:`for_fileIds` is ``None``. Otherwise a list of the
            same length as :py:attr:`for_fileIds` holding, at each position, the ID of
            the page referencing that file, or ``None`` if no page references it. If
            an ID is listed more than once, only its first position is filled.
        """
        if for_fileIds is None:
            return self.physical_pages
        # Map each file ID to its first position once, instead of scanning the
        # list (``in`` + ``index``) for every mets:fptr.
        first_position = {}
        for position, file_id in enumerate(for_fileIds):
            first_position.setdefault(file_id, position)
        ret = [None] * len(for_fileIds)
        for page in self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                namespaces=NS):
            page_id = page.get('ID')
            for fptr in page.findall('mets:fptr', NS):
                position = first_position.get(fptr.get('FILEID'))
                if position is not None:
                    ret[position] = page_id
        return ret
410
411
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link the ``mets:file`` :py:attr:`ocrd_file` to the physical page :py:attr:`pageId`
        (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry).

        Existing page links of the file are dropped first. The physical
        ``mets:structMap``, the ``physSequence`` ``mets:div`` and the page
        ``mets:div`` are created when missing.

        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use (only applied when the page is newly created)
            orderlabel (string): ``@ORDERLABEL`` to use (only applied when the page is newly created)
        """
        root = self._tree.getroot()

        # delete any page mapping for this file.ID
        stale_query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % ocrd_file.ID
        for stale_fptr in root.findall(stale_query, namespaces=NS):
            stale_fptr.getparent().remove(stale_fptr)

        # find/construct as necessary
        struct_map = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if struct_map is None:
            struct_map = ET.SubElement(root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        seq_div = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if seq_div is None:
            seq_div = ET.SubElement(struct_map, TAG_METS_DIV)
            seq_div.set('TYPE', 'physSequence')

        page_div = seq_div.find('mets:div[@ID="%s"]' % pageId, NS)
        if page_div is None:
            page_div = ET.SubElement(seq_div, TAG_METS_DIV)
            page_div.set('TYPE', 'page')
            page_div.set('ID', pageId)
            if order:
                page_div.set('ORDER', order)
            if orderlabel:
                page_div.set('ORDERLABEL', orderlabel)

        new_fptr = ET.SubElement(page_div, TAG_METS_FPTR)
        new_fptr.set('FILEID', ocrd_file.ID)
450
451
    def get_physical_page_for_file(self, ocrd_file):
        """
        Look up the physical page ID (``@ID`` of the physical ``mets:structMap``
        ``mets:div`` entry) that references the ``mets:file`` :py:attr:`ocrd_file`.

        Returns:
            The first matching page ID, or ``None`` if no page references the file.
        """
        page_query = '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' % ocrd_file.ID
        matches = self._tree.getroot().xpath(page_query, namespaces=NS)
        return matches[0] if matches else None
461
462
    def remove_physical_page(self, ID):
        """
        Remove the page with ``@ID`` :py:attr:`ID` (physical ``mets:structMap``
        ``mets:div`` entry). Does nothing if there is no such page.
        """
        page_query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID
        found = self._tree.getroot().xpath(page_query, namespaces=NS)
        if not found:
            return
        page_div = found[0]
        page_div.getparent().remove(page_div)
471
472
    def remove_physical_page_fptr(self, fileId):
        """
        Remove every ``mets:fptr`` whose ``@FILEID`` equals :py:attr:`fileId` from the
        page ``mets:div`` entries of the physical ``mets:structMap``.

        Returns:
            List of pageIds that mets:fptrs were deleted from
        """
        fptr_query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId
        affected_pages = []
        for fptr in self._tree.getroot().xpath(fptr_query, namespaces=NS):
            page_div = fptr.getparent()
            affected_pages.append(page_div.get('ID'))
            page_div.remove(fptr)
        return affected_pages
487
488
    def merge(self, other_mets, fileGrp_mapping=None, fileId_mapping=None, pageId_mapping=None, after_add_cb=None, **kwargs):
        """
        Copy the files of :py:attr:`other_mets` into this METS via :py:meth:`add_file`.

        Any additional keyword arguments are passed to :py:meth:`find_files` of
        :py:attr:`other_mets` to select the files to copy.

        Keyword Args:
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
            fileId_mapping (dict): Map :py:attr:`other_mets` file ID to file ID in this METS
            pageId_mapping (dict): Map :py:attr:`other_mets` page ID to page ID in this METS
            after_add_cb (function): Callback called with each file after it has been added to this METS
        """
        grp_map = fileGrp_mapping or {}
        id_map = fileId_mapping or {}
        page_map = pageId_mapping or {}
        for f_src in other_mets.find_files(**kwargs):
            # Values without a mapping entry are taken over unchanged
            src_grp = f_src.fileGrp
            dest_grp = grp_map.get(src_grp, src_grp)
            src_mimetype = f_src.mimetype
            src_url = f_src.url
            src_id = f_src.ID
            dest_id = id_map.get(src_id, src_id)
            src_page = f_src.pageId
            dest_page = page_map.get(src_page, src_page)
            f_dest = self.add_file(
                dest_grp,
                mimetype=src_mimetype,
                url=src_url,
                ID=dest_id,
                pageId=dest_page)
            # FIXME: merge metsHdr, amdSec, dmdSec as well
            # FIXME: merge structMap logical and structLink as well
            if after_add_cb:
                after_add_cb(f_dest)
517