Passed
Pull Request — master (#673)
by Konstantin
02:29
created

ocrd_models.ocrd_mets.OcrdMets.merge()   A

Complexity

Conditions 4

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 21
rs 9.8
c 0
b 0
f 0
cc 4
nop 5
1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch
6
from lxml import etree as ET
7
8
from ocrd_utils import is_local_filename, getLogger, VERSION, REGEX_PREFIX, REGEX_FILE_ID
9
10
from .constants import (
11
    NAMESPACES as NS,
12
    TAG_METS_AGENT,
13
    TAG_METS_DIV,
14
    TAG_METS_FILE,
15
    TAG_METS_FILEGRP,
16
    TAG_METS_FILESEC,
17
    TAG_METS_FPTR,
18
    TAG_METS_METSHDR,
19
    TAG_METS_STRUCTMAP,
20
    IDENTIFIER_PRIORITY,
21
    TAG_MODS_IDENTIFIER,
22
    METS_XML_EMPTY,
23
)
24
25
from .ocrd_xml_base import OcrdXmlDocument, ET
26
from .ocrd_file import OcrdFile
27
from .ocrd_agent import OcrdAgent
28
29
# Number of leading characters to strip from a query string that starts with
# REGEX_PREFIX in order to obtain the bare regular expression.
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
30
31
class OcrdMets(OcrdXmlDocument):
32
    """
33
    API to a single METS file
34
    """
35
36
    @staticmethod
    def empty_mets(now=None):
        """
        Build a fresh :py:class:`OcrdMets` from the bundled empty METS template.

        Keyword Args:
            now (string): Value substituted for the ``{{ NOW }}`` placeholder of
                the template. When not given (or otherwise falsy), the result
                of ``datetime.now().isoformat()`` is used.

        Returns:
            An :py:class:`OcrdMets` created from the filled-in template.
        """
        timestamp = now if now else datetime.now().isoformat()
        template = METS_XML_EMPTY.decode('utf-8')
        rendered = template.replace('{{ VERSION }}', VERSION).replace('{{ NOW }}', '%s' % timestamp)
        return OcrdMets(content=rendered.encode('utf-8'))
47
48
    def __init__(self, **kwargs):
        """
        Create the METS wrapper, forwarding all keyword arguments unchanged to
        the :py:class:`OcrdXmlDocument` constructor.
        """
        super().__init__(**kwargs)
53
54
    def __str__(self):
55
        """
56
        String representation
57
        """
58
        return 'OcrdMets[fileGrps=%s,files=%s]' % (self.file_groups, list(self.find_files()))
59
60
    @property
    def unique_identifier(self):
        """
        Return the text of the first ``mods:identifier`` found when trying the
        ``@type`` values of ``IDENTIFIER_PRIORITY`` in order, or ``None`` if
        the document has no such element.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            el_identifier = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if el_identifier is not None:
                return el_identifier.text
        return None
71
72
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Store ``purl`` as the text of the document's identifier element.

        The first existing ``mods:identifier`` whose ``@type`` is listed in
        ``IDENTIFIER_PRIORITY`` (tried in order) is overwritten. If there is
        none, a new element tagged ``TAG_MODS_IDENTIFIER`` with
        ``@type="purl"`` is appended to the first ``mods:mods`` element.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            el_identifier = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if el_identifier is not None:
                break
        else:
            # nothing to overwrite: attach a fresh identifier to mods:mods
            el_mods = root.find('.//mods:mods', NS)
            el_identifier = ET.SubElement(el_mods, TAG_MODS_IDENTIFIER)
            el_identifier.set('type', 'purl')
        el_identifier.text = purl
89
90
    @property
    def agents(self):
        """
        Wrap every ``mets:metsHdr/mets:agent`` element below the root in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` and return them as a list.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
96
97
    def add_agent(self, *args, **kwargs):
        """
        Append a new agent element to the `metsHdr` and return it wrapped in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        A missing `metsHdr` is created as the first child of the root element.
        All positional and keyword arguments are passed on to the
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` constructor.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        new_agent = ET.SubElement(header, TAG_METS_AGENT)
        return OcrdAgent(new_agent, *args, **kwargs)
109
110
    @property
    def file_groups(self):
        """
        Return the `@USE` attribute of every `mets:fileGrp` in the document,
        as a list.
        """
        group_elements = self._tree.getroot().findall('.//mets:fileGrp', NS)
        return [group.get('USE') for group in group_elements]
116
117
    def find_all_files(self, *args, **kwargs):
        """
        Eagerly collect everything :py:meth:`find_files` yields for the given
        arguments and return it as a list.

        Equivalent to ``list(self.find_files(...))``
        """
        return [*self.find_files(*args, **kwargs)]
124
125
    # pylint: disable=multiple-statements
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search `mets:file` entries in this METS document and yield results.

        The ``ID``, ``fileGrp``, ``url`` and ``mimetype`` parameters can each be
        either a literal string or a regular expression if the string starts
        with `//` (double slash). If it is a regex, the leading `//` is removed
        and candidates are matched against the regex with `re.fullmatch`. If it is
        a literal string, comparison is done with string equality.

        ``pageId`` cannot be a regex, but may name several pages separated by
        commas. Filters that are ``None`` or empty are not applied.

        Keyword Args:
            ID (string) : `@ID` of the `mets:file`
            fileGrp (string) : `@USE` of the `mets:fileGrp` to list files of
            pageId (string) : `@ID` of the corresponding physical `mets:structMap` entry (physical page)
            url (string) : `@xlink:href` (URL or path) of `mets:Flocat` of `mets:file`
            mimetype (string) : `@MIMETYPE` of `mets:file`
            local_only (boolean) : Whether to restrict results to local files in the filesystem

        Yields:
            :py:class:`ocrd_models.ocrd_file.OcrdFile` instantiations

        Raises:
            Exception: if ``pageId`` starts with the regex prefix
        """
        root = self._tree.getroot()

        # IDs of the files linked from the requested pages. ``None`` means
        # "do not filter by page"; an empty set means "no file can match".
        page_file_ids = None
        if pageId:
            if pageId.startswith(REGEX_PREFIX):
                raise Exception("find_files does not support regex search for pageId")
            wanted_pages = set(pageId.split(','))
            page_file_ids = set()
            for page in root.xpath('//mets:div[@TYPE="page"]', namespaces=NS):
                if page.get('ID') in wanted_pages:
                    page_file_ids.update(
                        fptr.get('FILEID') for fptr in page.findall('mets:fptr', NS))

        for cand in root.xpath('//mets:file', namespaces=NS):
            if ID:
                if ID.startswith(REGEX_PREFIX):
                    if not fullmatch(ID[REGEX_PREFIX_LEN:], cand.get('ID')): continue
                else:
                    if not ID == cand.get('ID'): continue

            if page_file_ids is not None and cand.get('ID') not in page_file_ids:
                continue

            if fileGrp:
                # the fileGrp is the parent element of the mets:file
                if fileGrp.startswith(REGEX_PREFIX):
                    if not fullmatch(fileGrp[REGEX_PREFIX_LEN:], cand.getparent().get('USE')): continue
                else:
                    if cand.getparent().get('USE') != fileGrp: continue

            if mimetype:
                if mimetype.startswith(REGEX_PREFIX):
                    # a missing @MIMETYPE is matched as the empty string
                    if not fullmatch(mimetype[REGEX_PREFIX_LEN:], cand.get('MIMETYPE') or ''): continue
                else:
                    if cand.get('MIMETYPE') != mimetype: continue

            if url:
                cand_locat = cand.find('mets:FLocat', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_url = cand_locat.get('{%s}href' % NS['xlink'])
                if url.startswith(REGEX_PREFIX):
                    if not fullmatch(url[REGEX_PREFIX_LEN:], cand_url): continue
                else:
                    if cand_url != url: continue

            f = OcrdFile(cand, mets=self)

            # If only local resources should be returned and f is not a file path: skip the file
            if local_only and not is_local_filename(f.url):
                continue
            yield f
196
197
    def add_file_group(self, fileGrp):
        """
        Return the `mets:fileGrp` with `@USE` ``fileGrp``, creating it (and the
        `mets:fileSec` holding it) when it does not exist yet.

        Arguments:
            fileGrp (string): `@USE` of the new `mets:fileGrp`.

        Returns:
            The existing or newly created `mets:fileGrp` element.

        Raises:
            Exception: if ``fileGrp`` contains a comma
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        group = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if group is not None:
            return group
        group = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        group.set('USE', fileGrp)
        return group
214
215
    def rename_file_group(self, old, new):
        """
        Change the `@USE` of the `mets:fileGrp` currently named ``old`` to ``new``.

        Raises:
            FileNotFoundError: if the `mets:fileSec` has no `mets:fileGrp`
                with `@USE` ``old``
        """
        group_path = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        group = self._tree.getroot().find(group_path, NS)
        if group is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        group.set('USE', new)
223
224
    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a `mets:fileGrp` (fixed `@USE`) or `mets:fileGrp`s (regex `@USE`)

        Arguments:
            USE (string): `@USE` of the `mets:fileGrp` to delete. Can be a regex if prefixed with `//`.
                A non-string value is treated as the `mets:fileGrp` element itself.
            recursive (boolean): Whether to recursively delete all `mets:file`s in the group
            force (boolean): Do not raise an exception if `mets:fileGrp` does not exist

        Raises:
            Exception: if there is no `mets:fileSec`, if the group does not
                exist and ``force`` is not set, or if the group still contains
                files and ``recursive`` is not set
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_file_group')
        file_sec = self._tree.getroot().find('mets:fileSec', NS)
        if file_sec is None:
            raise Exception("No fileSec!")

        use_is_string = isinstance(USE, str)
        if use_is_string and USE.startswith(REGEX_PREFIX):
            # regex query: hand every matching group element back to this method
            for group in file_sec.findall('mets:fileGrp', NS):
                if fullmatch(USE[REGEX_PREFIX_LEN:], group.get('USE')):
                    self.remove_file_group(group, recursive=recursive)
            return

        if use_is_string:
            target = file_sec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            target = USE

        if target is None:
            msg = "No such fileGrp: %s" % USE
            if not force:
                raise Exception(msg)
            log.warning(msg)
            return

        members = target.findall('mets:file', NS)
        if members and not recursive:
            raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
        for member in members:
            self.remove_one_file(member.get('ID'))
        target.getparent().remove(target)
260
261
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Arguments:
            fileGrp (string): `@USE` of `mets:fileGrp` to add to
        Keyword Args:
            mimetype (string): `@MIMETYPE` of the `mets:file` to use
            url (string): `@xlink:href` (URL or path) of the `mets:file` to use
            ID (string): `@ID` of the `mets:file` to use
            pageId (string): `@ID` in the physical `mets:structMap` to link to
            force (boolean): Whether to add the file even if a `mets:file` with the same `@ID` already exists.
                In that case the existing entry is re-used and overwritten.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): Value assigned to the ``local_filename`` attribute of the resulting file object
            **kwargs: Accepted but not used by this method

        Returns:
            The added (or re-used) :py:class:`ocrd_models.ocrd_file.OcrdFile`

        Raises:
            Exception: if ``ID`` is missing or does not match ``REGEX_FILE_ID``,
                or if a file with that ``ID`` exists and neither ``force`` nor
                ``ignore`` is set
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        elif not REGEX_FILE_ID.fullmatch(ID):
            raise Exception("Invalid syntax for mets:file/@ID %s" % ID)
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # Search for a clashing mets:file only once and re-use the hit
        existing = None if ignore else next(self.find_files(ID=ID), None)
        if existing:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file = existing
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)
        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        return mets_file
