Passed
Pull Request — master (#1329)
by
unknown
03:57 queued 01:46
created

ocrd_models.ocrd_mets.OcrdMets.merge()   B

Complexity

Conditions 6

Size

Total Lines 34
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 34
rs 8.4186
c 0
b 0
f 0
cc 6
nop 8

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
import re
6
from lxml import etree as ET
7
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
8
9
from ocrd_utils import (
10
    getLogger,
11
    generate_range,
12
    VERSION,
13
    REGEX_PREFIX,
14
    REGEX_FILE_ID
15
)
16
17
from ocrd_utils.config import config
18
19
from .constants import (
20
    NAMESPACES as NS,
21
    TAG_METS_AGENT,
22
    TAG_METS_DIV,
23
    TAG_METS_FILE,
24
    TAG_METS_FILEGRP,
25
    TAG_METS_FILESEC,
26
    TAG_METS_FPTR,
27
    TAG_METS_METSHDR,
28
    TAG_METS_STRUCTMAP,
29
    IDENTIFIER_PRIORITY,
30
    TAG_MODS_IDENTIFIER,
31
    METS_XML_EMPTY,
32
    METS_PAGE_DIV_ATTRIBUTE,
33
    METS_STRUCT_DIV_ATTRIBUTE,
34
    METS_DIV_ATTRIBUTE_PATTERN,
35
    METS_DIV_ATTRIBUTE_ATOM_PATTERN,
36
    METS_DIV_ATTRIBUTE_RANGE_PATTERN,
37
    METS_DIV_ATTRIBUTE_REGEX_PATTERN,
38
)
39
40
from .ocrd_xml_base import OcrdXmlDocument, ET      # type: ignore
41
from .ocrd_file import OcrdFile
42
from .ocrd_agent import OcrdAgent
43
44
# Length of the regex marker prefix, precomputed so it can be sliced off search arguments
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
45
46
class OcrdMets(OcrdXmlDocument):
47
    """
48
    API to a single METS file
49
    """
50
    _cache_flag : bool
51
    # Cache for the physical pages (mets:div) - two nested dictionaries
52
    # The outer dictionary's key: attribute type
53
    # The outer dictionary's value: inner dictionary
54
    # The inner dictionary's key: attribute value (str)
55
    # The inner dictionary's value: a 'div' object at some memory location
56
    _page_cache : Dict[METS_PAGE_DIV_ATTRIBUTE, Dict[str, ET._Element]]
57
    # Cache for the files (mets:file) - two nested dictionaries
58
    # The outer dictionary's Key: 'fileGrp.USE'
59
    # The outer dictionary's Value: Inner dictionary
60
    # The inner dictionary's Key: 'file.ID'
61
    # The inner dictionary's Value: a 'file' object at some memory location
62
    _file_cache : Dict[str, Dict[str, ET._Element]]
63
    # Cache for the file pointers (mets:fptr) - two nested dictionaries
64
    # The outer dictionary's Key: 'div.ID'
65
    # The outer dictionary's Value: Inner dictionary
66
    # The inner dictionary's Key: 'fptr.FILEID'
67
    # The inner dictionary's Value: a 'fptr' object at some memory location
68
    _fptr_cache : Dict[str, Dict[str, ET._Element]]
69
    # Cache for the logical structural divs (mets:div) - two nested dictionaries
70
    # The outer dictionary's key: attribute type
71
    # The outer dictionary's value: inner dictionary
72
    # The inner dictionary's key: attribute value (str)
73
    # The inner dictionary's value: a list of corresponding physical div.ID
74
    _struct_cache : Dict[METS_STRUCT_DIV_ATTRIBUTE, Dict[str, List[str]]]
75
76
    @staticmethod
    def empty_mets(now : Optional[str] = None, cache_flag : bool = False):
        """
        Build a new :py:class:`OcrdMets` from the bundled empty METS template.

        Arguments:
            now (string): timestamp substituted for ``{{ NOW }}`` in the template;
                if not given, the current time in ISO format is used
            cache_flag (boolean): passed through to the constructor
        """
        timestamp = now if now else datetime.now().isoformat()
        content = METS_XML_EMPTY.replace('{{ VERSION }}', VERSION)
        content = content.replace('{{ NOW }}', '%s' % timestamp)
        return OcrdMets(content=content.encode('utf-8'), cache_flag=cache_flag)
87
88
    def __init__(self, **kwargs) -> None:
        """
        Initialize via the base class, then decide whether to use caching.

        If ``OCRD_METS_CACHING`` is set in the configuration, its value
        replaces the caching flag, overriding the kwarg to the constructor.
        """
        super().__init__(**kwargs)

        # XXX The OCRD_METS_CACHING setting, if present, wins over the constructor kwarg
        if config.is_set('OCRD_METS_CACHING'):
            caching = config.OCRD_METS_CACHING
            log = getLogger('ocrd.models.ocrd_mets')
            log.debug('METS Caching %s because OCRD_METS_CACHING is %s',
                      'enabled' if caching else 'disabled',
                      config.raw_value('OCRD_METS_CACHING'))
            self._cache_flag = caching

        # Nothing more to do without caching
        if not self._cache_flag:
            return
        self._initialize_caches()
        self._refresh_caches()
106
107
    def __str__(self) -> str:
        """
        Summarize caching state, fileGrps and files as a string.
        """
        cached = self._cache_flag
        groups = self.file_groups
        files = list(self.find_files())
        return 'OcrdMets[cached=%s,fileGrps=%s,files=%s]' % (cached, groups, files)
113
114
    def _fill_caches(self) -> None:
        """
        Populate the (already initialized) caches from the current METS tree.

        Three independent passes are made:

        1. each child of each ``mets:fileSec/mets:fileGrp`` is stored in the
           file cache under the group's ``@USE`` and its own ``@ID``,
        2. each ``mets:div[@TYPE='page']`` is stored in the page cache (once per
           page attribute) and its children are stored in the fptr cache
           under their ``@FILEID``,
        3. each ``mets:div`` of the logical ``mets:structMap`` is stored in the
           struct cache (once per struct attribute), mapped to the list of
           ``xlink:to`` targets of the ``mets:smLink`` entries pointing from it.

        A pass that finds nothing does not prevent the following passes from
        running, so that e.g. pages are still cached when there is no ``mets:fileSec``.
        """
        tree_root = self._tree.getroot()

        # Fill with files
        el_fileSec = tree_root.find("mets:fileSec", NS)
        if el_fileSec is not None:
            for el_fileGrp in el_fileSec.findall('mets:fileGrp', NS):
                # One inner dictionary (file ID -> element) per fileGrp
                self._file_cache[el_fileGrp.get('USE')] = {
                    el_file.get('ID'): el_file for el_file in el_fileGrp}

        # Fill with pages
        log = getLogger('ocrd.models.ocrd_mets._fill_caches-pages')
        for el_div in tree_root.findall(".//mets:div[@TYPE='page']", NS):
            div_id = el_div.get('ID')
            log.debug("DIV_ID: %s", div_id)

            for attr in METS_PAGE_DIV_ATTRIBUTE:
                # a missing attribute ends up under the key 'None'
                self._page_cache[attr][str(el_div.get(attr.name))] = el_div

            # One inner dictionary (file ID -> fptr element) per page
            self._fptr_cache[div_id] = {
                el_fptr.get('FILEID'): el_fptr for el_fptr in el_div}

        # Fill with logical divs
        el_struct_list = tree_root.findall("mets:structMap[@TYPE='LOGICAL']//mets:div", NS)
        el_smlink_list = tree_root.findall("mets:structLink/mets:smLink", NS)
        if not el_struct_list or not el_smlink_list:
            return
        # logical div ID -> list of linked physical div IDs
        smlink_map = {}
        for link in el_smlink_list:
            link_log = link.get('{%s}from' % NS['xlink'])
            link_phy = link.get('{%s}to' % NS['xlink'])
            smlink_map.setdefault(link_log, []).append(link_phy)
        for el_div in el_struct_list:
            linked_pages = smlink_map.get(el_div.get('ID'), [])
            for attr in METS_STRUCT_DIV_ATTRIBUTE:
                # divs sharing an attribute value accumulate their pages in one list
                val = self._struct_cache[attr].setdefault(str(el_div.get(attr.name)), [])
                val.extend(linked_pages)
180
181
    def _initialize_caches(self) -> None:
        """
        Reset all four caches to their empty state.
        """
        self._file_cache = dict()
        self._fptr_cache = dict()
        # NOTE we can only guarantee uniqueness for @ID and @ORDER
        self._page_cache = dict((attr, {}) for attr in METS_PAGE_DIV_ATTRIBUTE)
        self._struct_cache = dict((attr, {}) for attr in METS_STRUCT_DIV_ATTRIBUTE)
187
188
    def _refresh_caches(self) -> None:
        """
        Rebuild the caches from the current tree. Does nothing if caching is disabled.
        """
        if not self._cache_flag:
            return
        self._initialize_caches()
        # Note: for an instance created with empty_mets(),
        # the caches are still empty after filling
        self._fill_caches()
195
196
    @property
    def unique_identifier(self) -> Optional[str]:
        """
        Text of the first ``mods:identifier`` found when trying the types in priority order.
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.

        Returns ``None`` if no identifier of any prioritized type exists.
        """
        matches = (
            self._tree.getroot().find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY)
        first = next((el for el in matches if el is not None), None)
        if first is None:
            return None
        return first.text
206
207
    @unique_identifier.setter
    def unique_identifier(self, purl : str) -> None:
        """
        Write :py:attr:`purl` into the first ``mods:identifier`` found when trying the types in priority order.
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.

        If none exists, a new ``mods:identifier`` of type ``purl`` is appended to ``mods:mods``.
        """
        matches = (
            self._tree.getroot().find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY)
        id_el = next((el for el in matches if el is not None), None)
        if id_el is None:
            # no identifier yet: create one below mods:mods
            mods = self._tree.getroot().find('.//mods:mods', NS)
            assert mods is not None
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl
224
225
    @property
    def agents(self) -> List[OcrdAgent]:
        """
        All ``mets:agent`` entries of the ``mets:metsHdr``, each wrapped
        as :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.
        """
        el_agents = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, el_agents))
