ocrd_models.ocrd_mets.OcrdMets.add_file()   D

Complexity

Conditions 12

Size

Total Lines 54
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 31
dl 0
loc 54
rs 4.8
c 0
b 0
f 0
cc 12
nop 10

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:
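Extract Method is one such refactoring. The sketch below is a hypothetical illustration, not code from ocrd_models: it shows how the argument checks at the top of a method like add_file() could move into a named helper. The helper name `validate_file_args` and the pattern `XS_ID` are assumptions made for this example; the real code checks IDs against `REGEX_FILE_ID` from ocrd_utils.

```python
import re

# Assumption: a simplified stand-in for an xs:ID check, not the real REGEX_FILE_ID.
XS_ID = re.compile(r'[A-Za-z_][\w.-]*')


def validate_file_args(ID, fileGrp):
    """Hypothetical helper extracted from the body of a long method."""
    if not ID:
        raise ValueError("Must set ID of the mets:file")
    if not fileGrp:
        raise ValueError("Must set fileGrp of the mets:file")
    if not XS_ID.fullmatch(ID):
        raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
    if not XS_ID.fullmatch(fileGrp):
        raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)


def add_file(fileGrp, ID=None):
    """Toy caller: the four checks collapse into one well-named call."""
    validate_file_args(ID, fileGrp)
    return (fileGrp, ID)
```

The caller now reads as a sequence of named steps, and the validation can be tested on its own.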

Complexity

Complex code units like ocrd_models.ocrd_mets.OcrdMets.add_file() often do many different things. To break down the class that contains such a method, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
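In OcrdMets, the fields `_file_cache`, `_page_cache` and `_fptr_cache` share a suffix, which makes them a plausible candidate for extraction. The following is a minimal sketch of Extract Class under that assumption. The class `MetsFileCache` and its method names are hypothetical and do not exist in ocrd_models; only the nested mapping from fileGrp `@USE` to file `@ID` to element is taken from the listing.

```python
from typing import Any, Dict, List


class MetsFileCache:
    """Hypothetical class holding the nested fileGrp -> file ID -> element mapping."""

    def __init__(self) -> None:
        self._files: Dict[str, Dict[str, Any]] = {}

    def add_group(self, use: str) -> None:
        # Leave an existing group untouched; only create missing ones
        self._files.setdefault(use, {})

    def add_file(self, use: str, file_id: str, element: Any) -> None:
        self.add_group(use)
        self._files[use][file_id] = element

    def remove_file(self, use: str, file_id: str) -> Any:
        # Raises KeyError if the group or the file is unknown
        return self._files[use].pop(file_id)

    def groups(self) -> List[str]:
        return list(self._files)

    def files_in(self, use: str) -> List[Any]:
        return list(self._files.get(use, {}).values())
```

With such a class, the `if self._cache_flag:` branches would shrink to single calls on the cache object, which is where much of the branching in the listing comes from.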

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
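Introduce Parameter Object is one such approach. The sketch below bundles the descriptive arguments of a method like add_file() into a single value. The `FileSpec` dataclass and the toy `add_file` function are hypothetical names for illustration, not part of ocrd_models.

```python
from dataclasses import dataclass, asdict
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class FileSpec:
    """Hypothetical parameter object bundling the descriptive arguments."""
    ID: str
    mimetype: Optional[str] = None
    url: Optional[str] = None
    pageId: Optional[str] = None
    local_filename: Optional[str] = None

    def attributes(self) -> Dict[str, str]:
        # Drop unset fields, mirroring the "v is not None" filter in add_file()
        return {k: v for k, v in asdict(self).items() if v is not None}


def add_file(fileGrp: str, spec: FileSpec, force: bool = False,
             ignore: bool = False) -> Tuple[str, Dict[str, str]]:
    """Toy signature: the file description travels as one argument."""
    return fileGrp, spec.attributes()
```

A call then looks like `add_file('OCR-D-IMG', FileSpec(ID='FILE_0001', mimetype='image/tiff'))`, and adding a new descriptive field changes the dataclass rather than every call signature.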

"""
API to METS
"""
from datetime import datetime
import re
from lxml import etree as ET
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union

from ocrd_utils import (
    getLogger,
    generate_range,
    VERSION,
    REGEX_PREFIX,
    REGEX_FILE_ID
)

from ocrd_utils.config import config

from .constants import (
    NAMESPACES as NS,
    TAG_METS_AGENT,
    TAG_METS_DIV,
    TAG_METS_FILE,
    TAG_METS_FILEGRP,
    TAG_METS_FILESEC,
    TAG_METS_FPTR,
    TAG_METS_METSHDR,
    TAG_METS_STRUCTMAP,
    IDENTIFIER_PRIORITY,
    TAG_MODS_IDENTIFIER,
    METS_XML_EMPTY,
    METS_PAGE_DIV_ATTRIBUTE
)

from .ocrd_xml_base import OcrdXmlDocument, ET      # type: ignore
from .ocrd_file import OcrdFile
from .ocrd_agent import OcrdAgent

REGEX_PREFIX_LEN = len(REGEX_PREFIX)

class OcrdMets(OcrdXmlDocument):
    """
    API to a single METS file
    """
    _cache_flag : bool
    # Cache for the pages (mets:div)
    # The dictionary's Key: 'div.ID'
    # The dictionary's Value: a 'div' object at some memory location
    _page_cache : Dict[METS_PAGE_DIV_ATTRIBUTE, Dict[str, ET._Element]]
    # Cache for the files (mets:file) - two nested dictionaries
    # The outer dictionary's Key: 'fileGrp.USE'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'file.ID'
    # The inner dictionary's Value: a 'file' object at some memory location
    _file_cache : Dict[str, Dict[str, ET._Element]]
    # Cache for the file pointers (mets:fptr) - two nested dictionaries
    # The outer dictionary's Key: 'div.ID'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'fptr.FILEID'
    # The inner dictionary's Value: a 'fptr' object at some memory location
    _fptr_cache : Dict[str, Dict[str, ET._Element]]

    @staticmethod
    def empty_mets(now : Optional[str] = None, cache_flag : bool = False):
        """
        Create an empty METS file from bundled template.
        """
        if not now:
            now = datetime.now().isoformat()
        tpl = METS_XML_EMPTY
        tpl = tpl.replace('{{ VERSION }}', VERSION)
        tpl = tpl.replace('{{ NOW }}', '%s' % now)
        return OcrdMets(content=tpl.encode('utf-8'), cache_flag=cache_flag)

    def __init__(self, **kwargs) -> None:
        """
        """
        super().__init__(**kwargs)

        # XXX If the environment variable OCRD_METS_CACHING is set to "true",
        # then enable caching, if "false", disable caching, overriding the
        # kwarg to the constructor
        if config.is_set('OCRD_METS_CACHING'):
            getLogger('ocrd.models.ocrd_mets').debug('METS Caching %s because OCRD_METS_CACHING is %s',
                    'enabled' if config.OCRD_METS_CACHING else 'disabled', config.raw_value('OCRD_METS_CACHING'))
            self._cache_flag = config.OCRD_METS_CACHING


        # If cache is enabled
        if self._cache_flag:
            self._initialize_caches()
            self._refresh_caches()

    def __str__(self) -> str:
        """
        String representation
        """
        return 'OcrdMets[cached=%s,fileGrps=%s,files=%s]' % (
        self._cache_flag, self.file_groups, list(self.find_files()))

    def _fill_caches(self) -> None:
        """
        Fills the caches with fileGrps and FileIDs
        """

        tree_root = self._tree.getroot()

        # Fill with files
        el_fileSec = tree_root.find("mets:fileSec", NS)
        if el_fileSec is None:
            return

        log = getLogger('ocrd.models.ocrd_mets._fill_caches-files')

        for el_fileGrp in el_fileSec.findall('mets:fileGrp', NS):
            fileGrp_use = el_fileGrp.get('USE')

            # Assign an empty dictionary that will hold the files of the added fileGrp
            self._file_cache[fileGrp_use] = {}

            for el_file in el_fileGrp:
                file_id = el_file.get('ID')
                self._file_cache[fileGrp_use].update({file_id: el_file})
                # log.info("File added to the cache: %s" % file_id)

        # Fill with pages
        el_div_list = tree_root.findall(".//mets:div[@TYPE='page']", NS)
        if len(el_div_list) == 0:
            return
        log = getLogger('ocrd.models.ocrd_mets._fill_caches-pages')

        for el_div in el_div_list:
            div_id = el_div.get('ID')
            log.debug("DIV_ID: %s" % el_div.get('ID'))

            for attr in METS_PAGE_DIV_ATTRIBUTE:
                self._page_cache[attr][str(el_div.get(attr.name))] = el_div
                # Reported issue (Comprehensibility, Best Practice): The variable str does not seem to be defined.

            # Assign an empty dictionary that will hold the fptr of the added page (div)
            self._fptr_cache[div_id] = {}

            # log.info("Page_id added to the cache: %s" % div_id)

            for el_fptr in el_div:
                self._fptr_cache[div_id].update({el_fptr.get('FILEID'): el_fptr})
                # log.info("Fptr added to the cache: %s" % el_fptr.get('FILEID'))

        # log.info("Len of page_cache: %s" % len(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]))
        # log.info("Len of fptr_cache: %s" % len(self._fptr_cache))

    def _initialize_caches(self) -> None:
        self._file_cache = {}
        # NOTE we can only guarantee uniqueness for @ID and @ORDER
        self._page_cache = {k : {} for k in METS_PAGE_DIV_ATTRIBUTE}
        self._fptr_cache = {}

    def _refresh_caches(self) -> None:
        if self._cache_flag:
            self._initialize_caches()

            # Note, if the empty_mets() function is used to instantiate OcrdMets
            # Then the cache is empty even after this operation
            self._fill_caches()

    @property
    def unique_identifier(self) -> Optional[str]:
        """
        Get the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for t in IDENTIFIER_PRIORITY:
            found = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if found is not None:
                return found.text

    @unique_identifier.setter
    def unique_identifier(self, purl : str) -> None:
        """
        Set the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        id_el = None
        for t in IDENTIFIER_PRIORITY:
            id_el = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if id_el is not None:
                break
        if id_el is None:
            mods = self._tree.getroot().find('.//mods:mods', NS)
            assert mods is not None
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl

    @property
    def agents(self) -> List[OcrdAgent]:
        """
        List all :py:class:`ocrd_models.ocrd_agent.OcrdAgent` entries.
        """
        return [OcrdAgent(el_agent) for el_agent in self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)]

    def add_agent(self, **kwargs) -> OcrdAgent:
        """
        Add an :py:class:`ocrd_models.ocrd_agent.OcrdAgent` to the list of agents in the ``metsHdr``.
        """
        el_metsHdr = self._tree.getroot().find('.//mets:metsHdr', NS)
        if el_metsHdr is None:
            el_metsHdr = ET.Element(TAG_METS_METSHDR)
            self._tree.getroot().insert(0, el_metsHdr)
        #  assert(el_metsHdr is not None)
        el_agent = ET.Element(TAG_METS_AGENT)
        try:
            el_agent_last = next(el_metsHdr.iterchildren(tag=TAG_METS_AGENT, reversed=True))
            el_agent_last.addnext(el_agent)
        except StopIteration:
            el_metsHdr.insert(0, el_agent)
        return OcrdAgent(el_agent, **kwargs)

    @property
    def file_groups(self) -> List[str]:
        """
        List the ``@USE`` of all ``mets:fileGrp`` entries.
        """

        # WARNING: Actually we cannot return strings in place of elements!
        if self._cache_flag:
            return list(self._file_cache.keys())

        return [el.get('USE') for el in self._tree.getroot().findall('.//mets:fileGrp', NS)]

    def find_all_files(self, *args, **kwargs) -> List[OcrdFile]:
        """
        Like :py:meth:`find_files` but return a list of all results.
        Equivalent to ``list(self.find_files(...))``
        """
        return list(self.find_files(*args, **kwargs))

    # pylint: disable=multiple-statements
    def find_files(
        self,
        ID : Optional[str] = None,
        fileGrp : Optional[str] = None,
        pageId : Optional[str] = None,
        mimetype : Optional[str] = None,
        url : Optional[str] = None,
        local_filename : Optional[str] = None,
        local_only : bool = False,
        include_fileGrp : Optional[List[str]] = None,
        exclude_fileGrp : Optional[List[str]] = None,
    ) -> Iterator[OcrdFile]:
        """
        Search ``mets:file`` entries in this METS document and yield results.
        The :py:attr:`ID`, :py:attr:`pageId`, :py:attr:`fileGrp`,
        :py:attr:`url` and :py:attr:`mimetype` parameters can each be either a
        literal string, or a regular expression if the string starts with
        ``//`` (double slash).
        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. If it is a literal string, comparison
        is done with string equality.
        The :py:attr:`pageId` parameter supports the numeric range operator ``..``. For
        example, to find all files in pages ``PHYS_0001`` to ``PHYS_0003``,
        ``PHYS_0001..PHYS_0003`` will be expanded to ``PHYS_0001,PHYS_0002,PHYS_0003``.
        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` remote/original URL of ``mets:FLocat`` of ``mets:file``
            local_filename (string) : ``@xlink:href`` local/cached filename of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem
            include_fileGrp (list[str]) : List of allowed file groups
            exclude_fileGrp (list[str]) : List of disallowed file groups
        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations
        """
        pageId_list = []
        if pageId:
            # returns divs instead of strings of ids
            physical_pages = self.get_physical_pages(for_pageIds=pageId, return_divs=True)
            for div in physical_pages:
                if self._cache_flag:
                    pageId_list += self._fptr_cache[div.get('ID')]
                else:
                    pageId_list += [fptr.get('FILEID') for fptr in div.findall('mets:fptr', NS)]

        if ID and ID.startswith(REGEX_PREFIX):
            ID = re.compile(ID[REGEX_PREFIX_LEN:])
        if fileGrp and fileGrp.startswith(REGEX_PREFIX):
            fileGrp = re.compile(fileGrp[REGEX_PREFIX_LEN:])
        if mimetype and mimetype.startswith(REGEX_PREFIX):
            mimetype = re.compile(mimetype[REGEX_PREFIX_LEN:])
        if url and url.startswith(REGEX_PREFIX):
            url = re.compile(url[REGEX_PREFIX_LEN:])

        candidates = []
        if self._cache_flag:
            if fileGrp:
                if isinstance(fileGrp, str):
                    candidates += self._file_cache.get(fileGrp, {}).values()
                else:
                    candidates = [x for fileGrp_needle, el_file_list in self._file_cache.items() if
                                  fileGrp.fullmatch(fileGrp_needle) for x in el_file_list.values()]
            else:
                candidates = [el_file for id_to_file in self._file_cache.values() for el_file in id_to_file.values()]
        else:
            candidates = self._tree.getroot().xpath('//mets:file', namespaces=NS)

        for cand in candidates:
            if ID:
                if isinstance(ID, str):
                    if not ID == cand.get('ID'): continue
                else:
                    if not ID.fullmatch(cand.get('ID')): continue

            if pageId is not None and cand.get('ID') not in pageId_list:
                continue

            if not self._cache_flag and fileGrp:
                if isinstance(fileGrp, str):
                    if cand.getparent().get('USE') != fileGrp: continue
                else:
                    if not fileGrp.fullmatch(cand.getparent().get('USE')): continue

            if mimetype:
                if isinstance(mimetype, str):
                    if cand.get('MIMETYPE') != mimetype: continue
                else:
                    if not mimetype.fullmatch(cand.get('MIMETYPE') or ''): continue

            if url:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="URL"]', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_url = cand_locat.get('{%s}href' % NS['xlink'])
                if isinstance(url, str):
                    if cand_url != url: continue
                else:
                    if not url.fullmatch(cand_url): continue

            if local_filename:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"]', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_local_filename = cand_locat.get('{%s}href' % NS['xlink'])
                if isinstance(local_filename, str):
                    if cand_local_filename != local_filename: continue
                else:
                    if not local_filename.fullmatch(cand_local_filename): continue

            if local_only:
                # deprecation_warning("'local_only' is deprecated, use 'local_filename=\"//.+\"' instead")
                is_local = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"][@xlink:href]', namespaces=NS)
                if is_local is None:
                    continue

            ret = OcrdFile(cand, mets=self)

            # XXX include_fileGrp is redundant to fileGrp but for completeness
            if exclude_fileGrp and ret.fileGrp in exclude_fileGrp:
                continue
            if include_fileGrp and ret.fileGrp not in include_fileGrp:
                continue

            yield ret

    def add_file_group(self, fileGrp: str) -> ET._Element:
        """
        Add a new ``mets:fileGrp``.
        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.
        """
        if ',' in fileGrp:
            raise ValueError('fileGrp must not contain commas')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            el_fileSec = ET.SubElement(self._tree.getroot(), TAG_METS_FILESEC)
        el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if el_fileGrp is None:
            el_fileGrp = ET.SubElement(el_fileSec, TAG_METS_FILEGRP)
            el_fileGrp.set('USE', fileGrp)

            if self._cache_flag:
                # Assign an empty dictionary that will hold the files of the added fileGrp
                self._file_cache[fileGrp] = {}

        return el_fileGrp

    def rename_file_group(self, old: str, new: str) -> None:
        """
        Rename a ``mets:fileGrp`` by changing the ``@USE`` from :py:attr:`old` to :py:attr:`new`.
        """
        el_fileGrp = self._tree.getroot().find('mets:fileSec/mets:fileGrp[@USE="%s"]' % old, NS)
        if el_fileGrp is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        el_fileGrp.set('USE', new)

        if self._cache_flag:
            self._file_cache[new] = self._file_cache.pop(old)

    def remove_file_group(self, USE: str, recursive : bool = False, force : bool = False) -> None:
        """
        Remove a ``mets:fileGrp`` (single fixed ``@USE`` or multiple regex ``@USE``)
        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                use = re.compile(USE[REGEX_PREFIX_LEN:])
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    if use.fullmatch(cand.get('USE')):
                        self.remove_file_group(cand, recursive=recursive)
                return
            else:
                el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
        if el_fileGrp is None:  # pylint: disable=len-as-condition
            msg = "No such fileGrp: %s" % USE
            if force:
                log.warning(msg)
                return
            raise Exception(msg)

        # The cache should also be used here
        if self._cache_flag:
            files = self._file_cache.get(el_fileGrp.get('USE'), {}).values()
        else:
            files = el_fileGrp.findall('mets:file', NS)

        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            for f in list(files):
                self.remove_one_file(ID=f.get('ID'), fileGrp=f.getparent().get('USE'))

        if self._cache_flag:
            # Note: Since the files inside the group are removed
            # with the 'remove_one_file' method above,
            # we should not take care of that again.
            # We just remove the fileGrp.
            del self._file_cache[el_fileGrp.get('USE')]

        el_fileGrp.getparent().remove(el_fileGrp)

    def add_file(self, fileGrp : str, mimetype : Optional[str] = None, url : Optional[str] = None,
                 ID : Optional[str] = None, pageId : Optional[str] = None, force : bool = False,
                 local_filename : Optional[str] = None, ignore : bool = False, **kwargs) -> OcrdFile:
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string):
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)

        el_fileGrp = self.add_file_group(fileGrp)
        if not ignore:
            mets_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)
            if mets_file:
                if mets_file.fileGrp == fileGrp and \
                        mets_file.pageId == pageId and \
                        mets_file.mimetype == mimetype:
                    if not force:
                        raise FileExistsError(
                            f"A file with ID=={ID} already exists {mets_file} and neither force nor ignore are set")
                    self.remove_file(ID=ID, fileGrp=fileGrp)
                else:
                    raise FileExistsError(
                        f"A file with ID=={ID} already exists {mets_file} but unrelated - cannot mitigate")

        # To get rid of Python's FutureWarning - checking if v is not None
        kwargs = {k: v for k, v in locals().items()
                  if k in ['url', 'ID', 'mimetype', 'pageId', 'local_filename'] and v is not None}
        # This separation is needed to reuse the same el_mets_file element in the caching if block
        el_mets_file = ET.SubElement(el_fileGrp, TAG_METS_FILE)
        # The caching of the physical page is done in the OcrdFile constructor
        # (which calls us back with set_physical_page_for_file)
        mets_file = OcrdFile(el_mets_file, mets=self, **kwargs)

        if self._cache_flag:
            # Add the file to the file cache
            self._file_cache[fileGrp].update({ID: el_mets_file})

        return mets_file

    def remove_file(self, *args, **kwargs) -> Union[List[OcrdFile],OcrdFile]:
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            else:
                return files[0]  # for backwards-compatibility
        if any(1 for kwarg in kwargs.values()
               if isinstance(kwarg, str) and kwarg.startswith(REGEX_PREFIX)):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))

    def remove_one_file(self, ID : Union[str, OcrdFile], fileGrp : Optional[str] = None) -> OcrdFile:
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            ID (string|OcrdFile): ``@ID`` of the ``mets:file`` to delete. Can also be an :py:class:`ocrd_models.ocrd_file.OcrdFile` to avoid search via ``ID``.
            fileGrp (string): ``@USE`` of the ``mets:fileGrp`` containing the ``mets:file``. Used only for optimization.
        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_one_file')
        log.debug("remove_one_file(%s %s)" % (ID, fileGrp))
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s (fileGrp=%s)" % (ID, fileGrp))

        # Collect the physical page refs (mets:fptr) pointing to the file
        fptrs = []
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                if ID in fptrdict:
                    fptrs.append(fptrdict[ID])
        else:
            fptrs = self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS)

        # Delete the physical page ref
        for fptr in fptrs:
            log.debug("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # Remove the fptr from the cache as well
            if self._cache_flag:
                del self._fptr_cache[page_div.get('ID')][ID]
            # delete empty pages
            if not list(page_div):
                log.debug("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)
                # Delete the empty pages from caches as well
                if self._cache_flag:
                    for attr in METS_PAGE_DIV_ATTRIBUTE:
                        if attr.name in page_div.attrib:
                            del self._page_cache[attr][page_div.attrib[attr.name]]

        # Delete the file reference from the cache
        if self._cache_flag:
            parent_use = ocrd_file._el.getparent().get('USE')
            del self._file_cache[parent_use][ocrd_file.ID]

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file

581
    @property
582
    def physical_pages(self) -> List[str]:
583
        """
584
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``)
585
        """
586
        if self._cache_flag:
587
            return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].keys())
588
589
        return [str(x) for x in self._tree.getroot().xpath(
590
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
591
            namespaces=NS)]
592
593
    def get_physical_pages(self, for_fileIds : Optional[List[str]] = None, for_pageIds : Optional[str] = None, 
594
                           return_divs : bool = False) -> List[Union[str, ET._Element]]:
595
        """
596
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
597
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`,
598
        or for a subset selector expression (comma-separated, range, and/or regex) :py:attr:`for_pageIds`.
599
        If return_divs is set, returns div memory objects instead of strings of ids
600
        """
601
        if for_fileIds is None and for_pageIds is None:
602
            if return_divs:
603
                if self._cache_flag:
604
                    return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].values())
605
606
                return [x for x in self._tree.getroot().xpath(
607
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
608
                    namespaces=NS)]
609
610
            return self.physical_pages
611
612
        # log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
613
        if for_pageIds is not None:
614
            ret = []
615
            page_attr_patterns = []
616
            page_attr_patterns_raw = re.split(r',', for_pageIds)
617
            for pageId_token in page_attr_patterns_raw:
618
                if pageId_token.startswith(REGEX_PREFIX):
619
                    page_attr_patterns.append((None, re.compile(pageId_token[REGEX_PREFIX_LEN:])))
620
                elif '..' in pageId_token:
621
                    val_range = generate_range(*pageId_token.split('..', 1))
622
                    page_attr_patterns.append(val_range)
623
                else:
624
                    page_attr_patterns.append(pageId_token)
625
            if not page_attr_patterns:
626
                return []
627
            range_patterns_first_last = [(x[0], x[-1]) if isinstance(x, list) else None for x in page_attr_patterns]
628
            page_attr_patterns_copy = list(page_attr_patterns)
            if self._cache_flag:
                for pat in page_attr_patterns:
                    try:
                        attr : METS_PAGE_DIV_ATTRIBUTE
                        if isinstance(pat, str):
                            attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if pat in self._page_cache[a])
                            cache_keys = [pat]
                        elif isinstance(pat, list):
                            attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if any(x in self._page_cache[a] for x in pat))
                            cache_keys = [v for v in pat if v in self._page_cache[attr]]
                            for k in cache_keys:
                                pat.remove(k)
                        elif isinstance(pat, tuple):
                            _, re_pat = pat
                            attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) for v in self._page_cache[a] if re_pat.fullmatch(v))
                            cache_keys = [v for v in self._page_cache[attr] if re_pat.fullmatch(v)]
                        else:
                            raise ValueError(f"Unsupported page pattern type: {type(pat).__name__}")
                        if return_divs:
                            ret += [self._page_cache[attr][v] for v in cache_keys]
                        else:
                            ret += [self._page_cache[attr][v].get('ID') for v in cache_keys]
                    except StopIteration:
                        raise ValueError(f"{pat} matches none of the keys of any of the _page_caches.")
            else:
                page_attr_patterns_matched = []
                for page in self._tree.getroot().xpath(
                        'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                        namespaces=NS):
                    patterns_exhausted = []
                    for pat_idx, pat in enumerate(page_attr_patterns):
                        try:
                            if isinstance(pat, str):
                                # next() raises StopIteration if no attribute of this page equals the literal
                                attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if pat == page.get(a.name))
                                ret.append(page if return_divs else page.get('ID'))
                                patterns_exhausted.append(pat)
                            elif isinstance(pat, list):
                                # on first match, remember the matching attribute as element 0 of the range list
                                if not isinstance(pat[0], METS_PAGE_DIV_ATTRIBUTE):
                                    pat.insert(0, next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if any(x == page.get(a.name) for x in pat)))
                                attr_val = page.get(pat[0].name)
                                if attr_val in pat:
                                    pat.remove(attr_val)
                                    ret.append(page if return_divs else page.get('ID'))
                                # only the attribute marker is left: every value of the range was matched
                                if len(pat) == 1:
                                    patterns_exhausted.append(pat)
                            elif isinstance(pat, tuple):
                                attr, re_pat = pat
                                if not attr:
                                    attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if re_pat.fullmatch(page.get(a.name) or ''))
                                    page_attr_patterns[pat_idx] = (attr, re_pat)
                                if re_pat.fullmatch(page.get(attr.name) or ''):
                                    ret.append(page if return_divs else page.get('ID'))
                            else:
                                raise ValueError(f"Unsupported page pattern type: {type(pat).__name__}")
                            page_attr_patterns_matched.append(pat)
                        except StopIteration:
                            continue
                    for p in patterns_exhausted:
                        page_attr_patterns.remove(p)
                unmatched = [x for x in page_attr_patterns_copy if x not in page_attr_patterns_matched]
                if unmatched:
                    raise ValueError(f"Patterns {unmatched} match none of the pages")

            ranges_without_start_match = []
            ranges_without_last_match = []
            for idx, pat in enumerate(page_attr_patterns_copy):
                if isinstance(pat, list):
                    start, last = range_patterns_first_last[idx]
                    # the first value of the range is still in the list, so it was never matched
                    if start in pat:
                        ranges_without_start_match.append(page_attr_patterns_raw[idx])
                    # if last in pat:
                    #     ranges_without_last_match.append(page_attr_patterns_raw[idx])
            if ranges_without_start_match:
                raise ValueError(f"Start of range patterns {ranges_without_start_match} not matched - invalid range")
            # if ranges_without_last_match:
            #     raise ValueError(f"End of range patterns {ranges_without_last_match} not matched - invalid range")
            return ret

        if for_fileIds == []:
            return []
        assert for_fileIds # at this point we know for_fileIds is set, assert to convince pyright
        # the result is aligned with for_fileIds; entries without a matching page stay None
        ret = [None] * len(for_fileIds)
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                for fptr in fptrdict:
                    if fptr in for_fileIds:
                        index = for_fileIds.index(fptr)
                        if return_divs:
                            ret[index] = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
                        else:
                            ret[index] = pageId
        else:
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                for fptr in page.findall('mets:fptr', NS):
                    if fptr.get('FILEID') in for_fileIds:
                        index = for_fileIds.index(fptr.get('FILEID'))
                        if return_divs:
                            ret[index] = page
                        else:
                            ret[index] = page.get('ID')
        return ret

    def set_physical_page_for_file(self, pageId : str, ocrd_file : OcrdFile,
                                   order : Optional[str] = None, orderlabel : Optional[str] = None) -> None:
        """
        Set the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`, creating all structures if necessary.
        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use
            orderlabel (string): ``@ORDERLABEL`` to use
        """

        # delete any existing page mapping for this file.ID
        fptrs = []
        if self._cache_flag:
            for page, fptrdict in self._fptr_cache.items():
                if ocrd_file.ID in fptrdict:
                    if fptrdict[ocrd_file.ID] is not None:
                        fptrs.append(fptrdict[ocrd_file.ID])
        else:
            fptrs = self._tree.getroot().findall(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                ocrd_file.ID, namespaces=NS)

        for el_fptr in fptrs:
            if self._cache_flag:
                del self._fptr_cache[el_fptr.getparent().get('ID')][ocrd_file.ID]
            el_fptr.getparent().remove(el_fptr)

        # find/construct as necessary
        el_structmap = self._tree.getroot().find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if el_structmap is None:
            el_structmap = ET.SubElement(self._tree.getroot(), TAG_METS_STRUCTMAP)
            el_structmap.set('TYPE', 'PHYSICAL')
        el_seqdiv = el_structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if el_seqdiv is None:
            el_seqdiv = ET.SubElement(el_structmap, TAG_METS_DIV)
            el_seqdiv.set('TYPE', 'physSequence')

        el_pagediv = None
        if self._cache_flag:
            if pageId in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                el_pagediv = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
        else:
            el_pagediv = el_seqdiv.find('mets:div[@ID="%s"]' % pageId, NS)

        if el_pagediv is None:
            el_pagediv = ET.SubElement(el_seqdiv, TAG_METS_DIV)
            el_pagediv.set('TYPE', 'page')
            el_pagediv.set('ID', pageId)
            if order:
                el_pagediv.set('ORDER', order)
            if orderlabel:
                el_pagediv.set('ORDERLABEL', orderlabel)
            if self._cache_flag:
                # Create a new entry in the page cache
                self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId] = el_pagediv
                # Create a new entry in the fptr cache and
                # assign an empty dictionary to hold the fileids
                self._fptr_cache.setdefault(pageId, {})

        el_fptr = ET.SubElement(el_pagediv, TAG_METS_FPTR)
        el_fptr.set('FILEID', ocrd_file.ID)

        if self._cache_flag:
            # Assign the ocrd fileID to the pageId in the cache
            self._fptr_cache[pageId].update({ocrd_file.ID: el_fptr})

    def update_physical_page_attributes(self, page_id : str, **kwargs) -> None:
        """
        Set or remove attributes of the physical page ``mets:div`` matching :py:attr:`page_id`.
        A falsy value removes the attribute.
        """
        invalid_keys = [k for k in kwargs if k not in METS_PAGE_DIV_ATTRIBUTE.names()]
        if invalid_keys:
            raise ValueError(f"Invalid attribute {invalid_keys}. Allowed values: {METS_PAGE_DIV_ATTRIBUTE.names()}")

        page_div = self.get_physical_pages(for_pageIds=page_id, return_divs=True)
        if not page_div:
            raise ValueError(f'Could not find mets:div[@ID="{page_id}"]')
        page_div = page_div[0]

        for k, v in kwargs.items():
            if not v:
                # pop with a default, so removing an attribute that is not set is not an error
                page_div.attrib.pop(k, None)
            else:
                page_div.attrib[k] = v

    def get_physical_page_for_file(self, ocrd_file : OcrdFile) -> Optional[str]:
        """
        Get the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`, or ``None`` if the file is not mapped to any page.
        """
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                if ocrd_file.ID in fptrdict:
                    return pageId
        else:
            ret = self._tree.getroot().find(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                ocrd_file.ID, namespaces=NS)
            if ret is not None:
                return ret.getparent().get('ID')
        return None

    def remove_physical_page(self, ID : str) -> None:
        """
        Delete page (physical ``mets:structMap`` ``mets:div`` entry ``@ID``) :py:attr:`ID`.
        """
        mets_div = None
        if self._cache_flag:
            if ID in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                mets_div = [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][ID]]
        else:
            mets_div = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
                namespaces=NS)
        if mets_div:
            mets_div_attrib = {** mets_div[0].attrib}
            mets_div[0].getparent().remove(mets_div[0])
            if self._cache_flag:
                for attr in METS_PAGE_DIV_ATTRIBUTE:
                    if attr.name in mets_div_attrib:
                        # pop() instead of del: set_physical_page_for_file only caches the ID of
                        # a new page, so other attribute values may be missing from the cache
                        self._page_cache[attr].pop(mets_div_attrib[attr.name], None)
                del self._fptr_cache[ID]

    def remove_physical_page_fptr(self, fileId : str) -> List[str]:
        """
        Delete all ``mets:fptr[@FILEID = fileId]`` elements, i.e. all references to ``mets:file[@ID = fileId]``,
        from all ``mets:div`` entries in the physical ``mets:structMap``.
        Returns:
            List of page IDs from which ``mets:fptr`` elements were deleted
        """

        # Question: What is the reason to keep a list of mets_fptrs?
        # Is there a situation in which the same fileId appears under different pageIds?
        # In the examples inside 'assets' that is not the case,
        # and the mets_fptrs list always contains a single element.
        # If that always holds, one loop would suffice instead of two.
        mets_fptrs = []
        if self._cache_flag:
            for pageId, fptrdict in self._fptr_cache.items():
                if fileId in fptrdict:
                    mets_fptrs.append(fptrdict[fileId])
        else:
            mets_fptrs = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
                namespaces=NS)
        ret = []
        for mets_fptr in mets_fptrs:
            mets_div = mets_fptr.getparent()
            ret.append(mets_div.get('ID'))
            if self._cache_flag:
                del self._fptr_cache[mets_div.get('ID')][mets_fptr.get('FILEID')]
            mets_div.remove(mets_fptr)
        return ret

    @property
    def physical_pages_labels(self) -> Dict[str, Tuple[Optional[str], Optional[str], Optional[str]]]:
        """
        Map all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``) to their
        ``@ORDER``, ``@ORDERLABEL`` and ``@LABEL`` attributes, if any.
        """
        divs = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
            namespaces=NS)
        return {div.get('ID'): (div.get('ORDER', None), div.get('ORDERLABEL', None), div.get('LABEL', None))
                for div in divs}

    def merge(self, other_mets, force : bool = False,
              fileGrp_mapping : Optional[Dict[str, str]] = None,
              fileId_mapping : Optional[Dict[str, str]] = None,
              pageId_mapping : Optional[Dict[str, str]] = None,
              after_add_cb : Optional[Callable[[OcrdFile], Any]] = None, **kwargs) -> None:
        """
        Add all files from :py:attr:`other_mets`.
        Accepts the same kwargs as :py:func:`find_files`, which select the files to merge.
        Keyword Args:
            force (boolean): Whether to do :py:meth:`add_file` with ``force`` (overwriting existing ``mets:file`` entries)
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
            fileId_mapping (dict): Map :py:attr:`other_mets` file ID to file ID in this METS
            pageId_mapping (dict): Map :py:attr:`other_mets` page ID to page ID in this METS
            after_add_cb (function): Callback invoked with each newly added file after it is added to the METS
        """
        if not fileGrp_mapping:
            fileGrp_mapping = {}
        if not fileId_mapping:
            fileId_mapping = {}
        if not pageId_mapping:
            pageId_mapping = {}
        for f_src in other_mets.find_files(**kwargs):
            # names without an entry in a mapping are carried over unchanged
            f_dest = self.add_file(
                fileGrp_mapping.get(f_src.fileGrp, f_src.fileGrp),
                mimetype=f_src.mimetype,
                url=f_src.url,
                local_filename=f_src.local_filename,
                ID=fileId_mapping.get(f_src.ID, f_src.ID),
                pageId=pageId_mapping.get(f_src.pageId, f_src.pageId),
                force=force)
            # FIXME: merge metsHdr, amdSec, dmdSec as well
            # FIXME: merge structMap logical and structLink as well
            if after_add_cb:
                after_add_cb(f_dest)
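
The page selection syntax handled by get_physical_pages (comma-separated literals, `first..last` ranges and prefixed regular expressions) can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: `expand_range` is a hypothetical, simplified stand-in for `ocrd_utils.generate_range`, the value `'//'` for `REGEX_PREFIX` is an assumption, and `parse_page_patterns` and `select_page_ids` are illustrative names that match against page IDs only, whereas the method above also tries the other page attributes.

```python
import re
from typing import List, Pattern, Union

# Assumption: plays the role of ocrd_utils.REGEX_PREFIX; the real value may differ.
REGEX_PREFIX = '//'

PagePattern = Union[str, List[str], Pattern[str]]


def expand_range(start: str, end: str) -> List[str]:
    """Hypothetical stand-in for generate_range: count up the trailing number."""
    m_start = re.fullmatch(r'(.*?)(\d+)', start)
    m_end = re.fullmatch(r'(.*?)(\d+)', end)
    if not m_start or not m_end or m_start.group(1) != m_end.group(1):
        raise ValueError(f"Cannot expand range {start}..{end}")
    prefix, width = m_start.group(1), len(m_start.group(2))
    first, last = int(m_start.group(2)), int(m_end.group(2))
    if first > last:
        raise ValueError(f"Range {start}..{end} is not ascending")
    # keep the zero padding of the start value
    return [f"{prefix}{i:0{width}d}" for i in range(first, last + 1)]


def parse_page_patterns(for_page_ids: str) -> List[PagePattern]:
    """Turn a selection string into literals, expanded ranges and compiled regexes."""
    patterns: List[PagePattern] = []
    for token in for_page_ids.split(','):
        if token.startswith(REGEX_PREFIX):
            patterns.append(re.compile(token[len(REGEX_PREFIX):]))
        elif '..' in token:
            patterns.append(expand_range(*token.split('..', 1)))
        else:
            patterns.append(token)
    return patterns


def select_page_ids(patterns: List[PagePattern], page_ids: List[str]) -> List[str]:
    """Return the matching page IDs per pattern; a pattern without any match is an error."""
    selected: List[str] = []
    for pat in patterns:
        if isinstance(pat, str):
            hits = [p for p in page_ids if p == pat]
        elif isinstance(pat, list):
            hits = [p for p in page_ids if p in pat]
        else:
            hits = [p for p in page_ids if pat.fullmatch(p)]
        if not hits:
            raise ValueError(f"Pattern {pat} matches none of the pages")
        selected.extend(hits)
    return selected
```

Because the selection string is split on commas first, a regular expression that itself contains a comma cannot be expressed in this sketch.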
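
The cached code paths above rely on a mapping from page ID to the file IDs referenced by that page. As a rough sketch of that bookkeeping, assuming the cache has the shape `{pageId: {fileId: fptr}}` suggested by the methods above, the lookup and removal steps could look like this. Plain strings stand in for the lxml elements, and `FptrCache`, `page_for_file` and `remove_file` are hypothetical names, not part of the library.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for the fptr cache: page ID -> {file ID -> fptr element}.
# Strings replace the lxml elements that the real cache would hold.
FptrCache = Dict[str, Dict[str, str]]


def page_for_file(cache: FptrCache, file_id: str) -> Optional[str]:
    """Return the first page that references file_id, or None if there is none."""
    for page_id, fptrs in cache.items():
        if file_id in fptrs:
            return page_id
    return None


def remove_file(cache: FptrCache, file_id: str) -> List[str]:
    """Drop file_id from every page and report the pages it was removed from."""
    # Collect the affected pages first, so no dict is modified while it is iterated.
    touched = [page_id for page_id, fptrs in cache.items() if file_id in fptrs]
    for page_id in touched:
        del cache[page_id][file_id]
    return touched


cache: FptrCache = {
    "PHYS_0001": {"FILE_A": "<fptr A>", "FILE_B": "<fptr B>"},
    "PHYS_0002": {"FILE_A": "<fptr A>"},
}
```

Collecting the affected pages before deleting mirrors the two-loop structure of remove_physical_page_fptr and keeps the sketch correct even if one file ID is referenced from several pages.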