Passed
Pull Request — master (#1063)
by Konstantin
03:13
created

ocrd_models.ocrd_mets.OcrdMets.find_files()   F

Complexity

Conditions 48

Size

Total Lines 126
Code Lines 76

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 76
dl 0
loc 126
rs 0
c 0
b 0
f 0
cc 48
nop 10

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:
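One widely used refactoring of this kind is Extract Method. As a minimal sketch, not taken from the project, the four near-identical "compile if prefixed with //" branches in find_files() could move into one small helper. The names compile_if_regex and matches are hypothetical, and REGEX_PREFIX is redefined locally on the assumption that it equals the // prefix described in the find_files() docstring.

```python
import re
from typing import Optional, Pattern, Union

# Assumption: mirrors the ``//`` prefix described in the find_files() docstring.
REGEX_PREFIX = "//"

Needle = Union[None, str, Pattern[str]]


def compile_if_regex(value: Optional[str]) -> Needle:
    """Hypothetical helper: compile ``//pattern`` to a regex, return literals unchanged."""
    if value and value.startswith(REGEX_PREFIX):
        return re.compile(value[len(REGEX_PREFIX):])
    return value


def matches(needle: Needle, candidate: Optional[str]) -> bool:
    """Hypothetical helper: string equality for literals, fullmatch for patterns."""
    if not needle:
        return True  # no filter was given for this attribute
    if isinstance(needle, str):
        return needle == candidate
    return needle.fullmatch(candidate or "") is not None


# Usage: one call per filter replaces a nested if/else block.
file_grp = compile_if_regex("//OCR-D-.*")
print(matches(file_grp, "OCR-D-IMG"))
```

With helpers like these, each filter in the candidate loop would shrink to a single condition, which is where most of the reported 48 conditions come from.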

Complexity

Complex units like ocrd_models.ocrd_mets.OcrdMets.find_files() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
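As a rough illustration of Extract Class, the "literal or regex" matching that find_files() repeats per attribute could live in its own small class. This is only a sketch under assumptions: AttributeFilter, parse and accepts are hypothetical names, the // prefix is taken from the find_files() docstring, and a plain dict stands in for an XML element's attributes.

```python
import re
from dataclasses import dataclass
from typing import Mapping, Optional, Pattern, Union


@dataclass(frozen=True)
class AttributeFilter:
    """Hypothetical class: one filter on one attribute, literal or regex."""
    attribute: str
    needle: Union[str, Pattern[str]]

    @classmethod
    def parse(cls, attribute: str, raw: str, regex_prefix: str = "//") -> "AttributeFilter":
        # A leading prefix marks a regular expression; anything else is a literal.
        if raw.startswith(regex_prefix):
            return cls(attribute, re.compile(raw[len(regex_prefix):]))
        return cls(attribute, raw)

    def accepts(self, attributes: Mapping[str, Optional[str]]) -> bool:
        value = attributes.get(self.attribute)
        if isinstance(self.needle, str):
            return value == self.needle
        return value is not None and self.needle.fullmatch(value) is not None


# Usage: a dict stands in for the attributes of a mets:file element.
filters = [
    AttributeFilter.parse("MIMETYPE", "//image/.*"),
    AttributeFilter.parse("ID", "FILE_0001"),
]
candidate = {"ID": "FILE_0001", "MIMETYPE": "image/tiff"}
print(all(f.accepts(candidate) for f in filters))
```

The design choice here is that the decision "literal or regex" is made once, when the filter is built, instead of being re-checked with isinstance for every candidate.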

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
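One such approach is a parameter object. The sketch below is hypothetical and not part of the project: FileQuery and active_filters are invented names, and the fields simply mirror the keyword arguments of find_files() shown in the listing that follows.

```python
from dataclasses import dataclass, replace
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class FileQuery:
    """Hypothetical parameter object bundling the find_files() keyword arguments."""
    ID: Optional[str] = None
    fileGrp: Optional[str] = None
    pageId: Optional[str] = None
    mimetype: Optional[str] = None
    url: Optional[str] = None
    local_filename: Optional[str] = None
    local_only: bool = False
    include_fileGrp: Tuple[str, ...] = ()
    exclude_fileGrp: Tuple[str, ...] = ()

    def active_filters(self) -> Dict[str, object]:
        # Keep only the criteria the caller actually set.
        return {name: value for name, value in vars(self).items() if value}


# Usage: build a query once, then derive variants without repeating arguments.
query = FileQuery(fileGrp="OCR-D-IMG", mimetype="//image/.*")
page_query = replace(query, pageId="PHYS_0001")
print(sorted(page_query.active_filters()))
```

A method accepting such an object would take two parameters (self and the query) instead of ten, and new criteria could be added to the dataclass without changing any call signature.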

"""
API to METS
"""
from datetime import datetime
import re
import typing
from typing import Dict, Optional
from warnings import warn

from ocrd_utils import (
    getLogger,
    deprecation_warning,
    generate_range,
    VERSION,
    REGEX_PREFIX,
    REGEX_FILE_ID
)

from ocrd_utils.config import config

from .constants import (
    NAMESPACES as NS,
    TAG_METS_AGENT,
    TAG_METS_DIV,
    TAG_METS_FILE,
    TAG_METS_FILEGRP,
    TAG_METS_FILESEC,
    TAG_METS_FPTR,
    TAG_METS_METSHDR,
    TAG_METS_STRUCTMAP,
    IDENTIFIER_PRIORITY,
    TAG_MODS_IDENTIFIER,
    METS_XML_EMPTY,
    METS_PAGE_DIV_ATTRIBUTE
)

from .ocrd_xml_base import OcrdXmlDocument, ET      # type: ignore
from .ocrd_file import OcrdFile
from .ocrd_agent import OcrdAgent

REGEX_PREFIX_LEN = len(REGEX_PREFIX)

class OcrdMets(OcrdXmlDocument):
    """
    API to a single METS file
    """
    _cache_flag : bool
    # Cache for the pages (mets:div)
    # The dictionary's Key: 'div.ID'
    # The dictionary's Value: a 'div' object at some memory location
    _page_cache : Dict[METS_PAGE_DIV_ATTRIBUTE, Dict[str, ET.Element]]
    # Cache for the files (mets:file) - two nested dictionaries
    # The outer dictionary's Key: 'fileGrp.USE'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'file.ID'
    # The inner dictionary's Value: a 'file' object at some memory location
    _file_cache : Dict[str, Dict[str, ET.Element]]
    # Cache for the file pointers (mets:fptr) - two nested dictionaries
    # The outer dictionary's Key: 'div.ID'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'fptr.FILEID'
    # The inner dictionary's Value: a 'fptr' object at some memory location
    _fptr_cache : Dict[str, Dict[str, ET.Element]]

    @staticmethod
    def empty_mets(now=None, cache_flag=False):
        """
        Create an empty METS file from bundled template.
        """
        if not now:
            now = datetime.now().isoformat()
        tpl = METS_XML_EMPTY
        tpl = tpl.replace('{{ VERSION }}', VERSION)
        tpl = tpl.replace('{{ NOW }}', '%s' % now)
        return OcrdMets(content=tpl.encode('utf-8'), cache_flag=cache_flag)

    def __init__(self, **kwargs):
        """
        """
        super(OcrdMets, self).__init__(**kwargs)

        # XXX If the environment variable OCRD_METS_CACHING is set to "true",
        # then enable caching, if "false", disable caching, overriding the
        # kwarg to the constructor
        if config.is_set('OCRD_METS_CACHING'):
            getLogger('ocrd.models.ocrd_mets').debug('METS Caching %s because OCRD_METS_CACHING is %s',
                    'enabled' if config.OCRD_METS_CACHING else 'disabled', config.raw_value('OCRD_METS_CACHING'))
            self._cache_flag = config.OCRD_METS_CACHING


        # If cache is enabled
        if self._cache_flag:
            self._initialize_caches()
            self._refresh_caches()

    def __str__(self):
        """
        String representation
        """
        return 'OcrdMets[cached=%s,fileGrps=%s,files=%s]' % (
        self._cache_flag, self.file_groups, list(self.find_files()))

    def _fill_caches(self):
        """
        Fills the caches with fileGrps and FileIDs
        """

        tree_root = self._tree.getroot()

        # Fill with files
        el_fileSec = tree_root.find("mets:fileSec", NS)
        if el_fileSec is None:
            return

        log = getLogger('ocrd.models.ocrd_mets._fill_caches-files')

        for el_fileGrp in el_fileSec.findall('mets:fileGrp', NS):
            fileGrp_use = el_fileGrp.get('USE')

            # Assign an empty dictionary that will hold the files of the added fileGrp
            self._file_cache[fileGrp_use] = {}

            for el_file in el_fileGrp:
                file_id = el_file.get('ID')
                self._file_cache[fileGrp_use].update({file_id: el_file})
                # log.info("File added to the cache: %s" % file_id)

        # Fill with pages
        el_div_list = tree_root.findall(".//mets:div[@TYPE='page']", NS)
        if len(el_div_list) == 0:
            return
        log = getLogger('ocrd.models.ocrd_mets._fill_caches-pages')

        for el_div in el_div_list:
            div_id = el_div.get('ID')
            log.debug("DIV_ID: %s" % el_div.get('ID'))

            for attr in METS_PAGE_DIV_ATTRIBUTE:
                self._page_cache[attr][str(el_div.get(attr.name))] = el_div
                # Reported issue (Comprehensibility, Best Practice): "The variable str does not seem to be defined."
                # str is a Python builtin here, so this report looks like a false positive.

            # Assign an empty dictionary that will hold the fptr of the added page (div)
            self._fptr_cache[div_id] = {}

            # log.info("Page_id added to the cache: %s" % div_id)

            for el_fptr in el_div:
                self._fptr_cache[div_id].update({el_fptr.get('FILEID'): el_fptr})
                # log.info("Fptr added to the cache: %s" % el_fptr.get('FILEID'))

        # log.info("Len of page_cache: %s" % len(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]))
        # log.info("Len of fptr_cache: %s" % len(self._fptr_cache))

    def _initialize_caches(self):
        self._file_cache = {}
        # NOTE we can only guarantee uniqueness for @ID and @ORDER
        self._page_cache = {k : {} for k in METS_PAGE_DIV_ATTRIBUTE}
        self._fptr_cache = {}

    def _refresh_caches(self):
        if self._cache_flag:
            self._initialize_caches()

            # Note, if the empty_mets() function is used to instantiate OcrdMets
            # Then the cache is empty even after this operation
            self._fill_caches()

    @property
    def unique_identifier(self):
        """
        Get the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for t in IDENTIFIER_PRIORITY:
            found = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if found is not None:
                return found.text

    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Set the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        id_el = None
        for t in IDENTIFIER_PRIORITY:
            id_el = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if id_el is not None:
                break
        if id_el is None:
            mods = self._tree.getroot().find('.//mods:mods', NS)
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl

    @property
    def agents(self):
        """
        List all :py:class:`ocrd_models.ocrd_agent.OcrdAgent`s
        """
        return [OcrdAgent(el_agent) for el_agent in self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)]

    def add_agent(self, *args, **kwargs):
        """
        Add an :py:class:`ocrd_models.ocrd_agent.OcrdAgent` to the list of agents in the ``metsHdr``.
        """
        el_metsHdr = self._tree.getroot().find('.//mets:metsHdr', NS)
        if el_metsHdr is None:
            el_metsHdr = ET.Element(TAG_METS_METSHDR)
            self._tree.getroot().insert(0, el_metsHdr)
        #  assert(el_metsHdr is not None)
        el_agent = ET.Element(TAG_METS_AGENT)
        try:
            el_agent_last = next(el_metsHdr.iterchildren(tag=TAG_METS_AGENT, reversed=True))
            el_agent_last.addnext(el_agent)
        except StopIteration:
            el_metsHdr.insert(0, el_agent)
        #  print(ET.tostring(el_metsHdr))
        return OcrdAgent(el_agent, *args, **kwargs)

    @property
    def file_groups(self):
        """
        List the `@USE` of all `mets:fileGrp` entries.
        """

        # WARNING: Actually we cannot return strings in place of elements!
        if self._cache_flag:
            return list(self._file_cache.keys())

        return [el.get('USE') for el in self._tree.getroot().findall('.//mets:fileGrp', NS)]

    def find_all_files(self, *args, **kwargs):
        """
        Like :py:meth:`find_files` but return a list of all results.
        Equivalent to ``list(self.find_files(...))``
        """
        return list(self.find_files(*args, **kwargs))

    # pylint: disable=multiple-statements
    def find_files(
        self,
        ID=None,
        fileGrp=None,
        pageId=None,
        mimetype=None,
        url=None,
        local_filename=None,
        local_only=False,
        include_fileGrp=None,
        exclude_fileGrp=None,
    ):
        """
        Search ``mets:file`` entries in this METS document and yield results.
        The :py:attr:`ID`, :py:attr:`pageId`, :py:attr:`fileGrp`,
        :py:attr:`url` and :py:attr:`mimetype` parameters can each be either a
        literal string, or a regular expression if the string starts with
        ``//`` (double slash).
        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. If it is a literal string, comparison
        is done with string equality.
        The :py:attr:`pageId` parameter supports the numeric range operator ``..``. For
        example, to find all files in pages ``PHYS_0001`` to ``PHYS_0003``,
        ``PHYS_0001..PHYS_0003`` will be expanded to ``PHYS_0001,PHYS_0002,PHYS_0003``.
        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` remote/original URL of ``mets:FLocat`` of ``mets:file``
            local_filename (string) : ``@xlink:href`` local/cached filename of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem
            include_fileGrp (list[str]) : Whitelist of allowed file groups
            exclude_fileGrp (list[str]) : Blacklist of disallowed file groups
        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations
        """
        pageId_list = []
        if pageId:
            # returns divs instead of strings of ids
            physical_pages = self.get_physical_pages(for_pageIds=pageId, return_divs=True)
            for div in physical_pages:
                if self._cache_flag:
                    pageId_list += self._fptr_cache[div.get('ID')]
                else:
                    pageId_list += [fptr.get('FILEID') for fptr in div.findall('mets:fptr', NS)]

        # Compile every filter that carries the regex prefix
        if ID and ID.startswith(REGEX_PREFIX):
            ID = re.compile(ID[REGEX_PREFIX_LEN:])
        if fileGrp and fileGrp.startswith(REGEX_PREFIX):
            fileGrp = re.compile(fileGrp[REGEX_PREFIX_LEN:])
        if mimetype and mimetype.startswith(REGEX_PREFIX):
            mimetype = re.compile(mimetype[REGEX_PREFIX_LEN:])
        if url and url.startswith(REGEX_PREFIX):
            url = re.compile(url[REGEX_PREFIX_LEN:])

        candidates = []
        if self._cache_flag:
            if fileGrp:
                if isinstance(fileGrp, str):
                    candidates += self._file_cache.get(fileGrp, {}).values()
                else:
                    # fullmatch, consistent with the docstring and the uncached branch below
                    candidates = [x for fileGrp_needle, el_file_list in self._file_cache.items() if
                                  fileGrp.fullmatch(fileGrp_needle) for x in el_file_list.values()]
            else:
                candidates = [el_file for id_to_file in self._file_cache.values() for el_file in id_to_file.values()]
        else:
            candidates = self._tree.getroot().xpath('//mets:file', namespaces=NS)

        for cand in candidates:
            if ID:
                if isinstance(ID, str):
                    if ID != cand.get('ID'): continue
                else:
                    if not ID.fullmatch(cand.get('ID')): continue

            if pageId is not None and cand.get('ID') not in pageId_list:
                continue

            if not self._cache_flag and fileGrp:
                if isinstance(fileGrp, str):
                    if cand.getparent().get('USE') != fileGrp: continue
                else:
                    if not fileGrp.fullmatch(cand.getparent().get('USE')): continue

            if mimetype:
                if isinstance(mimetype, str):
                    if cand.get('MIMETYPE') != mimetype: continue
                else:
                    if not mimetype.fullmatch(cand.get('MIMETYPE') or ''): continue

            if url:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="URL"]', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_url = cand_locat.get('{%s}href' % NS['xlink'])
                if isinstance(url, str):
                    if cand_url != url: continue
                else:
                    if not url.fullmatch(cand_url): continue

            if local_filename:
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"]', namespaces=NS)
                if cand_locat is None:
                    continue
                cand_local_filename = cand_locat.get('{%s}href' % NS['xlink'])
                if isinstance(local_filename, str):
                    if cand_local_filename != local_filename: continue
                else:
                    if not local_filename.fullmatch(cand_local_filename): continue

            if local_only:
                # deprecation_warning("'local_only' is deprecated, use 'local_filename=\"//.+\"' instead")
                is_local = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"][@xlink:href]', namespaces=NS)
                if is_local is None:
                    continue

            ret = OcrdFile(cand, mets=self)

            # XXX include_fileGrp is redundant to fileGrp but for completeness
            if exclude_fileGrp and ret.fileGrp in exclude_fileGrp:
                continue
            if include_fileGrp and ret.fileGrp not in include_fileGrp:
                continue

            yield ret

    def add_file_group(self, fileGrp):
        """
        Add a new ``mets:fileGrp``.
        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.
        """
        if ',' in fileGrp:
            raise ValueError('fileGrp must not contain commas')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            el_fileSec = ET.SubElement(self._tree.getroot(), TAG_METS_FILESEC)
        el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if el_fileGrp is None:
            el_fileGrp = ET.SubElement(el_fileSec, TAG_METS_FILEGRP)
            el_fileGrp.set('USE', fileGrp)

            if self._cache_flag:
                # Assign an empty dictionary that will hold the files of the added fileGrp
                self._file_cache[fileGrp] = {}

        return el_fileGrp

    def rename_file_group(self, old, new):
        """
        Rename a ``mets:fileGrp`` by changing the ``@USE`` from :py:attr:`old` to :py:attr:`new`.
        """
        el_fileGrp = self._tree.getroot().find('mets:fileSec/mets:fileGrp[@USE="%s"]' % old, NS)
        if el_fileGrp is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        el_fileGrp.set('USE', new)

        if self._cache_flag:
            self._file_cache[new] = self._file_cache.pop(old)

    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a ``mets:fileGrp`` (single fixed ``@USE`` or multiple regex ``@USE``)
        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                use = re.compile(USE[REGEX_PREFIX_LEN:])
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    if use.fullmatch(cand.get('USE')):
                        self.remove_file_group(cand, recursive=recursive)
                return
            else:
                el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
        if el_fileGrp is None:  # pylint: disable=len-as-condition
            msg = "No such fileGrp: %s" % USE
            if force:
                log.warning(msg)
                return
            raise Exception(msg)

        # The cache should also be used here
        if self._cache_flag:
            files = self._file_cache.get(el_fileGrp.get('USE'), {}).values()
        else:
            files = el_fileGrp.findall('mets:file', NS)

        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            for f in list(files):
                self.remove_one_file(ID=f.get('ID'), fileGrp=f.getparent().get('USE'))

        if self._cache_flag:
            # Note: Since the files inside the group are removed
            # with the 'remove_one_file' method above,
            # we should not take care of that again.
            # We just remove the fileGrp.
            del self._file_cache[el_fileGrp.get('USE')]

        el_fileGrp.getparent().remove(el_fileGrp)

    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None,
                 ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string):
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)
        log = getLogger('ocrd.models.ocrd_mets.add_file')

        el_fileGrp = self.add_file_group(fileGrp)
        if not ignore:
            mets_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)
            if mets_file:
                if mets_file.fileGrp == fileGrp and \
                        mets_file.pageId == pageId and \
                        mets_file.mimetype == mimetype:
                    if not force:
                        raise FileExistsError(
                            f"A file with ID=={ID} already exists {mets_file} and neither force nor ignore are set")
                    self.remove_file(ID=ID, fileGrp=fileGrp)
                else:
                    raise FileExistsError(
                        f"A file with ID=={ID} already exists {mets_file} but unrelated - cannot mitigate")

        # To get rid of Python's FutureWarning - checking if v is not None
        kwargs = {k: v for k, v in locals().items() if
                  k in ['url', 'ID', 'mimetype', 'pageId', 'local_filename'] and v is not None}
        # This separation is needed to reuse the same el_mets_file element in the caching if block
        el_mets_file = ET.SubElement(el_fileGrp, TAG_METS_FILE)
        # The caching of the physical page is done in the OcrdFile constructor
        mets_file = OcrdFile(el_mets_file, mets=self, **kwargs)

        if self._cache_flag:
            # Add the file to the file cache
            self._file_cache[fileGrp].update({ID: el_mets_file})

        return mets_file

    def remove_file(self, *args, **kwargs):
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            else:
                return files[0]  # for backwards-compatibility
        # Inspect the keyword values (not the keyword names) for the regex prefix
        if any(1 for kwarg in kwargs.values()
               if isinstance(kwarg, str) and kwarg.startswith(REGEX_PREFIX)):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))

    def remove_one_file(self, ID, fileGrp=None):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            ID (string|OcrdFile): ``@ID`` of the ``mets:file`` to delete. Can also be an :py:class:`ocrd_models.ocrd_file.OcrdFile` to avoid search via ``ID``.
            fileGrp (string): ``@USE`` of the ``mets:fileGrp`` containing the ``mets:file``. Used only for optimization.
        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_one_file')
        log.debug("remove_one_file(%s %s)" % (ID, fileGrp))
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s (fileGrp=%s)" % (ID, fileGrp))

        # Collect the fptr elements that reference this file
        fptrs = []
        if self._cache_flag:
            for page in self._fptr_cache.keys():
                if ID in self._fptr_cache[page]:
                    fptrs.append(self._fptr_cache[page][ID])
        else:
            fptrs = self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS)

        # Delete the physical page ref
        for fptr in fptrs:
            log.debug("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # Remove the fptr from the cache as well
            if self._cache_flag:
                del self._fptr_cache[page_div.get('ID')][ID]
            # delete empty pages
            if not page_div.getchildren():
                log.debug("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)
                # Delete the empty pages from caches as well
                if self._cache_flag:
                    for attr in METS_PAGE_DIV_ATTRIBUTE:
                        if attr.name in page_div.attrib:
                            del self._page_cache[attr][page_div.attrib[attr.name]]

        # Delete the file reference from the cache
        if self._cache_flag:
            parent_use = ocrd_file._el.getparent().get('USE')
            del self._file_cache[parent_use][ocrd_file.ID]

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file

    @property
    def physical_pages(self):
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``)
        """
        if self._cache_flag:
            return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].keys())

        return [str(x) for x in self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
            namespaces=NS)]
593
594
    def get_physical_pages(self, for_fileIds : Optional[str] = None, for_pageIds : Optional[str] = None, return_divs : bool = False):
595
        """
596
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
597
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`,
598
        or for a subset selector expression (comma-separated, range, and/or regex) :py:attr:`for_pageIds`.
599
        If return_divs is set, returns div memory objects instead of strings of ids
600
        """
601
        if for_fileIds is None and for_pageIds is None:
602
            return self.physical_pages
603
        log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
604
        if for_pageIds is not None:
605
            ret = []
606
            page_attr_patterns = []
607
            for pageId_token in re.split(r',', for_pageIds):
608
                if pageId_token.startswith(REGEX_PREFIX):
609
                    page_attr_patterns.append(re.compile(pageId_token[REGEX_PREFIX_LEN:]))
610
                elif '..' in pageId_token:
611
                    page_attr_patterns += generate_range(*pageId_token.split('..', 1))
612
                else:
613
                    page_attr_patterns += [pageId_token]
614
            if not page_attr_patterns:
615
                return []
616
            if self._cache_flag:
617
                # determine attr to look for before iterating
618
                try:
619
                    attr = next(a for a in METS_PAGE_DIV_ATTRIBUTE if (
620
                                any(p in self._page_cache[a] for p in page_attr_patterns) or \
621
                                any([isinstance(p, typing.Pattern) and p.fullmatch(attr_val) \
622
                                    for p in page_attr_patterns \
623
                                    for attr_val in self._page_cache[a]]
624
                                )))
625
                    for attr_val in self._page_cache[attr].keys():
626
                        if attr_val in page_attr_patterns or \
627
                                any([isinstance(p, typing.Pattern) and p.fullmatch(attr_val) for p in page_attr_patterns]):
628
                            if return_divs:
629
                                ret.append(self._page_cache[attr][attr_val])
630
                            else:
631
                                ret.append(attr_val)
632
                except StopIteration:
633
                    log.debug(f"No pattern matches any keys of any of the _page_caches. patterns: {page_attr_patterns}")
            else:
                # determine attr while iterating
                attr = None
                for page in self._tree.getroot().xpath(
                        'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                        namespaces=NS):
                    try:
                        if not attr:
                            # skip attributes this page lacks: fullmatch(None) would raise TypeError
                            attr = next(a for a in METS_PAGE_DIV_ATTRIBUTE if
                                page.get(a.name) in page_attr_patterns or
                                (page.get(a.name) is not None and
                                 any(isinstance(p, typing.Pattern) and p.fullmatch(page.get(a.name))
                                     for p in page_attr_patterns)))
                        attr_val = page.get(attr.name)
                        if attr_val in page_attr_patterns or (
                                attr_val is not None and
                                any(isinstance(p, typing.Pattern) and p.fullmatch(attr_val) for p in page_attr_patterns)):
                            if return_divs:
                                ret.append(page)
                            else:
                                ret.append(attr_val)
                    except StopIteration:
                        log.debug(f"No pattern matches any mets:div attributes. patterns: {page_attr_patterns}")
            return ret

        assert for_fileIds  # at this point we know for_fileIds is set, assert to convince pyright
        ret = [None] * len(for_fileIds)
        if self._cache_flag:
            for pageId in self._fptr_cache.keys():
                for fptr in self._fptr_cache[pageId].keys():
                    if fptr in for_fileIds:
                        index = for_fileIds.index(fptr)
                        if return_divs:
                            ret[index] = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
                        else:
                            ret[index] = pageId
        else:
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                for fptr in page.findall('mets:fptr', NS):
                    if fptr.get('FILEID') in for_fileIds:
                        index = for_fileIds.index(fptr.get('FILEID'))
                        if return_divs:
                            ret[index] = page
                        else:
                            ret[index] = page.get('ID')
        return ret

    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Set the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`, creating all structures if necessary.
        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use
            orderlabel (string): ``@ORDERLABEL`` to use
        """

        # delete any page mapping for this file.ID
        candidates = []
        if self._cache_flag:
            for page_id in self._fptr_cache.keys():
                if ocrd_file.ID in self._fptr_cache[page_id].keys():
                    if self._fptr_cache[page_id][ocrd_file.ID] is not None:
                        candidates.append(self._fptr_cache[page_id][ocrd_file.ID])
        else:
            candidates = self._tree.getroot().findall(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                ocrd_file.ID, namespaces=NS)

        for el_fptr in candidates:
            if self._cache_flag:
                del self._fptr_cache[el_fptr.getparent().get('ID')][ocrd_file.ID]
            el_fptr.getparent().remove(el_fptr)

        # find/construct as necessary
        el_structmap = self._tree.getroot().find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if el_structmap is None:
            el_structmap = ET.SubElement(self._tree.getroot(), TAG_METS_STRUCTMAP)
            el_structmap.set('TYPE', 'PHYSICAL')
        el_seqdiv = el_structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if el_seqdiv is None:
            el_seqdiv = ET.SubElement(el_structmap, TAG_METS_DIV)
            el_seqdiv.set('TYPE', 'physSequence')

        el_pagediv = None
        if self._cache_flag:
            # the page cache is keyed by attribute first, then by attribute value
            if pageId in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                el_pagediv = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
        else:
            el_pagediv = el_seqdiv.find('mets:div[@ID="%s"]' % pageId, NS)

        if el_pagediv is None:
            el_pagediv = ET.SubElement(el_seqdiv, TAG_METS_DIV)
            el_pagediv.set('TYPE', 'page')
            el_pagediv.set('ID', pageId)
            if order:
                el_pagediv.set('ORDER', order)
            if orderlabel:
                el_pagediv.set('ORDERLABEL', orderlabel)
            if self._cache_flag:
                # Create a new entry in the page cache
                self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId] = el_pagediv
                # Create a new entry in the fptr cache and
                # assign an empty dictionary to hold the file IDs
                self._fptr_cache[pageId] = {}

        el_fptr = ET.SubElement(el_pagediv, TAG_METS_FPTR)
        el_fptr.set('FILEID', ocrd_file.ID)

        if self._cache_flag:
            # Assign the ocrd fileID to the pageId in the cache
            self._fptr_cache[el_pagediv.get('ID')].update({ocrd_file.ID: el_fptr})

    def update_physical_page_attributes(self, page_id, **kwargs):
        """
        Set or remove attributes of the physical page ``mets:div`` with ``@ID`` :py:attr:`page_id`.
        Keyword Args:
            **kwargs: attribute names (must be in ``METS_PAGE_DIV_ATTRIBUTE.names()``) mapped to
                their new values; a falsy value removes the attribute
        """
        invalid_keys = [k for k in kwargs if k not in METS_PAGE_DIV_ATTRIBUTE.names()]
        if invalid_keys:
            raise ValueError(f"Invalid attribute {invalid_keys}. Allowed values: {METS_PAGE_DIV_ATTRIBUTE.names()}")

        page_div = self.get_physical_pages(for_pageIds=page_id, return_divs=True)
        if not page_div:
            raise ValueError(f'Could not find mets:div[@ID="{page_id}"]')
        page_div = page_div[0]

        for k, v in kwargs.items():
            if not v:
                # tolerate attributes that are already absent
                page_div.attrib.pop(k, None)
            else:
                page_div.attrib[k] = v

    def get_physical_page_for_file(self, ocrd_file):
        """
        Get the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`.
        Returns ``None`` if no page references the file.
        """
        ret = []
        if self._cache_flag:
            for pageId in self._fptr_cache.keys():
                if ocrd_file.ID in self._fptr_cache[pageId].keys():
                    ret.append(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId].get('ID'))
        else:
            ret = self._tree.getroot().xpath(
                '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
                ocrd_file.ID, namespaces=NS)

        # Check the length explicitly to avoid Python's FutureWarning
        if len(ret):
            return ret[0]
        return None

    def remove_physical_page(self, ID):
        """
        Delete page (physical ``mets:structMap`` ``mets:div`` entry ``@ID``) :py:attr:`ID`.
        """
        mets_div = None
        if self._cache_flag:
            if ID in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].keys():
                mets_div = [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][ID]]
        else:
            mets_div = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
                namespaces=NS)
        if mets_div:
            mets_div_attrib = {**mets_div[0].attrib}
            mets_div[0].getparent().remove(mets_div[0])
            if self._cache_flag:
                for attr in METS_PAGE_DIV_ATTRIBUTE:
                    if attr.name in mets_div_attrib:
                        del self._page_cache[attr][mets_div_attrib[attr.name]]
                del self._fptr_cache[ID]

    def remove_physical_page_fptr(self, fileId):
        """
        Delete all ``mets:fptr[@FILEID == fileId]`` pointing to ``mets:file[@ID == fileId]`` for :py:attr:`fileId` from all ``mets:div`` entries in the physical ``mets:structMap``.
        Returns:
            List of pageIds that mets:fptrs were deleted from
        """

        # Question: Why keep a list of mets_fptrs?
        # Can the same fileId occur under different pageIds?
        # In the examples seen in 'assets' that is not the case,
        # so the mets_fptrs list always contains a single element.
        # If that always holds, one loop would suffice instead of two.
        mets_fptrs = []
        if self._cache_flag:
            for page_id in self._fptr_cache.keys():
                if fileId in self._fptr_cache[page_id].keys():
                    mets_fptrs.append(self._fptr_cache[page_id][fileId])
        else:
            mets_fptrs = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
                namespaces=NS)
        ret = []
        for mets_fptr in mets_fptrs:
            mets_div = mets_fptr.getparent()
            ret.append(mets_div.get('ID'))
            if self._cache_flag:
                del self._fptr_cache[mets_div.get('ID')][mets_fptr.get('FILEID')]
            mets_div.remove(mets_fptr)
        return ret

    @property
    def physical_pages_labels(self):
        """
        Map all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``) to their
        ``@ORDER``, ``@ORDERLABEL`` and ``@LABEL`` attributes, if any.
        """
        divs = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
            namespaces=NS)
        return {div.get('ID'): (div.get('ORDER', None), div.get('ORDERLABEL', None), div.get('LABEL', None))
                for div in divs}

    def merge(self, other_mets, force=False, fileGrp_mapping=None, fileId_mapping=None, pageId_mapping=None,
              after_add_cb=None, **kwargs):
        """
        Add all files from :py:attr:`other_mets`.
        Accepts the same kwargs as :py:func:`find_files`, which select the files to add.
        Keyword Args:
            force (boolean): Whether to :py:meth:`add_file`s with force (overwriting existing ``mets:file``s)
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
            fileId_mapping (dict): Map :py:attr:`other_mets` file ID to file ID in this METS
            pageId_mapping (dict): Map :py:attr:`other_mets` page ID to page ID in this METS
            after_add_cb (function): Callback invoked with each file after it has been added to this METS
        """
        if not fileGrp_mapping:
            fileGrp_mapping = {}
        if not fileId_mapping:
            fileId_mapping = {}
        if not pageId_mapping:
            pageId_mapping = {}
        for f_src in other_mets.find_files(**kwargs):
            f_dest = self.add_file(
                fileGrp_mapping.get(f_src.fileGrp, f_src.fileGrp),
                mimetype=f_src.mimetype,
                url=f_src.url,
                local_filename=f_src.local_filename,
                ID=fileId_mapping.get(f_src.ID, f_src.ID),
                pageId=pageId_mapping.get(f_src.pageId, f_src.pageId),
                force=force)
            # FIXME: merge metsHdr, amdSec, dmdSec as well
            # FIXME: merge structMap logical and structLink as well
            if after_add_cb:
                after_add_cb(f_dest)
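
The check "value equals a literal pattern, or fully matches a compiled regex" appears several times in the page-lookup code of this listing, which makes it a candidate for the Extract Method refactoring suggested at the top of this page. The following is only a minimal standalone sketch of that idea: the helper name `matches_any` and the sample page IDs are hypothetical and not part of ocrd_models, and the sketch assumes that a missing attribute value (`None`) should count as a non-match.

```python
import re


def matches_any(value, patterns):
    """Return True if value equals a literal pattern or fully matches a compiled regex.

    Hypothetical helper for illustration only; not part of ocrd_models.
    """
    if value is None:
        # a page that lacks the attribute cannot match anything
        return False
    for pattern in patterns:
        if isinstance(pattern, re.Pattern):
            if pattern.fullmatch(value):
                return True
        elif pattern == value:
            return True
    return False


# Mixed list of literal IDs and compiled regexes (sample values are made up)
patterns = ["PHYS_0001", re.compile(r"PHYS_00[3-5]\d?")]
page_ids = ["PHYS_0001", "PHYS_0002", "PHYS_0030", "PHYS_004"]

selected = [page_id for page_id in page_ids if matches_any(page_id, patterns)]
print(selected)  # ['PHYS_0001', 'PHYS_0030', 'PHYS_004']
```

With such a helper, each of the repeated `... in page_attr_patterns or any(...)` conditions could shrink to a single call, which would lower the condition count reported for the method without changing which pages are selected.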