231
232
    def add_agent(self, **kwargs) -> OcrdAgent:
        """
        Create a new ``mets:agent`` in the ``metsHdr`` and wrap it
        as :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        The ``mets:metsHdr`` is created as first child of the root if missing.
        The new agent is placed after the last existing agent, or first in the
        header if there is none. Keyword arguments are passed on to the OcrdAgent constructor.
        """
        root = self._tree.getroot()
        el_metsHdr = root.find('.//mets:metsHdr', NS)
        if el_metsHdr is None:
            el_metsHdr = ET.Element(TAG_METS_METSHDR)
            root.insert(0, el_metsHdr)
        el_agent = ET.Element(TAG_METS_AGENT)
        # iterate backwards to get at the last existing agent (if any)
        el_agent_last = next(el_metsHdr.iterchildren(tag=TAG_METS_AGENT, reversed=True), None)
        if el_agent_last is None:
            el_metsHdr.insert(0, el_agent)
        else:
            el_agent_last.addnext(el_agent)
        return OcrdAgent(el_agent, **kwargs)
248
249
    @property
    def file_groups(self) -> List[str]:
        """
        The ``@USE`` values of all ``mets:fileGrp`` entries.

        With caching enabled, these are the keys of the file cache,
        otherwise they are read from the tree.
        """
        # WARNING: Actually we cannot return strings in place of elements!
        if not self._cache_flag:
            el_fileGrps = self._tree.getroot().findall('.//mets:fileGrp', NS)
            return [el_fileGrp.get('USE') for el_fileGrp in el_fileGrps]
        return list(self._file_cache)
260
261
    def find_all_files(self, *args, **kwargs) -> List[OcrdFile]:
        """
        Eager variant of :py:meth:`find_files`: takes the same arguments,
        but collects all results into a list.
        Equivalent to ``list(self.find_files(...))``
        """
        return [*self.find_files(*args, **kwargs)]
267
268
    # pylint: disable=multiple-statements
