Passed
Pull Request — master (#1329)
by
unknown
03:01
created

ocrd_models.ocrd_mets.OcrdMets.find_files()   F

Complexity

Conditions 48

Size

Total Lines 126
Code Lines 76

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 76
dl 0
loc 126
rs 0
c 0
b 0
f 0
cc 48
nop 10

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

A commonly applied refactoring here is Extract Method.
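As an illustration of Extract Method, the repeated "literal string or ``//``-prefixed regex" checks in find_files() could be pulled into two small, named helpers. This is only a sketch: `compile_matcher` and `matches` are hypothetical names that do not exist in ocrd_models, and `REGEX_PREFIX` is redefined locally on the assumption that it equals the ``//`` prefix described in the find_files() docstring.

```python
import re
from typing import Optional, Union

# Assumption: mirrors the ``//`` regex prefix described in the find_files() docstring.
REGEX_PREFIX = '//'


def compile_matcher(value: Optional[str]) -> Union[str, re.Pattern, None]:
    """Turn a ``//``-prefixed string into a compiled regex; return anything else unchanged."""
    if value and value.startswith(REGEX_PREFIX):
        return re.compile(value[len(REGEX_PREFIX):])
    return value


def matches(matcher: Union[str, re.Pattern, None], candidate: Optional[str]) -> bool:
    """Return True if no filter is set, or if the candidate satisfies the filter."""
    if not matcher:
        return True
    if isinstance(matcher, str):
        # Literal filters are compared by string equality.
        return candidate == matcher
    # Regex filters must match the whole candidate string.
    return matcher.fullmatch(candidate or '') is not None
```

With helpers like these, each per-attribute filter in the loop body would shrink to a single `if not matches(...): continue` line, and the helper names document what the removed comments used to say.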

Complexity

Complex methods like ocrd_models.ocrd_mets.OcrdMets.find_files() often do a lot of different things, and so do the classes that contain them. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
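In the listing below, the fields `_page_cache`, `_file_cache`, `_fptr_cache` and `_struct_cache` share the suffix `_cache`, which makes them a plausible candidate for Extract Class. The following is a minimal sketch for the file cache only; `FileCache` and its method names are hypothetical and not part of ocrd_models, and plain objects stand in for the lxml elements.

```python
from typing import Dict, Iterator, Optional


class FileCache:
    """Hypothetical extracted class: maps fileGrp USE -> file ID -> element."""

    def __init__(self) -> None:
        self._groups: Dict[str, Dict[str, object]] = {}

    def add_group(self, use: str) -> None:
        """Register a file group without overwriting an existing one."""
        self._groups.setdefault(use, {})

    def add_file(self, use: str, file_id: str, element: object) -> None:
        """Store an element under its group, creating the group if needed."""
        self._groups.setdefault(use, {})[file_id] = element

    def remove_file(self, use: str, file_id: str) -> None:
        """Forget a single file; raises KeyError if it is unknown."""
        del self._groups[use][file_id]

    def files(self, use: Optional[str] = None) -> Iterator[object]:
        """Yield the elements of one group, or of all groups if ``use`` is None."""
        groups = self._groups.values() if use is None else [self._groups.get(use, {})]
        for group in groups:
            yield from group.values()
```

A class like this would let the cached branches of find_files(), add_file() and remove_one_file() delegate to one object instead of each manipulating the nested dictionaries directly.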

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
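One such approach is to introduce a parameter object that bundles the search criteria. The sketch below assumes a hypothetical `FileQuery` dataclass (not part of ocrd_models) whose fields copy the keyword arguments of find_files() as shown in the listing below.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class FileQuery:
    """Hypothetical parameter object bundling the find_files() search criteria."""
    ID: Optional[str] = None
    fileGrp: Optional[str] = None
    pageId: Optional[str] = None
    mimetype: Optional[str] = None
    url: Optional[str] = None
    local_filename: Optional[str] = None
    local_only: bool = False
    include_fileGrp: Optional[List[str]] = None
    exclude_fileGrp: Optional[List[str]] = None

    def active_filters(self) -> Dict[str, Any]:
        """Return only the criteria that were actually set."""
        return {name: value for name, value in asdict(self).items() if value}


query = FileQuery(fileGrp='OCR-D-IMG', mimetype='//image/.*')
print(query.active_filters())
```

A method accepting such an object would take one argument instead of nine, and new criteria could be added to the dataclass without touching every call site.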

"""
API to METS
"""
from datetime import datetime
import re
from lxml import etree as ET
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union

from ocrd_utils import (
    getLogger,
    generate_range,
    VERSION,
    REGEX_PREFIX,
    REGEX_FILE_ID
)

from ocrd_utils.config import config

from .constants import (
    NAMESPACES as NS,
    TAG_METS_AGENT,
    TAG_METS_DIV,
    TAG_METS_FILE,
    TAG_METS_FILEGRP,
    TAG_METS_FILESEC,
    TAG_METS_FPTR,
    TAG_METS_METSHDR,
    TAG_METS_STRUCTMAP,
    IDENTIFIER_PRIORITY,
    TAG_MODS_IDENTIFIER,
    METS_XML_EMPTY,
    METS_PAGE_DIV_ATTRIBUTE,
    METS_STRUCT_DIV_ATTRIBUTE,
    METS_DIV_ATTRIBUTE_PATTERN,
    METS_DIV_ATTRIBUTE_ATOM_PATTERN,
    METS_DIV_ATTRIBUTE_RANGE_PATTERN,
    METS_DIV_ATTRIBUTE_REGEX_PATTERN,
)

from .ocrd_xml_base import OcrdXmlDocument, ET      # type: ignore
from .ocrd_file import OcrdFile
from .ocrd_agent import OcrdAgent

REGEX_PREFIX_LEN = len(REGEX_PREFIX)

class OcrdMets(OcrdXmlDocument):
    """
    API to a single METS file
    """
    _cache_flag : bool
    # Cache for the physical pages (mets:div) - two nested dictionaries
    # The outer dictionary's key: attribute type
    # The outer dictionary's value: inner dictionary
    # The inner dictionary's key: attribute value (str)
    # The inner dictionary's value: a 'div' object at some memory location
    _page_cache : Dict[METS_PAGE_DIV_ATTRIBUTE, Dict[str, ET._Element]]
    # Cache for the files (mets:file) - two nested dictionaries
    # The outer dictionary's Key: 'fileGrp.USE'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'file.ID'
    # The inner dictionary's Value: a 'file' object at some memory location
    _file_cache : Dict[str, Dict[str, ET._Element]]
    # Cache for the file pointers (mets:fptr) - two nested dictionaries
    # The outer dictionary's Key: 'div.ID'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'fptr.FILEID'
    # The inner dictionary's Value: a 'fptr' object at some memory location
    _fptr_cache : Dict[str, Dict[str, ET._Element]]
    # Cache for the logical structural divs (mets:div) - two nested dictionaries
    # The outer dictionary's key: attribute type
    # The outer dictionary's value: inner dictionary
    # The inner dictionary's key: attribute value (str)
    # The inner dictionary's value: a list of corresponding physical div.ID
    _struct_cache : Dict[METS_STRUCT_DIV_ATTRIBUTE, Dict[str, List[str]]]

    @staticmethod
    def empty_mets(now : Optional[str] = None, cache_flag : bool = False):
        """
        Create an empty METS file from bundled template.
        """
        if not now:
            now = datetime.now().isoformat()
        tpl = METS_XML_EMPTY
        tpl = tpl.replace('{{ VERSION }}', VERSION)
        tpl = tpl.replace('{{ NOW }}', '%s' % now)
        return OcrdMets(content=tpl.encode('utf-8'), cache_flag=cache_flag)

    def __init__(self, **kwargs) -> None:
        """
        """
        super().__init__(**kwargs)

        # XXX If the environment variable OCRD_METS_CACHING is set to "true",
        # then enable caching, if "false", disable caching, overriding the
        # kwarg to the constructor
        if config.is_set('OCRD_METS_CACHING'):
            getLogger('ocrd.models.ocrd_mets').debug('METS Caching %s because OCRD_METS_CACHING is %s',
                    'enabled' if config.OCRD_METS_CACHING else 'disabled', config.raw_value('OCRD_METS_CACHING'))
            self._cache_flag = config.OCRD_METS_CACHING

        # If cache is enabled
        if self._cache_flag:
            self._initialize_caches()
            self._refresh_caches()

    def __str__(self) -> str:
        """
        String representation
        """
        return 'OcrdMets[cached=%s,fileGrps=%s,files=%s]' % (
            self._cache_flag, self.file_groups, list(self.find_files()))

    def _fill_caches(self) -> None:
        """
        Fills the caches with fileGrps and FileIDs
        """

        tree_root = self._tree.getroot()

        # Fill with files
        el_fileSec = tree_root.find("mets:fileSec", NS)
        if el_fileSec is None:
            return

        log = getLogger('ocrd.models.ocrd_mets._fill_caches-files')
        for el_fileGrp in el_fileSec.findall('mets:fileGrp', NS):
            fileGrp_use = el_fileGrp.get('USE')

            # Assign an empty dictionary that will hold the files of the added fileGrp
            self._file_cache[fileGrp_use] = {}

            for el_file in el_fileGrp:
                file_id = el_file.get('ID')
                self._file_cache[fileGrp_use].update({file_id: el_file})
                # log.info("File added to the cache: %s" % file_id)

        # Fill with pages
        log = getLogger('ocrd.models.ocrd_mets._fill_caches-pages')
        el_div_list = tree_root.findall(".//mets:div[@TYPE='page']", NS)
        if len(el_div_list) == 0:
            return

        for el_div in el_div_list:
            div_id = el_div.get('ID')
            log.debug("DIV_ID: %s" % el_div.get('ID'))

            for attr in METS_PAGE_DIV_ATTRIBUTE:
                self._page_cache[attr][str(el_div.get(attr.name))] = el_div

            # Assign an empty dictionary that will hold the fptr of the added page (div)
            self._fptr_cache[div_id] = {}

            # log.info("Page_id added to the cache: %s" % div_id)

            for el_fptr in el_div:
                self._fptr_cache[div_id].update({el_fptr.get('FILEID'): el_fptr})
                # log.info("Fptr added to the cache: %s" % el_fptr.get('FILEID'))

        # log.info("Len of page_cache: %s" % len(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]))
        # log.info("Len of fptr_cache: %s" % len(self._fptr_cache))

        # Fill with logical divs
        log = getLogger('ocrd.models.ocrd_mets._fill_caches-structs')
        el_struct_list = tree_root.findall("mets:structMap[@TYPE='LOGICAL']//mets:div", NS)
        el_smlink_list = tree_root.findall("mets:structLink/mets:smLink", NS)
        if len(el_struct_list) == 0 or len(el_smlink_list) == 0:
            return
        smlink_map = {}
        for link in el_smlink_list:
            link_log = link.get('{%s}from' % NS['xlink'])
            link_phy = link.get('{%s}to' % NS['xlink'])
            smlink_map.setdefault(link_log, list()).append(link_phy)
        for el_div in el_struct_list:
            for attr in METS_STRUCT_DIV_ATTRIBUTE:
                val = self._struct_cache[attr].setdefault(str(el_div.get(attr.name)), list())
                val.extend(smlink_map.get(el_div.get('ID'), []))

        # log.info("Len of struct_cache: %s" % len(self._struct_cache[METS_STRUCT_DIV_ATTRIBUTE.ID]))

    def _initialize_caches(self) -> None:
        self._file_cache = {}
        # NOTE we can only guarantee uniqueness for @ID and @ORDER
        self._page_cache = {k : {} for k in METS_PAGE_DIV_ATTRIBUTE}
        self._fptr_cache = {}
        self._struct_cache = {k : {} for k in METS_STRUCT_DIV_ATTRIBUTE}

    def _refresh_caches(self) -> None:
        if self._cache_flag:
            self._initialize_caches()

            # Note, if the empty_mets() function is used to instantiate OcrdMets
            # Then the cache is empty even after this operation
            self._fill_caches()

    @property
    def unique_identifier(self) -> Optional[str]:
        """
        Get the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for t in IDENTIFIER_PRIORITY:
            found = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if found is not None:
                return found.text

    @unique_identifier.setter
    def unique_identifier(self, purl : str) -> None:
        """
        Set the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        id_el = None
        for t in IDENTIFIER_PRIORITY:
            id_el = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if id_el is not None:
                break
        if id_el is None:
            mods = self._tree.getroot().find('.//mods:mods', NS)
            assert mods is not None
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl

    @property
    def agents(self) -> List[OcrdAgent]:
        """
        List all :py:class:`ocrd_models.ocrd_agent.OcrdAgent` entries.
        """
        return [OcrdAgent(el_agent) for el_agent in self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)]

    def add_agent(self, **kwargs) -> OcrdAgent:
        """
        Add an :py:class:`ocrd_models.ocrd_agent.OcrdAgent` to the list of agents in the ``metsHdr``.
        """
        el_metsHdr = self._tree.getroot().find('.//mets:metsHdr', NS)
        if el_metsHdr is None:
            el_metsHdr = ET.Element(TAG_METS_METSHDR)
            self._tree.getroot().insert(0, el_metsHdr)
        #  assert(el_metsHdr is not None)
        el_agent = ET.Element(TAG_METS_AGENT)
        try:
            el_agent_last = next(el_metsHdr.iterchildren(tag=TAG_METS_AGENT, reversed=True))
            el_agent_last.addnext(el_agent)
        except StopIteration:
            el_metsHdr.insert(0, el_agent)
        return OcrdAgent(el_agent, **kwargs)

    @property
    def file_groups(self) -> List[str]:
        """
        List the ``@USE`` of all ``mets:fileGrp`` entries.
        """

        # WARNING: Actually we cannot return strings in place of elements!
        if self._cache_flag:
            return list(self._file_cache.keys())

        return [el.get('USE') for el in self._tree.getroot().findall('.//mets:fileGrp', NS)]

    def find_all_files(self, *args, **kwargs) -> List[OcrdFile]:
        """
        Like :py:meth:`find_files` but return a list of all results.
        Equivalent to ``list(self.find_files(...))``
        """
        return list(self.find_files(*args, **kwargs))

    # pylint: disable=multiple-statements
    def find_files(
        self,
        ID : Optional[str] = None,
        fileGrp : Optional[str] = None,
        pageId : Optional[str] = None,
        mimetype : Optional[str] = None,
        url : Optional[str] = None,
        local_filename : Optional[str] = None,
        local_only : bool = False,
        include_fileGrp : Optional[List[str]] = None,
        exclude_fileGrp : Optional[List[str]] = None,
    ) -> Iterator[OcrdFile]:
        """
        Search ``mets:file`` entries in this METS document and yield results.
        The :py:attr:`ID`, :py:attr:`pageId`, :py:attr:`fileGrp`,
        :py:attr:`url`, :py:attr:`local_filename` and :py:attr:`mimetype`
        parameters can each be either a literal string, or a regular
        expression if the string starts with ``//`` (double slash).

        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. If it is a literal string, comparison
        is done with string equality.

        The :py:attr:`pageId` parameter also supports comma-separated lists, as well
        as the numeric range operator ``..`` and the negation operator ``~``.

        For example, to find all files in pages ``PHYS_0001`` to ``PHYS_0003``,
        both expressions ``PHYS_0001..PHYS_0003`` and ``PHYS_0001,PHYS_0002,PHYS_0003``
        will be expanded to the same 3 pages. To find all files above that subrange,
        both expressions ``~PHYS_0001..PHYS_0003`` and ``~PHYS_0001,~PHYS_0002,~PHYS_0003``
        will be expanded to ``PHYS_0004`` and upwards.

        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` remote/original URL of ``mets:FLocat`` of ``mets:file``
            local_filename (string) : ``@xlink:href`` local/cached filename of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem
            include_fileGrp (list[str]) : List of allowed file groups
            exclude_fileGrp (list[str]) : List of disallowed file groups
        Yields:
            :py:class:`ocrd_models.ocrd_file.OcrdFile` instantiations
        """
        pageId_list = []
        if pageId:
            # returns divs instead of strings of ids
            physical_pages = self.get_physical_pages(for_pageIds=pageId, return_divs=True)
            for div in physical_pages:
                if self._cache_flag:
                    pageId_list += self._fptr_cache[div.get('ID')]
                else:
                    pageId_list += [fptr.get('FILEID') for fptr in div.findall('mets:fptr', NS)]

        if ID and ID.startswith(REGEX_PREFIX):
            ID = re.compile(ID[REGEX_PREFIX_LEN:])
        if fileGrp and fileGrp.startswith(REGEX_PREFIX):
            fileGrp = re.compile(fileGrp[REGEX_PREFIX_LEN:])
        if mimetype and mimetype.startswith(REGEX_PREFIX):
            mimetype = re.compile(mimetype[REGEX_PREFIX_LEN:])
        if url and url.startswith(REGEX_PREFIX):
            url = re.compile(url[REGEX_PREFIX_LEN:])
        # Compile local_filename as well, otherwise the regex branch below is unreachable
        if local_filename and local_filename.startswith(REGEX_PREFIX):
            local_filename = re.compile(local_filename[REGEX_PREFIX_LEN:])

        candidates = []
        if self._cache_flag:
            if fileGrp:
                if isinstance(fileGrp, str):
                    candidates += self._file_cache.get(fileGrp, {}).values()
                else:
                    # Use fullmatch, as documented and as in the uncached branch below
                    candidates = [x for fileGrp_needle, el_file_list in self._file_cache.items() if
                                  fileGrp.fullmatch(fileGrp_needle) for x in el_file_list.values()]
            else:
                candidates = [el_file for id_to_file in self._file_cache.values() for el_file in id_to_file.values()]
        else:
            candidates = self._tree.getroot().xpath('//mets:file', namespaces=NS)

        for cand in candidates:
            if ID:
                if isinstance(ID, str):
                    if not ID == cand.get('ID'): continue
                else:
                    if not ID.fullmatch(cand.get('ID')): continue

            if pageId is not None and cand.get('ID') not in pageId_list:
                continue

            if not self._cache_flag and fileGrp:
                if isinstance(fileGrp, str):
                    if cand.getparent().get('USE') != fileGrp: continue
                else:
                    if not fileGrp.fullmatch(cand.getparent().get('USE')): continue

            if mimetype:
                if isinstance(mimetype, str):
                    if cand.get('MIMETYPE') != mimetype: continue
                else:
                    if not mimetype.fullmatch(cand.get('MIMETYPE') or ''): continue

            if url:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="URL"]', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_url = cand_locat.get('{%s}href' % NS['xlink'])
                if isinstance(url, str):
                    if cand_url != url: continue
                else:
                    if not url.fullmatch(cand_url): continue

            if local_filename:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"]', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_local_filename = cand_locat.get('{%s}href' % NS['xlink'])
                if isinstance(local_filename, str):
                    if cand_local_filename != local_filename: continue
                else:
                    if not local_filename.fullmatch(cand_local_filename): continue

            if local_only:
                # deprecation_warning("'local_only' is deprecated, use 'local_filename=\"//.+\"' instead")
                is_local = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"][@xlink:href]', namespaces=NS)
                if is_local is None:
                    continue

            ret = OcrdFile(cand, mets=self)

            # XXX include_fileGrp is redundant to fileGrp but for completeness
            if exclude_fileGrp and ret.fileGrp in exclude_fileGrp:
                continue
            if include_fileGrp and ret.fileGrp not in include_fileGrp:
                continue

            yield ret

    def add_file_group(self, fileGrp: str) -> ET._Element:
        """
        Add a new ``mets:fileGrp``.
        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.
        """
        if ',' in fileGrp:
            raise ValueError('fileGrp must not contain commas')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            el_fileSec = ET.SubElement(self._tree.getroot(), TAG_METS_FILESEC)
        el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if el_fileGrp is None:
            el_fileGrp = ET.SubElement(el_fileSec, TAG_METS_FILEGRP)
            el_fileGrp.set('USE', fileGrp)

            if self._cache_flag:
                # Assign an empty dictionary that will hold the files of the added fileGrp
                self._file_cache[fileGrp] = {}

        return el_fileGrp

    def rename_file_group(self, old: str, new: str) -> None:
        """
        Rename a ``mets:fileGrp`` by changing the ``@USE`` from :py:attr:`old` to :py:attr:`new`.
        """
        el_fileGrp = self._tree.getroot().find('mets:fileSec/mets:fileGrp[@USE="%s"]' % old, NS)
        if el_fileGrp is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        el_fileGrp.set('USE', new)

        if self._cache_flag:
            self._file_cache[new] = self._file_cache.pop(old)

    def remove_file_group(self, USE: str, recursive : bool = False, force : bool = False) -> None:
        """
        Remove a ``mets:fileGrp`` (single fixed ``@USE`` or multiple regex ``@USE``)
        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                use = re.compile(USE[REGEX_PREFIX_LEN:])
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    if use.fullmatch(cand.get('USE')):
                        self.remove_file_group(cand, recursive=recursive)
                return
            else:
                el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
        if el_fileGrp is None:  # pylint: disable=len-as-condition
            msg = "No such fileGrp: %s" % USE
            if force:
                log.warning(msg)
                return
            raise Exception(msg)

        # The cache should also be used here
        if self._cache_flag:
            files = self._file_cache.get(el_fileGrp.get('USE'), {}).values()
        else:
            files = el_fileGrp.findall('mets:file', NS)

        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            for f in list(files):
                self.remove_one_file(ID=f.get('ID'), fileGrp=f.getparent().get('USE'))

        if self._cache_flag:
            # Note: Since the files inside the group are removed
            # with the 'remove_one_file' method above,
            # we should not take care of that again.
            # We just remove the fileGrp.
            del self._file_cache[el_fileGrp.get('USE')]

        el_fileGrp.getparent().remove(el_fileGrp)

    def add_file(self, fileGrp : str, mimetype : Optional[str] = None, url : Optional[str] = None,
                 ID : Optional[str] = None, pageId : Optional[str] = None, force : bool = False,
                 local_filename : Optional[str] = None, ignore : bool = False, **kwargs) -> OcrdFile:
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string):
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)

        el_fileGrp = self.add_file_group(fileGrp)
        if not ignore:
            mets_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)
            if mets_file:
                if mets_file.fileGrp == fileGrp and \
                        mets_file.pageId == pageId and \
                        mets_file.mimetype == mimetype:
                    if not force:
                        raise FileExistsError(
                            f"A file with ID=={ID} already exists {mets_file} and neither force nor ignore are set")
                    self.remove_file(ID=ID, fileGrp=fileGrp)
                else:
                    raise FileExistsError(
                        f"A file with ID=={ID} already exists {mets_file} but unrelated - cannot mitigate")

        # To get rid of Python's FutureWarning - checking if v is not None
        kwargs = {k: v for k, v in locals().items()
                  if k in ['url', 'ID', 'mimetype', 'pageId', 'local_filename'] and v is not None}
        # This separation is needed to reuse the same el_mets_file element in the caching if block
        el_mets_file = ET.SubElement(el_fileGrp, TAG_METS_FILE)
        # The caching of the physical page is done in the OcrdFile constructor
        # (which calls us back with set_physical_page_for_file)
        mets_file = OcrdFile(el_mets_file, mets=self, **kwargs)

        if self._cache_flag:
            # Add the file to the file cache
            self._file_cache[fileGrp].update({ID: el_mets_file})

        return mets_file

    def remove_file(self, *args, **kwargs) -> Union[List[OcrdFile],OcrdFile]:
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            else:
                return files[0]  # for backwards-compatibility
        # Check the filter values (not the keyword names) for the regex prefix
        if any(1 for kwarg in kwargs.values()
               if isinstance(kwarg, str) and kwarg.startswith(REGEX_PREFIX)):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))

    def remove_one_file(self, ID : Union[str, OcrdFile], fileGrp : Optional[str] = None) -> OcrdFile:
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            ID (string|OcrdFile): ``@ID`` of the ``mets:file`` to delete. Can also be an :py:class:`ocrd_models.ocrd_file.OcrdFile` to avoid search via ``ID``.
            fileGrp (string): ``@USE`` of the ``mets:fileGrp`` containing the ``mets:file``. Used only for optimization.
        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_one_file')
        log.debug("remove_one_file(%s %s)" % (ID, fileGrp))
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s (fileGrp=%s)" % (ID, fileGrp))

        # Collect the physical page refs
        fptrs = []
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                if ID in fptrdict:
                    fptrs.append(fptrdict[ID])
        else:
            fptrs = self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS)

        # Delete the physical page ref
        for fptr in fptrs:
            log.debug("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # Remove the fptr from the cache as well
            if self._cache_flag:
                del self._fptr_cache[page_div.get('ID')][ID]
            # delete empty pages
            if not list(page_div):
                log.debug("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)
                # Delete the empty pages from caches as well
                if self._cache_flag:
                    for attr in METS_PAGE_DIV_ATTRIBUTE:
                        if attr.name in page_div.attrib:
                            del self._page_cache[attr][page_div.attrib[attr.name]]

        # Delete the file reference from the cache
        if self._cache_flag:
            parent_use = ocrd_file._el.getparent().get('USE')
            del self._file_cache[parent_use][ocrd_file.ID]

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file

620
    @property
621
    def physical_pages(self) -> List[str]:
622
        """
623
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``)
624
        """
625
        if self._cache_flag:
626
            return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].keys())
627
628
        return [str(x) for x in self._tree.getroot().xpath(
629
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
630
            namespaces=NS)]

    def get_physical_pages(self, for_fileIds : Optional[List[str]] = None, for_pageIds : Optional[str] = None,
                           return_divs : bool = False) -> List[Union[str, ET._Element]]:
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`,
        or for a subset selector expression (comma-separated, range, and/or regex) :py:attr:`for_pageIds`.
        If :py:attr:`return_divs` is set, return the ``mets:div`` elements themselves instead of their ``@ID`` strings.
        """
        if for_fileIds is None and for_pageIds is None:
            if return_divs:
                if self._cache_flag:
                    return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].values())

                return [x for x in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS)]

            return self.physical_pages

        # log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
        if for_pageIds is not None:
            page_attr_patterns = []
            page_attr_antipatterns = []
            page_attr_patterns_raw = re.split(r',', for_pageIds)
            for pageId_token in page_attr_patterns_raw:
                # prefix for disambiguation of attribute?
                if pageId_token.startswith('logical:'):
                    if pageId_token.startswith('logical:id:'):
                        attr = [METS_STRUCT_DIV_ATTRIBUTE.ID]
                        pageId_token = pageId_token[len('logical:id:'):]
                    elif pageId_token.startswith('logical:dmdid:'):
                        attr = [METS_STRUCT_DIV_ATTRIBUTE.DMDID]
                        pageId_token = pageId_token[len('logical:dmdid:'):]
                    elif pageId_token.startswith('logical:type:'):
                        attr = [METS_STRUCT_DIV_ATTRIBUTE.TYPE]
                        pageId_token = pageId_token[len('logical:type:'):]
                    elif pageId_token.startswith('logical:label:'):
                        attr = [METS_STRUCT_DIV_ATTRIBUTE.LABEL]
                        pageId_token = pageId_token[len('logical:label:'):]
                    else:
                        attr = list(METS_STRUCT_DIV_ATTRIBUTE)
                        pageId_token = pageId_token[len('logical:'):]
                elif pageId_token.startswith('physical:'):
                    if pageId_token.startswith('physical:id:'):
                        attr = [METS_PAGE_DIV_ATTRIBUTE.ID]
                        pageId_token = pageId_token[len('physical:id:'):]
                    elif pageId_token.startswith('physical:order:'):
                        attr = [METS_PAGE_DIV_ATTRIBUTE.ORDER]
                        pageId_token = pageId_token[len('physical:order:'):]
                    elif pageId_token.startswith('physical:orderlabel:'):
                        attr = [METS_PAGE_DIV_ATTRIBUTE.ORDERLABEL]
                        pageId_token = pageId_token[len('physical:orderlabel:'):]
                    elif pageId_token.startswith('physical:label:'):
                        attr = [METS_PAGE_DIV_ATTRIBUTE.LABEL]
                        pageId_token = pageId_token[len('physical:label:'):]
                    elif pageId_token.startswith('physical:contentids:'):
                        attr = [METS_PAGE_DIV_ATTRIBUTE.CONTENTIDS]
                        pageId_token = pageId_token[len('physical:contentids:'):]
                    else:
                        attr = list(METS_PAGE_DIV_ATTRIBUTE)
                        pageId_token = pageId_token[len('physical:'):]
                else:
                    attr = list(METS_PAGE_DIV_ATTRIBUTE) + list(METS_STRUCT_DIV_ATTRIBUTE)
                # negation prefix
                if pageId_token.startswith('~'):
                    page_attr_xpatterns = page_attr_antipatterns
                    pageId_token = pageId_token[1:]
                else:
                    page_attr_xpatterns = page_attr_patterns
                # operator prefix
                if pageId_token.startswith(REGEX_PREFIX):
                    val_expr = re.compile(pageId_token[REGEX_PREFIX_LEN:])
                    page_attr_xpatterns.append(
                        METS_DIV_ATTRIBUTE_REGEX_PATTERN(val_expr, attr))
                elif '..' in pageId_token:
                    val_range = generate_range(*pageId_token.split('..', 1))
                    page_attr_xpatterns.append(
                        METS_DIV_ATTRIBUTE_RANGE_PATTERN(val_range, attr))
                else:
                    page_attr_xpatterns.append(
                        METS_DIV_ATTRIBUTE_ATOM_PATTERN(pageId_token, attr))
            if not page_attr_patterns and not page_attr_antipatterns:
                return []
            if page_attr_patterns:
                divs = self.get_physical_page_patterns(page_attr_patterns)
            else:
                all_pages = [METS_DIV_ATTRIBUTE_REGEX_PATTERN(
                    re.compile(".*"), [METS_PAGE_DIV_ATTRIBUTE.ID])]
                divs = self.get_physical_page_patterns(all_pages)
            if page_attr_antipatterns:
                antidivs = self.get_physical_page_patterns(page_attr_antipatterns)
                divs = [div for div in divs if div not in antidivs]
            if return_divs:
                return divs
            else:
                return [div.get('ID') for div in divs]

        if for_fileIds == []:
            return []
        assert for_fileIds # at this point we know for_fileIds is set, assert to convince pyright
        ret = [None] * len(for_fileIds)
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                for fptr in fptrdict:
                    if fptr in for_fileIds:
                        index = for_fileIds.index(fptr)
                        if return_divs:
                            ret[index] = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
                        else:
                            ret[index] = pageId
        else:
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                for fptr in page.findall('mets:fptr', NS):
                    if fptr.get('FILEID') in for_fileIds:
                        index = for_fileIds.index(fptr.get('FILEID'))
                        if return_divs:
                            ret[index] = page
                        else:
                            ret[index] = page.get('ID')
        return ret

    def get_physical_page_patterns(self, page_attr_patterns: List[METS_DIV_ATTRIBUTE_PATTERN]) -> List[ET._Element]:
        log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
        ret = []
        range_patterns_first_last = [(x[0], x[-1]) if isinstance(x, list) else None for x in page_attr_patterns]
        page_attr_patterns_copy = list(page_attr_patterns)
        if self._cache_flag:
            for pat in page_attr_patterns:
                # reset per pattern, so the check after the attribute loop never
                # sees an unbound or stale value when every attribute was skipped
                cache_keys = []
                for attr in pat.attr:
                    if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                        cache = self._page_cache[attr]
                    else:
                        cache = self._struct_cache[attr]
                    if (isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN) and
                        # @TYPE makes no sense in range expressions
                        # @LABEL makes no sense in range expressions
                        attr in [METS_STRUCT_DIV_ATTRIBUTE.TYPE,
                                 METS_STRUCT_DIV_ATTRIBUTE.LABEL]):
                        continue
                    if cache_keys := [v for v in cache if pat.matches(v)]:
                        if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                            ret += [cache[v] for v in cache_keys]
                            log.debug('physical matches for %s: %s', pat, str(cache_keys))
                        else:
                            for v in cache_keys:
                                ret += [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][p]
                                        for p in cache[v]]
                            log.debug('logical matches for %s: %s', pat, str(cache_keys))
                        if isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                            # remove matches for final range check
                            for v in cache_keys:
                                pat.expr.remove(v)
                        break
                if not cache_keys:
                    raise ValueError(f"{pat} matches none of the keys of any of the _page_caches and _struct_caches.")
        else:
            # cache logical structmap:
            el_struct_list = self._tree.getroot().findall("mets:structMap[@TYPE='LOGICAL']//mets:div", NS)
            el_smlink_list = self._tree.getroot().findall("mets:structLink/mets:smLink", NS)
            smlink_map = {}
            for link in el_smlink_list:
                link_log = link.get('{%s}from' % NS['xlink'])
                link_phy = link.get('{%s}to' % NS['xlink'])
                smlink_map.setdefault(link_log, list()).append(link_phy)
            struct_cache = {k: {} for k in METS_STRUCT_DIV_ATTRIBUTE}
            for el_div in el_struct_list:
                for attr in METS_STRUCT_DIV_ATTRIBUTE:
                    if not el_div.get(attr.name):
                        # avoid mapping None indiscriminately
                        continue
                    val = struct_cache[attr].setdefault(str(el_div.get(attr.name)), list())
                    val.extend(smlink_map.get(el_div.get('ID'), []))
            log.debug("found %d smLink entries for %d logical divs", len(el_smlink_list), len(el_struct_list))
            page_attr_patterns_matched = []
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                patterns_exhausted = []
                for pat in page_attr_patterns:
                    for attr in pat.attr:
                        if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                            cache = [page.get(attr.name) or '']
                        else:
                            cache = struct_cache[attr]
                        if (isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN) and
                            # @TYPE makes no sense in range expressions
                            # @LABEL makes no sense in range expressions
                            attr in [METS_STRUCT_DIV_ATTRIBUTE.TYPE,
                                     METS_STRUCT_DIV_ATTRIBUTE.LABEL]):
                            continue
                        if cache_keys := [v for v in cache if pat.matches(v)]:
                            pat.attr = [attr] # disambiguate next
                            if isinstance(attr, METS_PAGE_DIV_ATTRIBUTE):
                                ret.append(page)
                                log.debug('physical match for %s on page %s', pat, page.get('ID'))
                                if isinstance(pat, METS_DIV_ATTRIBUTE_ATOM_PATTERN):
                                    patterns_exhausted.append(pat)
                                elif isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                                    # remove for efficiency and final range check
                                    pat.expr.remove(cache_keys[0])
                                    if not pat.expr:
                                        patterns_exhausted.append(pat)
                            elif cache_key := next((v for v in cache_keys
                                                    if page.get('ID') in cache[v]), None):
                                ret.append(page)
                                log.debug('logical match for %s on page %s', pat, page.get('ID'))
                                cache[cache_key].remove(page.get('ID'))
                                # remove for efficiency and final range check
                                if not cache[cache_key]:
                                    if isinstance(pat, METS_DIV_ATTRIBUTE_ATOM_PATTERN):
                                        patterns_exhausted.append(pat)
                                    elif isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                                        pat.expr.remove(cache_key)
                                        if not pat.expr:
                                            patterns_exhausted.append(pat)
                            page_attr_patterns_matched.append(pat)
                            break # no more attributes for this pattern
                    if page in ret:
                        break # no more patterns for this page
                for p in patterns_exhausted:
                    page_attr_patterns.remove(p)
            unmatched = [x for x in page_attr_patterns_copy
                         if x not in page_attr_patterns_matched]
            if unmatched:
                raise ValueError(f"Patterns {unmatched} match none of the pages")

        ranges_without_start_match = []
        ranges_without_stop_match = []
        for pat in page_attr_patterns_copy:
            if isinstance(pat, METS_DIV_ATTRIBUTE_RANGE_PATTERN):
                # range expression, expanded to pattern list
                # list items get consumed (pat.expr.remove) when matched,
                # exhausted patterns also get consumed (page_attr_patterns.remove)
                # (but top-level list copy references the same list objects)
                log.debug(pat)
                if pat.start in pat.expr:
                    ranges_without_start_match.append(pat)
                # if pat.stop in pat.expr:
                #     ranges_without_stop_match.append(pat)
        if ranges_without_start_match:
            raise ValueError(f"Start of range patterns {ranges_without_start_match} not matched - invalid range")
        # if ranges_without_stop_match:
        #     raise ValueError(f"End of range patterns {ranges_without_stop_match} not matched - invalid range")
        return ret

    def set_physical_page_for_file(self, pageId : str, ocrd_file : OcrdFile,
                                   order : Optional[str] = None, orderlabel : Optional[str] = None) -> None:
        """
        Set the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`, creating all structures if necessary.
        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use
            orderlabel (string): ``@ORDERLABEL`` to use
        """

        # delete any existing page mapping for this file.ID
        fptrs = []
        if self._cache_flag:
            for page, fptrdict in self._fptr_cache.items():
                if ocrd_file.ID in fptrdict:
                    if fptrdict[ocrd_file.ID] is not None:
                        fptrs.append(fptrdict[ocrd_file.ID])
        else:
            fptrs = self._tree.getroot().findall(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                ocrd_file.ID, namespaces=NS)

        for el_fptr in fptrs:
            if self._cache_flag:
                del self._fptr_cache[el_fptr.getparent().get('ID')][ocrd_file.ID]
            el_fptr.getparent().remove(el_fptr)

        # find/construct as necessary
        el_structmap = self._tree.getroot().find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if el_structmap is None:
            el_structmap = ET.SubElement(self._tree.getroot(), TAG_METS_STRUCTMAP)
            el_structmap.set('TYPE', 'PHYSICAL')
        el_seqdiv = el_structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if el_seqdiv is None:
            el_seqdiv = ET.SubElement(el_structmap, TAG_METS_DIV)
            el_seqdiv.set('TYPE', 'physSequence')

        el_pagediv = None
        if self._cache_flag:
            if pageId in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                el_pagediv = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
        else:
            el_pagediv = el_seqdiv.find('mets:div[@ID="%s"]' % pageId, NS)

        if el_pagediv is None:
            el_pagediv = ET.SubElement(el_seqdiv, TAG_METS_DIV)
            el_pagediv.set('TYPE', 'page')
            el_pagediv.set('ID', pageId)
            if order:
                el_pagediv.set('ORDER', order)
            if orderlabel:
                el_pagediv.set('ORDERLABEL', orderlabel)
            if self._cache_flag:
                # Create a new entry in the page cache
                self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId] = el_pagediv
                # Create a new entry in the fptr cache and
                # assign an empty dictionary to hold the fileids
                self._fptr_cache.setdefault(pageId, {})

        el_fptr = ET.SubElement(el_pagediv, TAG_METS_FPTR)
        el_fptr.set('FILEID', ocrd_file.ID)

        if self._cache_flag:
            # Assign the ocrd fileID to the pageId in the cache
            self._fptr_cache[pageId].update({ocrd_file.ID: el_fptr})

    def update_physical_page_attributes(self, page_id : str, **kwargs) -> None:
        """
        Set or remove attributes of the physical page ``mets:div`` selected by :py:attr:`page_id`.
        Each keyword must be one of ``METS_PAGE_DIV_ATTRIBUTE.names()``;
        a falsy value removes the attribute.
        """
        invalid_keys = [k for k in kwargs if k not in METS_PAGE_DIV_ATTRIBUTE.names()]
        if invalid_keys:
            raise ValueError(f"Invalid attribute {invalid_keys}. Allowed values: {METS_PAGE_DIV_ATTRIBUTE.names()}")

        page_div = self.get_physical_pages(for_pageIds=page_id, return_divs=True)
        if not page_div:
            raise ValueError(f"Could not find mets:div[@ID={page_id}]")
        page_div = page_div[0]

        for k, v in kwargs.items():
            if not v:
                # tolerate removal of an attribute that is not set
                page_div.attrib.pop(k, None)
            else:
                page_div.attrib[k] = v

    def get_physical_page_for_file(self, ocrd_file : OcrdFile) -> Optional[str]:
        """
        Get the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`, or ``None`` if there is no such entry.
        """
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                if ocrd_file.ID in fptrdict:
                    return pageId
        else:
            ret = self._tree.getroot().find(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                ocrd_file.ID, namespaces=NS)
            if ret is not None:
                return ret.getparent().get('ID')
        return None

    def remove_physical_page(self, ID : str) -> None:
        """
        Delete page (physical ``mets:structMap`` ``mets:div`` entry ``@ID``) :py:attr:`ID`.
        """
        mets_div = None
        if self._cache_flag:
            if ID in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                mets_div = [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][ID]]
        else:
            mets_div = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
                namespaces=NS)
        if mets_div:
            mets_div_attrib = {**mets_div[0].attrib}
            mets_div[0].getparent().remove(mets_div[0])
            if self._cache_flag:
                for attr in METS_PAGE_DIV_ATTRIBUTE:
                    if attr.name in mets_div_attrib:
                        del self._page_cache[attr][mets_div_attrib[attr.name]]
                del self._fptr_cache[ID]

    def remove_physical_page_fptr(self, fileId : str) -> List[str]:
        """
        Delete all ``mets:fptr[@FILEID = fileId]`` to ``mets:file[@ID == fileId]`` for :py:attr:`fileId`
        from all ``mets:div`` entries in the physical ``mets:structMap``.
        Returns:
            List of pageIds that mets:fptrs were deleted from
        """

        # Question: What is the reason to keep a list of mets_fptrs?
        # Do we have a situation in which the fileId is same for different pageIds ?
        # From the examples I have seen inside 'assets' that is not the case
        # and the mets_fptrs list will always contain a single element.
        # If that's the case then we do not need to iterate 2 loops, just one.
        mets_fptrs = []
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                if fileId in fptrdict:
                    mets_fptrs.append(fptrdict[fileId])
        else:
            mets_fptrs = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
                namespaces=NS)
        ret = []
        for mets_fptr in mets_fptrs:
            mets_div = mets_fptr.getparent()
            ret.append(mets_div.get('ID'))
            if self._cache_flag:
                del self._fptr_cache[mets_div.get('ID')][mets_fptr.get('FILEID')]
            mets_div.remove(mets_fptr)
        return ret

    @property
    def physical_pages_labels(self) -> Dict[str, Tuple[Optional[str], Optional[str], Optional[str]]]:
        """
        Map all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``) to their
        ``@ORDER``, ``@ORDERLABEL`` and ``@LABEL`` attributes, if any.
        """
        divs = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
            namespaces=NS)
        return {div.get('ID'): (div.get('ORDER', None), div.get('ORDERLABEL', None), div.get('LABEL', None))
                for div in divs}

    def merge(self, other_mets, force : bool = False,
              fileGrp_mapping : Optional[Dict[str, str]] = None,
              fileId_mapping : Optional[Dict[str, str]] = None,
              pageId_mapping : Optional[Dict[str, str]] = None,
              after_add_cb : Optional[Callable[[OcrdFile], Any]] = None, **kwargs) -> None:
        """
        Add all files from :py:attr:`other_mets`.
        Accepts the same kwargs as :py:func:`find_files`
        Keyword Args:
            force (boolean): Whether to do :py:meth:`add_file` with ``force`` (overwriting existing ``mets:file`` entries)
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
            fileId_mapping (dict): Map :py:attr:`other_mets` file ID to file ID in this METS
            pageId_mapping (dict): Map :py:attr:`other_mets` page ID to page ID in this METS
            after_add_cb (function): Callback invoked with each newly added file after it is added to this METS
        """
        if not fileGrp_mapping:
            fileGrp_mapping = {}
        if not fileId_mapping:
            fileId_mapping = {}
        if not pageId_mapping:
            pageId_mapping = {}
        for f_src in other_mets.find_files(**kwargs):
            f_dest = self.add_file(
                fileGrp_mapping.get(f_src.fileGrp, f_src.fileGrp),
                mimetype=f_src.mimetype,
                url=f_src.url,
                local_filename=f_src.local_filename,
                ID=fileId_mapping.get(f_src.ID, f_src.ID),
                pageId=pageId_mapping.get(f_src.pageId, f_src.pageId),
                force=force)
            # FIXME: merge metsHdr, amdSec, dmdSec as well
            # FIXME: merge structMap logical and structLink as well
            if after_add_cb:
                after_add_cb(f_dest)
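
The page selector syntax handled in get_physical_pages() (attribute prefix, then an optional "~" negation, then a regex, range or plain value) may be easier to follow in isolation. What follows is only a standalone sketch of that token classification, not the library's implementation: parse_token, parse_selector and ParsedToken are hypothetical names, the attribute lists are inferred from the prefixes handled above, and the value "//" for REGEX_PREFIX is an assumption standing in for the constant imported from ocrd_utils.

```python
from typing import List, NamedTuple

# Assumption: stands in for ocrd_utils.REGEX_PREFIX in this standalone sketch.
REGEX_PREFIX = "//"

# Attribute names inferred from the prefixes handled in get_physical_pages().
PHYSICAL_ATTRS = ["ID", "ORDER", "ORDERLABEL", "LABEL", "CONTENTIDS"]
LOGICAL_ATTRS = ["ID", "DMDID", "TYPE", "LABEL"]


class ParsedToken(NamedTuple):
    """Hypothetical result type: what one selector token asks for."""
    scope: List[str]   # attributes to search, e.g. ["physical:ORDER"]
    negated: bool      # True if the token carried the "~" prefix
    kind: str          # "regex", "range" or "atom"
    value: str         # the remaining expression


def parse_token(token: str) -> ParsedToken:
    # Without a prefix, every physical and logical attribute is searched.
    scope = ([f"physical:{a}" for a in PHYSICAL_ATTRS] +
             [f"logical:{a}" for a in LOGICAL_ATTRS])
    for domain, attrs in (("logical", LOGICAL_ATTRS), ("physical", PHYSICAL_ATTRS)):
        if token.startswith(domain + ":"):
            token = token[len(domain) + 1:]
            # A bare domain prefix narrows the search to that domain.
            scope = [f"{domain}:{a}" for a in attrs]
            for a in attrs:
                # A second prefix narrows it to a single attribute.
                if token.startswith(a.lower() + ":"):
                    scope = [f"{domain}:{a}"]
                    token = token[len(a) + 1:]
                    break
            break
    # Negation prefix comes after the attribute prefix.
    negated = token.startswith("~")
    if negated:
        token = token[1:]
    # Operator: regex prefix first, then range, otherwise a plain value.
    if token.startswith(REGEX_PREFIX):
        return ParsedToken(scope, negated, "regex", token[len(REGEX_PREFIX):])
    if ".." in token:
        return ParsedToken(scope, negated, "range", token)
    return ParsedToken(scope, negated, "atom", token)


def parse_selector(selector: str) -> List[ParsedToken]:
    # Tokens are comma-separated, as in the method above.
    return [parse_token(t) for t in selector.split(",")]
```

Under these assumptions, parse_selector("physical:order:3..7,~PHYS_0005") yields one range token restricted to the physical ORDER attribute and one negated plain token searched across all attributes. Because the split on commas happens first, a regex containing a comma would be cut into two tokens, which mirrors the behaviour of the method above.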