296
297
    def remove_file(self, *args, **kwargs):
        """
        Delete every `mets:file` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The removed file if exactly one matched, otherwise the list of
            removed files.

        Raises:
            FileNotFoundError: if the query matches no file
        """
        matches = list(self.find_files(*args, **kwargs))
        if not matches:
            raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
        for match in matches:
            self.remove_one_file(match)
        # a lone match is returned bare for backwards-compatibility
        return matches[0] if len(matches) == 1 else matches
310
311
    def remove_one_file(self, ID):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Besides the `mets:file` itself, every `mets:fptr` referencing it is
        removed, as is any parent of such a `mets:fptr` that is left without
        children.

        Arguments:
            ID (string): `@ID` of the `mets:file` to delete. An
                :py:class:`ocrd_models.ocrd_file.OcrdFile` is accepted as well.

        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.

        Raises:
            FileNotFoundError: if no file with that ``ID`` exists
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_one_file')
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages (len() counts children; getchildren() is deprecated in lxml)
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
347
348
    @property
    def physical_pages(self):
        """
        Return the `@ID`s of all page-level `mets:div`s inside the
        `physSequence` of the physical `mets:structMap`.
        """
        root = self._tree.getroot()
        page_ids = root.xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
            namespaces=NS)
        return page_ids