269
    def find_files(
        self,
        ID : Optional[str] = None,
        fileGrp : Optional[str] = None,
        pageId : Optional[str] = None,
        mimetype : Optional[str] = None,
        url : Optional[str] = None,
        local_filename : Optional[str] = None,
        local_only : bool = False,
        include_fileGrp : Optional[List[str]] = None,
        exclude_fileGrp : Optional[List[str]] = None,
    ) -> Iterator[OcrdFile]:
        """
        Search ``mets:file`` entries in this METS document and yield results.

        The :py:attr:`ID`, :py:attr:`fileGrp`, :py:attr:`mimetype`,
        :py:attr:`url` and :py:attr:`local_filename` parameters can each be either a
        literal string, or a regular expression if the string starts with
        ``//`` (double slash).

        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch` (a missing attribute is treated as
        the empty string). If it is a literal string, comparison
        is done with string equality.

        The :py:attr:`pageId` parameter is a selector expression which is resolved via
        :py:meth:`get_physical_pages`. It supports comma-separated lists, as well
        as the numeric range operator ``..`` and the negation operator ``~``.

        For example, to find all files in pages ``PHYS_0001`` to ``PHYS_0003``,
        both expressions ``PHYS_0001..PHYS_0003`` and ``PHYS_0001,PHYS_0002,PHYS_0003``
        will be expanded to the same 3 pages. To find all files above that subrange,
        both expressions ``~PHYS_0001..PHYS_0003`` and ``~PHYS_0001,~PHYS_0002,~PHYS_0003``
        will be expanded to ``PHYS_0004`` and upwards.

        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` remote/original URL of ``mets:Flocat`` of ``mets:file``
            local_filename (string) : ``@xlink:href`` local/cached filename of ``mets:Flocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to files which have a local filename
            include_fileGrp (list[str]) : List of allowed file groups
            exclude_fileGrp (list[str]) : List of disallowed file groups
        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations
        """
        def compile_needle(value):
            # turn "//..." strings into compiled regexes, pass everything else through
            if isinstance(value, str) and value.startswith(REGEX_PREFIX):
                return re.compile(value[REGEX_PREFIX_LEN:])
            return value

        def differs(needle, value):
            # literal needles compare by equality, regex needles by fullmatch
            if isinstance(needle, str):
                return needle != value
            return not needle.fullmatch(value or '')

        # IDs of all files referenced by the selected pages (set for fast membership tests)
        page_file_ids = set()
        if pageId:
            # returns divs instead of strings of ids
            for div in self.get_physical_pages(for_pageIds=pageId, return_divs=True):
                if self._cache_flag:
                    page_file_ids.update(self._fptr_cache[div.get('ID')])
                else:
                    page_file_ids.update(fptr.get('FILEID') for fptr in div.findall('mets:fptr', NS))

        ID = compile_needle(ID)
        fileGrp = compile_needle(fileGrp)
        mimetype = compile_needle(mimetype)
        url = compile_needle(url)
        local_filename = compile_needle(local_filename)

        # With caching, fileGrp filtering already happens when selecting candidates.
        # Candidates are always a list of their own, so the caller may
        # remove files while consuming the results.
        if not self._cache_flag:
            candidates = self._tree.getroot().xpath('//mets:file', namespaces=NS)
        elif not fileGrp:
            candidates = [el_file
                          for id_to_file in self._file_cache.values()
                          for el_file in id_to_file.values()]
        elif isinstance(fileGrp, str):
            candidates = list(self._file_cache.get(fileGrp, {}).values())
        else:
            candidates = [el_file
                          for use, id_to_file in self._file_cache.items()
                          if not differs(fileGrp, use)
                          for el_file in id_to_file.values()]

        href = '{%s}href' % NS['xlink']
        for cand in candidates:
            if ID and differs(ID, cand.get('ID')):
                continue

            if pageId is not None and cand.get('ID') not in page_file_ids:
                continue

            if not self._cache_flag and fileGrp and differs(fileGrp, cand.getparent().get('USE')):
                continue

            if mimetype and differs(mimetype, cand.get('MIMETYPE')):
                continue

            if url:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="URL"]', namespaces=NS)
                if cand_locat is None or differs(url, cand_locat.get(href)):
                    continue

            if local_filename:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"]', namespaces=NS)
                if cand_locat is None or differs(local_filename, cand_locat.get(href)):
                    continue

            if local_only:
                # deprecation_warning("'local_only' is deprecated, use 'local_filename=\"//.+\"' instead")
                is_local = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"][@xlink:href]', namespaces=NS)
                if is_local is None:
                    continue

            ret = OcrdFile(cand, mets=self)

            # XXX include_fileGrp is redundant to fileGrp but for completeness
            if exclude_fileGrp and ret.fileGrp in exclude_fileGrp:
                continue
            if include_fileGrp and ret.fileGrp not in include_fileGrp:
                continue

            yield ret
