Passed
Push — master ( 2e0112...0ca5aa )
by Konstantin
02:24
created

ocrd_models.ocrd_mets.OcrdMets.merge()   A

Complexity

Conditions 4

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 21
rs 9.8
c 0
b 0
f 0
cc 4
nop 5
1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch, search
6
from lxml import etree as ET
7
8
from ocrd_utils import (
9
    is_local_filename,
10
    getLogger,
11
    generate_range,
12
    VERSION,
13
    REGEX_PREFIX,
14
    REGEX_FILE_ID
15
)
16
17
from .constants import (
18
    NAMESPACES as NS,
19
    TAG_METS_AGENT,
20
    TAG_METS_DIV,
21
    TAG_METS_FILE,
22
    TAG_METS_FILEGRP,
23
    TAG_METS_FILESEC,
24
    TAG_METS_FPTR,
25
    TAG_METS_METSHDR,
26
    TAG_METS_STRUCTMAP,
27
    IDENTIFIER_PRIORITY,
28
    TAG_MODS_IDENTIFIER,
29
    METS_XML_EMPTY,
30
)
31
32
from .ocrd_xml_base import OcrdXmlDocument, ET
33
from .ocrd_file import OcrdFile
34
from .ocrd_agent import OcrdAgent
35
36
# Length of the prefix that marks a search value as a regular expression;
# precomputed so the prefix can be sliced off before calling ``fullmatch``.
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
37
38
class OcrdMets(OcrdXmlDocument):
39
    """
40
    API to a single METS file
41
    """
42
43
    @staticmethod
    def empty_mets(now=None):
        """
        Build a new :py:class:`OcrdMets` from the bundled empty METS template.

        Keyword Args:
            now: Value substituted for the ``{{ NOW }}`` placeholder of the
                template. If falsy, the current local time in ISO format is used.
        """
        timestamp = now or datetime.now().isoformat()
        template = (
            METS_XML_EMPTY.decode('utf-8')
            .replace('{{ VERSION }}', VERSION)
            .replace('{{ NOW }}', '%s' % timestamp)
        )
        return OcrdMets(content=template.encode('utf-8'))
54
55
    def __init__(self, **kwargs):
        """
        Initialise the document.

        All keyword arguments are forwarded unchanged to the base class
        constructor.
        """
        super().__init__(**kwargs)
60
61
    def __str__(self):
62
        """
63
        String representation
64
        """
65
        return 'OcrdMets[fileGrps=%s,files=%s]' % (self.file_groups, list(self.find_files()))
66
67
    @property
    def unique_identifier(self):
        """
        Text of the first ``mods:identifier`` found when trying the ``@type``
        values of ``IDENTIFIER_PRIORITY`` in order, or ``None`` if none exists.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            el_id = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if el_id is not None:
                return el_id.text
        return None
78
79
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Store ``purl`` as the unique identifier.

        The first existing ``mods:identifier`` (tried in ``IDENTIFIER_PRIORITY``
        order of ``@type``) is overwritten; if there is none, a new
        ``mods:identifier`` with ``@type="purl"`` is appended to ``mods:mods``.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        # Lazy lookup: stops searching at the first identifier type that exists
        candidates = (
            root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY)
        el_id = next((el for el in candidates if el is not None), None)
        if el_id is None:
            el_mods = root.find('.//mods:mods', NS)
            el_id = ET.SubElement(el_mods, TAG_MODS_IDENTIFIER)
            el_id.set('type', 'purl')
        el_id.text = purl
96
97
    @property
    def agents(self):
        """
        All ``mets:agent`` entries of the ``mets:metsHdr``, each wrapped in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