356
357
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (the `@ID`s of all physical `mets:structMap` `mets:div`s),
        optionally for a subset of `mets:file` `@ID`s ``for_fileIds``.

        Keyword Args:
            for_fileIds (list): `@ID`s of the `mets:file`s to look up

        Returns:
            Without ``for_fileIds``, the same as :py:attr:`physical_pages`.
            Otherwise a list as long as ``for_fileIds`` holding, at the
            position of each file ID, the ID of the page referencing it, or
            ``None`` if no page does. For a file ID listed more than once only
            its first position is filled.
        """
        if for_fileIds is None:
            return self.physical_pages
        # Resolve each fptr with one dict lookup instead of a linear ``in``
        # test followed by ``list.index``; setdefault keeps the first position
        # of a repeated ID, which is what ``list.index`` returns.
        first_position = {}
        for position, file_id in enumerate(for_fileIds):
            first_position.setdefault(file_id, position)
        ret = [None] * len(for_fileIds)
        for page in self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                namespaces=NS):
            for fptr in page.findall('mets:fptr', NS):
                position = first_position.get(fptr.get('FILEID'))
                if position is not None:
                    ret[position] = page.get('ID')
        return ret
372
373
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link the `mets:file` ``ocrd_file`` to the physical page ``pageId``
        (`@ID` of the physical `mets:structMap` `mets:div` entry), creating
        the structMap, the sequence div and the page div as necessary.

        Any existing page link of the file is removed first.

        Arguments:
            pageId (string): `@ID` of the physical `mets:structMap` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): `@ORDER` to use (only applied to a newly created page div)
            orderlabel (string): `@ORDERLABEL` to use (only applied to a newly created page div)
        """
        root = self._tree.getroot()
        file_id = ocrd_file.ID

        # drop whatever page mapping this file had before
        stale_fptrs = root.findall(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
            file_id, namespaces=NS)
        for stale in stale_fptrs:
            stale.getparent().remove(stale)

        # walk down structMap -> physSequence -> page, creating missing levels
        struct_map = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if struct_map is None:
            struct_map = ET.SubElement(root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        seq_div = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if seq_div is None:
            seq_div = ET.SubElement(struct_map, TAG_METS_DIV)
            seq_div.set('TYPE', 'physSequence')

        page_div = seq_div.find('mets:div[@ID="%s"]' % pageId, NS)
        if page_div is None:
            page_div = ET.SubElement(seq_div, TAG_METS_DIV)
            page_div.set('TYPE', 'page')
            page_div.set('ID', pageId)
            for attribute, value in (('ORDER', order), ('ORDERLABEL', orderlabel)):
                if value:
                    page_div.set(attribute, value)

        new_fptr = ET.SubElement(page_div, TAG_METS_FPTR)
        new_fptr.set('FILEID', file_id)
412
413
    def get_physical_page_for_file(self, ocrd_file):
        """
        Return the `@ID` of the first physical page (`mets:div` entry of the
        physical `mets:structMap`) that references the `mets:file`
        ``ocrd_file``, or ``None`` if no page references it.
        """
        matching_ids = self._tree.getroot().xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID, namespaces=NS)
        return matching_ids[0] if matching_ids else None