403
404
    def add_file_group(self, fileGrp: str) -> ET._Element:
        """
        Get the ``mets:fileGrp`` with the given ``@USE``, creating it if necessary.

        A missing ``mets:fileSec`` is created as well.

        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.
        Raises:
            ValueError: if :py:attr:`fileGrp` contains a comma
        """
        if ',' in fileGrp:
            raise ValueError('fileGrp must not contain commas')
        root = self._tree.getroot()
        el_fileSec = root.find('mets:fileSec', NS)
        if el_fileSec is None:
            el_fileSec = ET.SubElement(root, TAG_METS_FILESEC)
        existing = el_fileSec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(el_fileSec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        if self._cache_flag:
            # Start out with an empty dictionary for the files of the new fileGrp
            self._file_cache[fileGrp] = {}
        return created
425
426
    def rename_file_group(self, old: str, new: str) -> None:
        """
        Change the ``@USE`` of a ``mets:fileGrp`` from :py:attr:`old` to :py:attr:`new`.

        Raises:
            FileNotFoundError: if there is no ``mets:fileGrp`` with ``@USE`` :py:attr:`old`
        """
        xpath = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        el_fileGrp = self._tree.getroot().find(xpath, NS)
        if el_fileGrp is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        el_fileGrp.set('USE', new)

        if not self._cache_flag:
            return
        # move the cached files over to the new name
        files_of_group = self._file_cache.pop(old)
        self._file_cache[new] = files_of_group
437
438
    def remove_file_group(self, USE: str, recursive : bool = False, force : bool = False) -> None:
        """
        Delete one ``mets:fileGrp`` (literal ``@USE``) or several (regex ``@USE``).

        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``.
                A non-string value is taken to be the ``mets:fileGrp`` element itself.
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist
        Raises:
            Exception: if there is no ``mets:fileSec``, if the group does not exist
                (unless :py:attr:`force`), or if the group still contains files
                (unless :py:attr:`recursive`)
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")

        if not isinstance(USE, str):
            # already an element (this is how the regex branch below calls us)
            el_fileGrp = USE
        elif USE.startswith(REGEX_PREFIX):
            use_pattern = re.compile(USE[REGEX_PREFIX_LEN:])
            for el_cand in el_fileSec.findall('mets:fileGrp', NS):
                if use_pattern.fullmatch(el_cand.get('USE')):
                    self.remove_file_group(el_cand, recursive=recursive)
            return
        else:
            el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)

        if el_fileGrp is None:  # pylint: disable=len-as-condition
            msg = "No such fileGrp: %s" % USE
            if not force:
                raise Exception(msg)
            log.warning(msg)
            return

        group_use = el_fileGrp.get('USE')
        # Look up the group's files, from the cache if available
        if self._cache_flag:
            files = self._file_cache.get(group_use, {}).values()
        else:
            files = el_fileGrp.findall('mets:file', NS)

        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            # iterate over a copy, because removal modifies the underlying collection
            for el_file in list(files):
                self.remove_one_file(ID=el_file.get('ID'), fileGrp=el_file.getparent().get('USE'))

        if self._cache_flag:
            # The individual files were already taken out of the cache
            # by 'remove_one_file' above, so only the (now empty)
            # fileGrp entry itself is left to remove.
            del self._file_cache[group_use]

        el_fileGrp.getparent().remove(el_fileGrp)
488
489
    def add_file(self, fileGrp : str, mimetype : Optional[str] = None, url : Optional[str] = None,
                 ID : Optional[str] = None, pageId : Optional[str] = None, force : bool = False,
                 local_filename : Optional[str] = None, ignore : bool = False, **kwargs) -> OcrdFile:
        """
        Create a new ``mets:file`` and return it as :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        The ``mets:fileGrp`` is created if it does not exist yet.

        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
                The existing file is then removed first.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string):
        Raises:
            ValueError: if :py:attr:`ID` or :py:attr:`fileGrp` is empty or syntactically invalid
            FileExistsError: if a file with that ID exists in the group and either :py:attr:`force`
                is not set, or the existing file differs in page or mimetype
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)

        el_fileGrp = self.add_file_group(fileGrp)
        if not ignore:
            existing = next(self.find_files(ID=ID, fileGrp=fileGrp), None)
            if existing:
                same_file = (existing.fileGrp == fileGrp and
                             existing.pageId == pageId and
                             existing.mimetype == mimetype)
                if not same_file:
                    raise FileExistsError(
                        f"A file with ID=={ID} already exists {existing} but unrelated - cannot mitigate")
                if not force:
                    raise FileExistsError(
                        f"A file with ID=={ID} already exists {existing} and neither force nor ignore are set")
                self.remove_file(ID=ID, fileGrp=fileGrp)

        # Pass on only those properties which were actually given
        # (checking against None avoids Python's FutureWarning)
        given = (('mimetype', mimetype), ('url', url), ('ID', ID),
                 ('pageId', pageId), ('local_filename', local_filename))
        file_kwargs = {key: value for key, value in given if value is not None}
        # Create the element separately, so it can be reused for the cache below
        el_mets_file = ET.SubElement(el_fileGrp, TAG_METS_FILE)
        # The caching of the physical page is done in the OcrdFile constructor
        # (which calls us back with set_physical_page_for_file)
        new_file = OcrdFile(el_mets_file, mets=self, **file_kwargs)

        if self._cache_flag:
            # Register the new element in the file cache
            self._file_cache[fileGrp][ID] = el_mets_file

        return new_file
543
544
    def remove_file(self, *args, **kwargs) -> Union[List[OcrdFile],OcrdFile]:
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The removed file if exactly one matched (for backwards-compatibility),
            the list of removed files if several matched, or an empty list
            if nothing matched but at least one search criterion was a regex.
        Raises:
            FileNotFoundError: if nothing matched and no search criterion was a regex
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            return files[0]  # for backwards-compatibility
        # Inspect the criteria VALUES (positional and keyword) -
        # the keyword names themselves never carry the regex prefix
        criteria = list(args) + list(kwargs.values())
        if any(isinstance(criterion, str) and criterion.startswith(REGEX_PREFIX)
               for criterion in criteria):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
561
562
    def remove_one_file(self, ID : Union[str, OcrdFile], fileGrp : Optional[str] = None) -> OcrdFile:
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Besides the ``mets:file`` itself, all ``mets:fptr`` referencing it are
        removed, and so is every page ``mets:div`` left without children by that.
        With caching enabled, the caches are updated accordingly.

        Arguments:
            ID (string|OcrdFile): ``@ID`` of the ``mets:file`` to delete. Can also be an :py:class:`ocrd_models.ocrd_file.OcrdFile` to avoid search via ``ID``.
            fileGrp (string): ``@USE`` of the ``mets:fileGrp`` containing the ``mets:file``. Used only for optimization.
        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.
        Raises:
            FileNotFoundError: if no matching file exists
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_one_file')
        log.debug("remove_one_file(%s %s)", ID, fileGrp)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s (fileGr=%s)" % (ID, fileGrp))

        # Collect the physical page refs
        if self._cache_flag:
            fptrs = [fptrdict[ID] for fptrdict in self._fptr_cache.values() if ID in fptrdict]
        else:
            fptrs = self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS)

        # Delete the physical page refs
        for fptr in fptrs:
            log.debug("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # Remove the fptr from the cache as well
            if self._cache_flag:
                del self._fptr_cache[page_div.get('ID')][ID]
            # delete empty pages
            if len(page_div) == 0:
                log.debug("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)
                # Delete the empty pages from caches as well
                if self._cache_flag:
                    for attr in METS_PAGE_DIV_ATTRIBUTE:
                        if attr.name not in page_div.attrib:
                            continue
                        # Attribute values other than @ID and @ORDER may be shared
                        # between pages, in which case the cache only holds one of
                        # them: drop the entry only if it really is this page
                        attr_cache = self._page_cache[attr]
                        attr_value = page_div.attrib[attr.name]
                        if attr_cache.get(attr_value) is page_div:
                            del attr_cache[attr_value]

        # pylint: disable=protected-access
        el_file = ocrd_file._el
        el_fileGrp = el_file.getparent()

        # Delete the file reference from the cache
        if self._cache_flag:
            del self._file_cache[el_fileGrp.get('USE')][ocrd_file.ID]

        # Delete the file reference
        el_fileGrp.remove(el_file)

        return ocrd_file
619
620
    @property
    def physical_pages(self) -> List[str]:
        """
        The ``@ID`` of each page ``mets:div`` in the physical ``mets:structMap``.

        With caching enabled, these are the keys of the page cache for ``@ID``,
        otherwise they are read from the tree.
        """
        if self._cache_flag:
            return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID])

        page_ids = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
            namespaces=NS)
        return list(map(str, page_ids))