103
104
    def add_agent(self, *args, **kwargs):
        """
        Append a new ``mets:agent`` to the ``mets:metsHdr`` and return it as an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        Positional and keyword arguments are passed on to the
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` constructor.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            # No header yet: create one as the first child of the root element
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        return OcrdAgent(ET.SubElement(header, TAG_METS_AGENT), *args, **kwargs)
116
117
    @property
    def file_groups(self):
        """
        The `@USE` values of all `mets:fileGrp` elements, in document order.
        """
        use_values = []
        for el_fileGrp in self._tree.getroot().findall('.//mets:fileGrp', NS):
            use_values.append(el_fileGrp.get('USE'))
        return use_values
123
124
    def find_all_files(self, *args, **kwargs):
        """
        Eager variant of :py:meth:`find_files`: collect every result in a list.

        Takes the same arguments as :py:meth:`find_files`.
        """
        return [ocrd_file for ocrd_file in self.find_files(*args, **kwargs)]
131
132
    # pylint: disable=multiple-statements
133
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search `mets:file` entries in this METS document and yield results.

        The ``ID``, ``fileGrp``, ``url`` and ``mimetype`` parameters can each be
        either a literal string or a regular expression if the string starts
        with `//` (double slash). If it is a regex, the leading `//` is removed
        and candidates are matched against the regex with `re.fullmatch`. If it is
        a literal string, comparison is done with string equality. A candidate
        that lacks the attribute in question is compared as the empty string.

        The ``pageId`` parameter is a comma-separated list of page IDs and
        supports the numeric range operator ``..``. For example, to find all
        files in pages ``PHYS_0001`` to ``PHYS_0003``,
        ``PHYS_0001..PHYS_0003`` will be expanded to ``PHYS_0001,PHYS_0002,PHYS_0003``.

        Any filter that is ``None`` or empty is not applied.

        Keyword Args:
            ID (string) : `@ID` of the `mets:file`
            fileGrp (string) : `@USE` of the `mets:fileGrp` to list files of
            pageId (string) : `@ID` of the corresponding physical `mets:structMap` entry (physical page)
            url (string) : `@xlink:href` (URL or path) of `mets:Flocat` of `mets:file`
            mimetype (string) : `@MIMETYPE` of `mets:file`
            local_only (boolean) : Whether to restrict results to local files in the filesystem

        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations

        Raises:
            Exception: if ``pageId`` is given as a regex (raised on first iteration)
        """
        def matches(query, value):
            # A missing attribute is compared as '' so that fullmatch never sees None
            value = value or ''
            if query.startswith(REGEX_PREFIX):
                return fullmatch(query[REGEX_PREFIX_LEN:], value) is not None
            return query == value

        # IDs of the files referenced by the requested pages; None means "no page filter"
        file_ids_on_pages = None
        if pageId:
            if pageId.startswith(REGEX_PREFIX):
                raise Exception("find_files does not support regex search for pageId")
            wanted_pages = set()
            for page_token in pageId.split(','):
                wanted_pages.add(page_token)
                if '..' in page_token:
                    # maxsplit=1: a range has exactly a start and an end
                    wanted_pages.update(generate_range(*page_token.split('..', 1)))
            file_ids_on_pages = set()
            for page in self._tree.getroot().xpath(
                    '//mets:div[@TYPE="page"]', namespaces=NS):
                if page.get('ID') in wanted_pages:
                    file_ids_on_pages.update(
                        fptr.get('FILEID') for fptr in page.findall('mets:fptr', NS))

        for cand in self._tree.getroot().xpath('//mets:file', namespaces=NS):
            if ID and not matches(ID, cand.get('ID')):
                continue

            if file_ids_on_pages is not None and cand.get('ID') not in file_ids_on_pages:
                continue

            if fileGrp and not matches(fileGrp, cand.getparent().get('USE')):
                continue

            if mimetype and not matches(mimetype, cand.get('MIMETYPE')):
                continue

            if url:
                cand_locat = cand.find('mets:FLocat', namespaces=NS)
                if cand_locat is None:
                    continue
                if not matches(url, cand_locat.get('{%s}href' % NS['xlink'])):
                    continue

            f = OcrdFile(cand, mets=self)

            # If only local resources should be returned and f is not a file path: skip the file
            if local_only and not is_local_filename(f.url):
                continue
            yield f