423
424
    def remove_physical_page(self, ID):
        """
        Remove the first page (physical `mets:structMap` `mets:div` entry)
        whose `@ID` is ``ID``; do nothing if there is no such page.
        """
        candidates = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
            namespaces=NS)
        if not candidates:
            return
        el_page = candidates[0]
        el_page.getparent().remove(el_page)
433
434
    def merge(self, other_mets, fileGrp_mapping=None, after_add_cb=None, **kwargs):
        """
        Copy files from ``other_mets`` into this METS via :py:meth:`add_file`.

        The files to copy are selected by passing ``kwargs`` to the
        :py:meth:`find_files` of ``other_mets``. Each file is added with its
        mimetype, url, ID and pageId.

        Keyword Args:
            fileGrp_mapping (dict): Map ``other_mets`` fileGrp to fileGrp in this METS;
                fileGrps without an entry keep their name
            after_add_cb (function): Callback called with each file right after it was added to the METS
        """
        mapping = fileGrp_mapping or {}
        for src in other_mets.find_files(**kwargs):
            target_group = mapping.get(src.fileGrp, src.fileGrp)
            added = self.add_file(
                target_group,
                mimetype=src.mimetype,
                url=src.url,
                ID=src.ID,
                pageId=src.pageId)
            if after_add_cb:
                after_add_cb(added)
455
456