631
632
    def get_physical_pages(self, for_fileIds : Optional[List[str]] = None, for_pageIds : Optional[str] = None,
                           return_divs : bool = False) -> List[Union[str, ET._Element]]:
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`,
        or for a subset selector expression (comma-separated, range, and/or regex) :py:attr:`for_pageIds`.

        If both selectors are passed, :py:attr:`for_pageIds` wins and
        :py:attr:`for_fileIds` is ignored.

        Each comma-separated token of :py:attr:`for_pageIds` is parsed in this order:

        1. an optional attribute prefix (as given by ``type_prefix()`` / ``prefix()``
           of the attribute enums), restricting which attributes get compared,
        2. an optional ``~`` negator, which turns the token into an exclusion,
        3. either the regex prefix (``REGEX_PREFIX``) followed by a regular expression,
           or a ``start..stop`` range, or (otherwise) a literal value.

        If only negated tokens are given, they are subtracted from the set of all pages.

        Keyword Args:
            for_fileIds (list): ``mets:file`` ``@ID`` values to look up pages for
            for_pageIds (string): page selector expression (see above)
            return_divs (boolean): return the ``mets:div`` elements instead of their ``@ID``

        Returns:
            A list of page IDs (or page ``mets:div`` elements if :py:attr:`return_divs`).
            For :py:attr:`for_fileIds`, the result has the same length and order as
            that list, with ``None`` at the positions of files not referenced by any page.

        Raises:
            ValueError: if a token of :py:attr:`for_pageIds` is empty (also after
                stripping a prefix) or if a range expression cannot be expanded
        """
        # no selector at all: plain listing of every page
        if for_fileIds is None and for_pageIds is None:
            if return_divs:
                if self._cache_flag:
                    return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].values())

                return [x for x in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS)]

            return self.physical_pages

        log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
        if for_pageIds is not None:
            # positive and negated (``~``) patterns are collected separately
            page_attr_patterns = []
            page_attr_antipatterns = []
            for pageId_token in re.split(r',', for_pageIds):
                # keep the unmodified token for error messages and logging
                pageId_token_raw = pageId_token
                # prefix for disambiguation of attribute?
                # (default: compare against every physical and logical attribute)
                attr = list(METS_PAGE_DIV_ATTRIBUTE) + list(METS_STRUCT_DIV_ATTRIBUTE)
                for attr_type in [METS_STRUCT_DIV_ATTRIBUTE, METS_PAGE_DIV_ATTRIBUTE]:
                    if pageId_token.startswith(attr_type.type_prefix()):
                        for attr_val in list(attr_type):
                            if pageId_token.startswith(attr_val.prefix()):
                                # disambiguated to e.g. "logical:label:"
                                attr = [attr_val]
                                pageId_token = pageId_token[len(attr_val.prefix()):]
                                break
                        # still the full default list: no specific attribute prefix matched
                        if len(attr) > 1:
                            # just "logical:" or "physical:"
                            attr = list(attr_type)
                            pageId_token = pageId_token[len(attr_type.type_prefix()):]
                        break
                if not pageId_token:
                    raise ValueError("invalid pageId syntax '%s': empty after type prefix" % pageId_token_raw)
                # negation prefix
                # (selects which of the two lists this token's pattern is appended to)
                if pageId_token.startswith('~'):
                    page_attr_xpatterns = page_attr_antipatterns
                    pageId_token = pageId_token[1:]
                else:
                    page_attr_xpatterns = page_attr_patterns
                if not pageId_token:
                    raise ValueError("invalid pageId syntax '%s': empty after negator prefix" % pageId_token_raw)
                # operator prefix
                if pageId_token.startswith(REGEX_PREFIX):
                    pageId_token = pageId_token[REGEX_PREFIX_LEN:]
                    if not pageId_token:
                        raise ValueError("invalid pageId syntax '%s': empty after regex prefix" % pageId_token_raw)
                    # NOTE(review): an invalid regular expression propagates whatever
                    # re.compile raises; it is not converted to ValueError here
                    val_expr = re.compile(pageId_token)
                    page_attr_xpatterns.append(
                        METS_DIV_ATTRIBUTE_REGEX_PATTERN(val_expr, attr))
                elif '..' in pageId_token:
                    try:
                        val_range = generate_range(*pageId_token.split('..', 1))
                    except ValueError as e:
                        # re-raise with the raw token for context, hiding the inner traceback
                        raise ValueError("invalid pageId syntax '%s': %s" % (pageId_token_raw, str(e))) from None
                    page_attr_xpatterns.append(
                        METS_DIV_ATTRIBUTE_RANGE_PATTERN(val_range, attr))
                else:
                    if not pageId_token:
                        raise ValueError("invalid pageId syntax '%s': empty" % pageId_token_raw)
                    page_attr_xpatterns.append(
                        METS_DIV_ATTRIBUTE_ATOM_PATTERN(pageId_token, attr))
                log.debug("parsed pattern '%s' to %s", pageId_token_raw, page_attr_xpatterns[-1])
            if not page_attr_patterns and not page_attr_antipatterns:
                return []
            if page_attr_patterns:
                divs = self.get_physical_page_patterns(page_attr_patterns)
            else:
                # only negated tokens: start from all pages (any @ID) and subtract below
                all_pages = [METS_DIV_ATTRIBUTE_REGEX_PATTERN(
                    re.compile(".*"), [METS_PAGE_DIV_ATTRIBUTE.ID])]
                divs = self.get_physical_page_patterns(all_pages)
            if page_attr_antipatterns:
                antidivs = self.get_physical_page_patterns(page_attr_antipatterns)
                divs = [div for div in divs if div not in antidivs]
            if return_divs:
                return divs
            else:
                return [div.get('ID') for div in divs]

        if for_fileIds == []:
            return []
        assert for_fileIds # at this point we know for_fileIds is set, assert to convince pyright
        # one result slot per requested file ID, in request order (None = no page found)
        ret = [None] * len(for_fileIds)
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                for fptr in fptrdict:
                    if fptr in for_fileIds:
                        # index() finds the first occurrence only, so a file ID
                        # listed more than once fills just its first slot
                        index = for_fileIds.index(fptr)
                        if return_divs:
                            ret[index] = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
                        else:
                            ret[index] = pageId
        else:
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                for fptr in page.findall('mets:fptr', NS):
                    if fptr.get('FILEID') in for_fileIds:
                        index = for_fileIds.index(fptr.get('FILEID'))
                        if return_divs:
                            ret[index] = page
                        else:
                            ret[index] = page.get('ID')
        return ret
744
745
    def get_physical_page_patterns(self, page_attr_patterns: List[METS_DIV_ATTRIBUTE_PATTERN]) -> List[ET._Element]:
        """
        Resolve parsed page selector patterns to physical page ``mets:div`` elements.

        Every pattern is tried against each attribute in its ``attr`` list, in order;
        the first attribute yielding a match decides, remaining attributes are skipped.
        Range patterns are never compared against the logical ``TYPE`` and ``LABEL``
        attributes.

        Note that this has side effects on its input: matched values are removed from
        the ``expr`` of range patterns, and (in non-cached mode) ``attr`` of a matched
        pattern is narrowed to the matching attribute and exhausted patterns are removed
        from :py:attr:`page_attr_patterns` itself.

        Arguments:
            page_attr_patterns (list): patterns to match (atom, range or regex)
        Returns:
            list of matching page ``mets:div`` elements (may contain a page more than once
            if several patterns match it)
        Raises:
            ValueError: if a pattern matches nothing, or if the start value
                of a range pattern has not been matched
        """
        log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
        ret = []
        # shallow copy: survives the removal of exhausted patterns below,
        # while still referencing the very same pattern objects
        page_attr_patterns_copy = list(page_attr_patterns)
        if self._cache_flag:
            for pat in page_attr_patterns:
                # reset per pattern, so the check after the loop never sees an unbound
                # name or the (truthy) result of the previous pattern when all
                # attributes of this pattern get skipped
                cache_keys = []
                for attr in pat.attr:
                    if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                        cache = self._page_cache[attr]
                    else:
                        cache = self._struct_cache[attr]
                    if (isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN) and
                        # @TYPE makes no sense in range expressions
                        # @LABEL makes no sense in range expressions
                        attr in [METS_STRUCT_DIV_ATTRIBUTE.TYPE,
                                 METS_STRUCT_DIV_ATTRIBUTE.LABEL]):
                        continue
                    if cache_keys := [v for v in cache if pat.matches(v)]:
                        if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                            # physical attribute: cache maps value -> page div
                            ret += [cache[v] for v in cache_keys]
                            log.debug('physical matches for %s: %s', pat, str(cache_keys))
                        else:
                            # logical attribute: cache maps value -> page IDs
                            for v in cache_keys:
                                ret += [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][p]
                                        for p in cache[v]]
                            log.debug('logical matches for %s: %s', pat, str(cache_keys))
                        if isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                            # remove matches for final range check
                            for v in cache_keys:
                                pat.expr.remove(v)
                        break
                if not cache_keys:
                    raise ValueError(f"{pat} matches none of the keys of any of the _page_caches and _struct_caches.")
        else:
            # cache logical structmap:
            el_struct_list = self._tree.getroot().findall("mets:structMap[@TYPE='LOGICAL']//mets:div", NS)
            el_smlink_list = self._tree.getroot().findall("mets:structLink/mets:smLink", NS)
            # logical div ID -> list of linked physical page IDs
            smlink_map = {}
            for link in el_smlink_list:
                link_log = link.get('{%s}from' % NS['xlink'])
                link_phy = link.get('{%s}to' % NS['xlink'])
                smlink_map.setdefault(link_log, list()).append(link_phy)
            # attribute -> attribute value -> list of linked physical page IDs
            struct_cache = {k: {} for k in METS_STRUCT_DIV_ATTRIBUTE}
            for el_div in el_struct_list:
                for attr in METS_STRUCT_DIV_ATTRIBUTE:
                    if not el_div.get(attr.name):
                        # avoid mapping None indiscriminately
                        continue
                    val = struct_cache[attr].setdefault(str(el_div.get(attr.name)), list())
                    val.extend(smlink_map.get(el_div.get('ID'), []))
            log.debug("found %d smLink entries for %d logical divs", len(el_smlink_list), len(el_struct_list))
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                # collected per page and removed only after the pattern loop,
                # so the list is not modified while being iterated
                patterns_exhausted = []
                for pat in page_attr_patterns:
                    for attr in pat.attr:
                        if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                            # physical attribute: the only candidate is this page's own value
                            cache = [page.get(attr.name) or '']
                        else:
                            cache = struct_cache[attr]
                        if (isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN) and
                            # @TYPE makes no sense in range expressions
                            # @LABEL makes no sense in range expressions
                            attr in [METS_STRUCT_DIV_ATTRIBUTE.TYPE,
                                     METS_STRUCT_DIV_ATTRIBUTE.LABEL]):
                            continue
                        if cache_keys := [v for v in cache if pat.matches(v)]:
                            pat.attr = [attr] # disambiguate next
                            if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                                ret.append(page)
                                log.debug('physical match for %s on page %s', pat, page.get('ID'))
                                if isinstance(pat, METS_DIV_ATTRIBUTE_ATOM_PATTERN):
                                    patterns_exhausted.append(pat)
                                elif isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                                    # remove for efficiency and final range check
                                    pat.expr.remove(cache_keys[0])
                                    if not pat.expr:
                                        patterns_exhausted.append(pat)
                            elif cache_key := next((v for v in cache_keys
                                                    if page.get('ID') in cache[v]), None):
                                ret.append(page)
                                log.debug('logical match for %s on page %s', pat, page.get('ID'))
                                cache[cache_key].remove(page.get('ID'))
                                # remove for efficiency and final range check
                                if not cache[cache_key]:
                                    if isinstance(pat, METS_DIV_ATTRIBUTE_ATOM_PATTERN):
                                        patterns_exhausted.append(pat)
                                    elif isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                                        pat.expr.remove(cache_key)
                                        if not pat.expr:
                                            patterns_exhausted.append(pat)
                            break # no more attributes for this pattern
                    # keep matching in order to exhaust and consume pattern list
                    #if page in ret:
                    #    break # no more patterns for this page
                for p in patterns_exhausted:
                    page_attr_patterns.remove(p)
            # NOTE(review): has_matched is presumably maintained by pat.matches() - confirm
            unmatched = [pat for pat in page_attr_patterns_copy
                         if not pat.has_matched]
            if unmatched:
                raise ValueError(f"Patterns {unmatched} match none of the pages")

        ranges_without_start_match = []
        ranges_without_stop_match = []
        for pat in page_attr_patterns_copy:
            if isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                # range expression, expanded to pattern list
                # list items get consumed (pat.expr.remove) when matched,
                # exhausted patterns also get consumed (page_attr_patterns.remove)
                # (but top-level list copy references the same list objects)
                if pat.start in pat.expr:
                    log.debug((pat, pat.expr))
                    ranges_without_start_match.append(pat)
                # if pat.stop in pat.expr:
                #     ranges_without_stop_match.append(pat)
        if ranges_without_start_match:
            raise ValueError(f"Start of range patterns {ranges_without_start_match} not matched - invalid range")
        # if ranges_without_stop_match:
        #     raise ValueError(f"End of range patterns {ranges_without_stop_match} not matched - invalid range")
        return ret
867
868
    def set_physical_page_for_file(self, pageId : str, ocrd_file : OcrdFile, 
                                   order : Optional[str] = None, orderlabel : Optional[str] = None) -> None:
        """
        Assign the ``mets:file`` :py:attr:`ocrd_file` to the physical page :py:attr:`pageId`
        (``@ID`` of the page ``mets:div`` in the physical ``mets:structMap``).

        Any previous page assignment of that file is dropped first. Missing structures
        (physical ``mets:structMap``, ``physSequence`` div, page div) are created on the fly.

        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use (only applied if the page div gets created)
            orderlabel (string): ``@ORDERLABEL`` to use (only applied if the page div gets created)
        """
        file_id = ocrd_file.ID
        el_root = self._tree.getroot()

        # collect the mets:fptr entries currently pointing to this file ...
        if self._cache_flag:
            stale_fptrs = [fptrdict[file_id]
                           for fptrdict in self._fptr_cache.values()
                           if file_id in fptrdict and fptrdict[file_id] is not None]
        else:
            stale_fptrs = el_root.findall(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                file_id, namespaces=NS)

        # ... and detach them from both the cache and the tree
        for el_stale in stale_fptrs:
            el_parent = el_stale.getparent()
            if self._cache_flag:
                del self._fptr_cache[el_parent.get('ID')][file_id]
            el_parent.remove(el_stale)

        # physical structMap and its sequence div: look up, or create if absent
        el_structmap = el_root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if el_structmap is None:
            el_structmap = ET.SubElement(el_root, TAG_METS_STRUCTMAP)
            el_structmap.set('TYPE', 'PHYSICAL')
        el_seqdiv = el_structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if el_seqdiv is None:
            el_seqdiv = ET.SubElement(el_structmap, TAG_METS_DIV)
            el_seqdiv.set('TYPE', 'physSequence')

        # page div: look up (via cache if enabled), or create if absent
        if self._cache_flag:
            el_pagediv = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].get(pageId)
        else:
            el_pagediv = el_seqdiv.find('mets:div[@ID="%s"]' % pageId, NS)

        if el_pagediv is None:
            el_pagediv = ET.SubElement(el_seqdiv, TAG_METS_DIV)
            el_pagediv.set('TYPE', 'page')
            el_pagediv.set('ID', pageId)
            for attr_name, attr_value in (('ORDER', order), ('ORDERLABEL', orderlabel)):
                if attr_value:
                    el_pagediv.set(attr_name, attr_value)
            if self._cache_flag:
                # register the new page div under its ID ...
                self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId] = el_pagediv
                # ... and prepare an (initially empty) file pointer mapping for it
                self._fptr_cache.setdefault(pageId, {})

        el_fptr = ET.SubElement(el_pagediv, TAG_METS_FPTR)
        el_fptr.set('FILEID', file_id)

        if self._cache_flag:
            # remember the new file pointer under its page
            self._fptr_cache[pageId][file_id] = el_fptr
936
937
    def update_physical_page_attributes(self, page_id : str, **kwargs) -> None:
        """
        Set or remove attributes of the physical page ``mets:div`` selected by :py:attr:`page_id`.

        Arguments:
            page_id (string): page selector, passed as ``for_pageIds`` to
                :py:meth:`get_physical_pages`; the first matching div gets updated
        Keyword Args:
            **kwargs: attribute name to value; names must be among
                ``METS_PAGE_DIV_ATTRIBUTE.names()``. A falsy value (``None``, empty string)
                removes the attribute (which is a no-op if the attribute is not set).
        Raises:
            ValueError: if an attribute name is not allowed, or no page was found
        """
        allowed_keys = METS_PAGE_DIV_ATTRIBUTE.names()
        invalid_keys = [k for k in kwargs if k not in allowed_keys]
        if invalid_keys:
            raise ValueError(f"Invalid attribute {invalid_keys}. Allowed values: {METS_PAGE_DIV_ATTRIBUTE.names()}")

        page_divs = self.get_physical_pages(for_pageIds=page_id, return_divs=True)
        if not page_divs:
            raise ValueError(f"Could not find mets:div[@ID=={page_id}]")
        page_div = page_divs[0]

        # NOTE(review): only the XML tree is changed here; if caching is enabled,
        # the page cache is not refreshed for the new values - confirm whether intended
        for k, v in kwargs.items():
            if not v:
                # default avoids a KeyError when clearing an attribute that is not set
                page_div.attrib.pop(k, None)
            else:
                page_div.attrib[k] = v
952
953
    def get_physical_page_for_file(self, ocrd_file : OcrdFile) -> Optional[str]:
        """
        Look up which physical page the ``mets:file`` :py:attr:`ocrd_file` is assigned to.

        Returns:
            the ``@ID`` of the first page ``mets:div`` (in the physical ``mets:structMap``)
            containing a ``mets:fptr`` to that file, or ``None`` if there is none
        """
        if self._cache_flag:
            # first page whose file pointer mapping knows this file
            return next(
                (page_id for page_id, fptrdict in self._fptr_cache.items()
                 if ocrd_file.ID in fptrdict),
                None)

        el_fptr = self._tree.getroot().find(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
            ocrd_file.ID, namespaces=NS)
        if el_fptr is None:
            return None
        return el_fptr.getparent().get('ID')
968
969
    def remove_physical_page(self, ID : str) -> None:
        """
        Delete page (physical ``mets:structMap`` ``mets:div`` entry ``@ID``) :py:attr:`ID`.

        Does nothing if there is no such page. If caching is enabled, the cache entries
        of the page are dropped as well.

        Arguments:
            ID (string): ``@ID`` of the page ``mets:div`` to remove
        """
        mets_div = None
        if self._cache_flag:
            if ID in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                mets_div = [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][ID]]
        else:
            mets_div = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
                namespaces=NS)
        if mets_div:
            el_div = mets_div[0]
            # snapshot the attributes before detaching, for the cache cleanup below
            mets_div_attrib = {** el_div.attrib}
            el_div.getparent().remove(el_div)
            if self._cache_flag:
                for attr in METS_PAGE_DIV_ATTRIBUTE:
                    if attr.name in mets_div_attrib:
                        attr_cache = self._page_cache[attr]
                        attr_value = mets_div_attrib[attr.name]
                        # Only evict entries which really point to the removed div:
                        # the value may never have been cached, or may be cached
                        # for another page sharing the same attribute value.
                        if attr_cache.get(attr_value) is el_div:
                            del attr_cache[attr_value]
                # tolerate a missing entry instead of failing after the div is gone
                self._fptr_cache.pop(ID, None)
989
990
    def remove_physical_page_fptr(self, fileId : str) -> List[str]:
        """
        Drop every ``mets:fptr`` referencing the ``mets:file`` with ``@ID`` :py:attr:`fileId`
        from the page ``mets:div`` entries of the physical ``mets:structMap``.

        Arguments:
            fileId (string): ``@ID`` of the ``mets:file`` whose file pointers get removed
        Returns:
            List of pageIds that mets:fptrs were deleted from
        """
        # First collect, then delete: a file may be referenced from more than one page,
        # and the cache must not change size while it is being iterated.
        if self._cache_flag:
            doomed_fptrs = [fptrdict[fileId]
                            for fptrdict in self._fptr_cache.values()
                            if fileId in fptrdict]
        else:
            doomed_fptrs = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
                namespaces=NS)

        affected_pages = []
        for el_fptr in doomed_fptrs:
            el_page = el_fptr.getparent()
            page_id = el_page.get('ID')
            affected_pages.append(page_id)
            if self._cache_flag:
                del self._fptr_cache[page_id][el_fptr.get('FILEID')]
            el_page.remove(el_fptr)
        return affected_pages
1019
1020
    @property
    def physical_pages_labels(self) -> Dict[str, Tuple[Optional[str], Optional[str], Optional[str]]]:
        """
        Mapping from page ID (``@ID`` of each page ``mets:div`` in the physical
        ``mets:structMap``) to the triple of its ``@ORDER``, ``@ORDERLABEL`` and
        ``@LABEL`` attributes (each ``None`` where the attribute is not set).
        """
        page_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]'
        labels = {}
        for el_page in self._tree.getroot().xpath(page_xpath, namespaces=NS):
            labels[el_page.get('ID')] = (
                el_page.get('ORDER'),
                el_page.get('ORDERLABEL'),
                el_page.get('LABEL'),
            )
        return labels
1031
1032
    def merge(self, other_mets, force : bool = False, 
              fileGrp_mapping : Optional[Dict[str, str]] = None, 
              fileId_mapping : Optional[Dict[str, str]] = None, 
              pageId_mapping : Optional[Dict[str, str]] = None,
              after_add_cb : Optional[Callable[[OcrdFile], Any]] = None, **kwargs) -> None:
        """
        Copy the file entries of :py:attr:`other_mets` into this METS.

        The files to copy are selected by passing the remaining kwargs on to
        :py:func:`find_files` of :py:attr:`other_mets`. Each of them is added via
        :py:meth:`add_file`, with fileGrp, file ID and page ID translated through
        the respective mapping (entries without a mapping are kept as is).

        Keyword Args:
            force (boolean): Whether to do :py:meth:`add_file` with ``force`` (overwriting existing ``mets:file`` entries)
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
            fileId_mapping (dict): Map :py:attr:`other_mets` file ID to file ID in this METS
            pageId_mapping (dict): Map :py:attr:`other_mets` page ID to page ID in this METS
            after_add_cb (function): Callback called with each new file after it was added to the METS
        """
        # a missing (or empty) mapping behaves like the identity
        grp_map = fileGrp_mapping or {}
        id_map = fileId_mapping or {}
        page_map = pageId_mapping or {}
        for f_src in other_mets.find_files(**kwargs):
            dest_grp = grp_map.get(f_src.fileGrp, f_src.fileGrp)
            dest_id = id_map.get(f_src.ID, f_src.ID)
            dest_page = page_map.get(f_src.pageId, f_src.pageId)
            f_dest = self.add_file(
                dest_grp,
                mimetype=f_src.mimetype,
                url=f_src.url,
                local_filename=f_src.local_filename,
                ID=dest_id,
                pageId=dest_page,
                force=force)
            # FIXME: merge metsHdr, amdSec, dmdSec as well
            # FIXME: merge structMap logical and structLink as well
            if not after_add_cb:
                continue
            after_add_cb(f_dest)
1066