212
213
    def add_file_group(self, fileGrp):
        """
        Return the `mets:fileGrp` with `@USE` ``fileGrp``, creating it (and the
        `mets:fileSec`, if missing) when it does not exist yet.

        Arguments:
            fileGrp (string): `@USE` of the new `mets:fileGrp`.

        Raises:
            Exception: if ``fileGrp`` contains a comma
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
230
231
    def rename_file_group(self, old, new):
        """
        Change the `@USE` of the `mets:fileGrp` named ``old`` to ``new``.

        Raises:
            FileNotFoundError: if there is no `mets:fileGrp` with `@USE` ``old``
        """
        xpath = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        target = self._tree.getroot().find(xpath, NS)
        if target is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        target.set('USE', new)
239
240
    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a `mets:fileGrp` (fixed `@USE`) or `mets:fileGrp`s (regex `@USE`)

        Arguments:
            USE (string): `@USE` of the `mets:fileGrp` to delete. Can be a regex if prefixed with `//`.
                A `mets:fileGrp` element may be passed instead of a string (used for the regex case).
            recursive (boolean): Whether to recursively delete all `mets:file`s in the group
            force (boolean): Do not raise an exception if `mets:fileGrp` does not exist

        Raises:
            Exception: if there is no `mets:fileSec`, if the group does not exist
                (unless ``force``), or if it contains files and ``recursive`` is not set
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                pattern = USE[REGEX_PREFIX_LEN:]
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    # A fileGrp without @USE is matched as '' instead of crashing fullmatch
                    if fullmatch(pattern, cand.get('USE') or ''):
                        self.remove_file_group(cand, recursive=recursive)
                return
            el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            # Not a string: the caller handed over the fileGrp element itself
            el_fileGrp = USE
        if el_fileGrp is None:
            msg = "No such fileGrp: %s" % USE
            if force:
                log.warning(msg)
                return
            raise Exception(msg)
        files = el_fileGrp.findall('mets:file', NS)
        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            for f in files:
                self.remove_one_file(f.get('ID'))
        el_fileGrp.getparent().remove(el_fileGrp)
276
277
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        The `mets:fileGrp` is created if it does not exist yet.

        Arguments:
            fileGrp (string): `@USE` of `mets:fileGrp` to add to
        Keyword Args:
            mimetype (string): `@MIMETYPE` of the `mets:file` to use
            url (string): `@xlink:href` (URL or path) of the `mets:file` to use
            ID (string): `@ID` of the `mets:file` to use
            pageId (string): `@ID` in the physical `mets:structMap` to link to
            force (boolean): Whether to add the file even if a `mets:file` with the same `@ID` already exists.
                In that case the existing entry is reused and overwritten.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): Value assigned to the ``local_filename`` of the resulting file object

        Returns:
            The new or updated :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Raises:
            Exception: if ``ID`` is missing or malformed, or if it is already
                taken and neither ``force`` nor ``ignore`` is set
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise Exception("Invalid syntax for mets:file/@ID %s" % ID)
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # Search the document once and reuse the hit instead of repeating the lookup
        existing = None if ignore else next(self.find_files(ID=ID), None)
        if existing:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file = existing
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)
        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        return mets_file
312
313
    def remove_file(self, *args, **kwargs):
        """
        Delete every `mets:file` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The single removed file if exactly one matched, otherwise the list
            of removed files.

        Raises:
            FileNotFoundError: if nothing matched the query
        """
        matched = list(self.find_files(*args, **kwargs))
        if not matched:
            raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
        for ocrd_file in matched:
            self.remove_one_file(ocrd_file)
        # A lone match is returned bare for backwards-compatibility
        return matched if len(matched) > 1 else matched[0]
326
327
    def remove_one_file(self, ID):
        """
        Delete a single `mets:file` together with every `mets:fptr` referencing it.

        Parent elements of those `mets:fptr`s that end up without any children
        are removed as well.

        Arguments:
            ID (string): `@ID` of the `mets:file` to delete. An
                :py:class:`ocrd_models.ocrd_file.OcrdFile` is accepted, too.

        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.

        Raises:
            FileNotFoundError: if no `mets:file` with this `@ID` is found
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_one_file')
        # Pass the argument to the logger so formatting only happens if the record is emitted
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages; len() counts child elements and replaces
            # the deprecated getchildren()
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
363
364
    @property
    def physical_pages(self):
        """
        The `@ID`s of all page `mets:div`s in the physical `mets:structMap`.
        """
        page_id_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        root = self._tree.getroot()
        return root.xpath(page_id_xpath, namespaces=NS)
372
373
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (the `@ID`s of all physical `mets:structMap` `mets:div`s),
        optionally for a subset of `mets:file` `@ID`s ``for_fileIds``.

        Keyword Args:
            for_fileIds (list): `mets:file` `@ID`s to look up pages for

        Returns:
            Without ``for_fileIds``, the same as :py:attr:`physical_pages`.
            Otherwise a list as long as ``for_fileIds`` holding the page ID for
            each file ID, or ``None`` where no page references the file. If an
            ID occurs more than once, only its first position is filled.
        """
        if for_fileIds is None:
            return self.physical_pages
        ret = [None] * len(for_fileIds)
        # Map each file ID to its first position once, instead of scanning the
        # list with `in` and `.index()` for every fptr
        first_position = {}
        for position, file_id in enumerate(for_fileIds):
            first_position.setdefault(file_id, position)
        for page in self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                namespaces=NS):
            for fptr in page.findall('mets:fptr', NS):
                position = first_position.get(fptr.get('FILEID'))
                if position is not None:
                    ret[position] = page.get('ID')
        return ret
388
389
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link the `mets:file` ``ocrd_file`` to the physical page ``pageId``
        (`@ID` of the physical `mets:structMap` `mets:div` entry), creating the
        `mets:structMap`, the sequence `mets:div` and the page `mets:div` if necessary.

        Any previous page link of the file is dropped first.

        Arguments:
            pageId (string): `@ID` of the physical `mets:structMap` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): `@ORDER` to use; only applied when the page is newly created
            orderlabel (string): `@ORDERLABEL` to use; only applied when the page is newly created
        """
        root = self._tree.getroot()

        # Drop every existing page mapping for this file
        stale_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % ocrd_file.ID
        for stale_fptr in root.findall(stale_xpath, namespaces=NS):
            stale_fptr.getparent().remove(stale_fptr)

        # Locate the structMap / sequence / page hierarchy, building missing levels
        struct_map = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if struct_map is None:
            struct_map = ET.SubElement(root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        sequence = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence is None:
            sequence = ET.SubElement(struct_map, TAG_METS_DIV)
            sequence.set('TYPE', 'physSequence')

        page = sequence.find('mets:div[@ID="%s"]' % pageId, NS)
        if page is None:
            page = ET.SubElement(sequence, TAG_METS_DIV)
            page.set('TYPE', 'page')
            page.set('ID', pageId)
            if order:
                page.set('ORDER', order)
            if orderlabel:
                page.set('ORDERLABEL', orderlabel)

        ET.SubElement(page, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
428
429
    def get_physical_page_for_file(self, ocrd_file):
        """
        Return the `@ID` of the first physical page `mets:div` that references
        the `mets:file` ``ocrd_file``, or ``None`` if no page references it.
        """
        page_xpath = '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' % ocrd_file.ID
        page_ids = self._tree.getroot().xpath(page_xpath, namespaces=NS)
        return page_ids[0] if page_ids else None
439
440
    def remove_physical_page(self, ID):
        """
        Remove the physical page `mets:div` whose `@ID` is ``ID``.

        Does nothing if there is no such page; only the first match is removed.
        """
        page_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID
        hits = self._tree.getroot().xpath(page_xpath, namespaces=NS)
        if not hits:
            return
        el_page = hits[0]
        el_page.getparent().remove(el_page)
449
450
    def merge(self, other_mets, fileGrp_mapping=None, after_add_cb=None, **kwargs):
        """
        Copy files from ``other_mets`` into this METS via :py:meth:`add_file`.

        Additional keyword arguments select the files to copy and are passed
        to ``other_mets.find_files``.

        Keyword Args:
            fileGrp_mapping (dict): Map ``other_mets`` fileGrp to fileGrp in this METS;
                fileGrps without an entry keep their name
            after_add_cb (function): Callback invoked with each file right after it was added to this METS
        """
        group_map = fileGrp_mapping or {}
        for src in other_mets.find_files(**kwargs):
            target_group = group_map.get(src.fileGrp, src.fileGrp)
            added = self.add_file(
                target_group,
                mimetype=src.mimetype,
                url=src.url,
                ID=src.ID,
                pageId=src.pageId)
            if after_add_cb:
                after_add_cb(added)
471
472