Passed
Pull Request — master (#1063)
by Konstantin
03:19
created

ocrd_models.ocrd_mets.OcrdMets.find_files()   F

Complexity

Conditions 48

Size

Total Lines 126
Code Lines 76

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 76
dl 0
loc 126
rs 0
c 0
b 0
f 0
cc 48
nop 10

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:
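One such refactoring is Extract Method. As a minimal sketch (not the project's actual code), the repeated "compile the filter if it starts with `//`" step described in the `find_files()` docstring could live in one small helper. The names `compile_if_regex` and `normalize_filters` are hypothetical, and `REGEX_PREFIX` is redefined here only to keep the example self-contained.

```python
import re

# Assumption: mirrors the '//' regex marker described in the find_files() docstring.
REGEX_PREFIX = "//"


def compile_if_regex(value):
    """Return a compiled pattern for '//'-prefixed strings, else the value unchanged."""
    if value and value.startswith(REGEX_PREFIX):
        return re.compile(value[len(REGEX_PREFIX):])
    return value


def normalize_filters(ID=None, fileGrp=None, mimetype=None, url=None):
    """Apply the extracted helper to every filter instead of repeating the same 'if'."""
    return {
        "ID": compile_if_regex(ID),
        "fileGrp": compile_if_regex(fileGrp),
        "mimetype": compile_if_regex(mimetype),
        "url": compile_if_regex(url),
    }


filters = normalize_filters(ID="//FILE_\\d+", mimetype="image/tiff")
print(type(filters["ID"]).__name__, filters["mimetype"])
```

The helper's name replaces the comment that would otherwise explain each of the four near-identical `if` statements.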

Complexity

Complex classes like ocrd_models.ocrd_mets.OcrdMets.find_files() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
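For a single method such as `find_files()`, the same idea can be applied at a smaller scale: most of its conditions repeat one pattern, "compare literally if the filter is a string, otherwise use `fullmatch`". The sketch below pulls that pattern into one predicate. The names `matches` and `candidate_passes` are hypothetical, and plain dictionaries stand in for `mets:file` elements so the example runs on its own.

```python
import re
from typing import Optional, Pattern, Union

Needle = Union[str, Pattern[str], None]


def matches(needle: Needle, value: Optional[str]) -> bool:
    """True if no filter is set, the literal equals value, or the regex fully matches."""
    if not needle:
        return True  # an unset filter accepts every candidate
    if isinstance(needle, str):
        return needle == value
    return needle.fullmatch(value or "") is not None


def candidate_passes(cand: dict, ID: Needle = None, mimetype: Needle = None) -> bool:
    """Each filter contributes one call instead of a nested if/else block."""
    return matches(ID, cand.get("ID")) and matches(mimetype, cand.get("MIMETYPE"))


cand = {"ID": "FILE_0001", "MIMETYPE": "image/tiff"}
print(candidate_passes(cand, ID="FILE_0001", mimetype=re.compile("image/.*")))
```

Each filter then adds a single branch to the calling loop, which is where most of the reported conditions come from.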

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
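One such approach is to introduce a parameter object that bundles related arguments. The sketch below groups the keyword arguments of `find_files()` into a dataclass. `FileQuery` and `describe` are hypothetical names used only for illustration and are not part of the library.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class FileQuery:
    """Hypothetical parameter object mirroring the keyword arguments of find_files()."""
    ID: Optional[str] = None
    fileGrp: Optional[str] = None
    pageId: Optional[str] = None
    mimetype: Optional[str] = None
    url: Optional[str] = None
    local_filename: Optional[str] = None
    local_only: bool = False
    include_fileGrp: Optional[List[str]] = None
    exclude_fileGrp: Optional[List[str]] = None


def describe(query: FileQuery) -> str:
    """List only the filters that are actually set, in field order."""
    active = {k: v for k, v in vars(query).items() if v is not None and v is not False}
    return ", ".join(f"{k}={v!r}" for k, v in active.items())


print(describe(FileQuery(fileGrp="OCR-D-IMG", mimetype="image/tiff")))
```

A method that accepts one such object has a single parameter to document, and new filters can be added without changing every call site.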

"""
API to METS
"""
from datetime import datetime
import re
from typing import Dict, Optional

from ocrd_utils import (
    getLogger,
    deprecation_warning,
    generate_range,
    VERSION,
    REGEX_PREFIX,
    REGEX_FILE_ID
)

from ocrd_utils.config import config

from .constants import (
    NAMESPACES as NS,
    TAG_METS_AGENT,
    TAG_METS_DIV,
    TAG_METS_FILE,
    TAG_METS_FILEGRP,
    TAG_METS_FILESEC,
    TAG_METS_FPTR,
    TAG_METS_METSHDR,
    TAG_METS_STRUCTMAP,
    IDENTIFIER_PRIORITY,
    TAG_MODS_IDENTIFIER,
    METS_XML_EMPTY,
    METS_PAGE_DIV_ATTRIBUTE
)

from .ocrd_xml_base import OcrdXmlDocument, ET      # type: ignore
from .ocrd_file import OcrdFile
from .ocrd_agent import OcrdAgent

REGEX_PREFIX_LEN = len(REGEX_PREFIX)

class OcrdMets(OcrdXmlDocument):
    """
    API to a single METS file
    """
    _cache_flag : bool
    # Cache for the pages (mets:div)
    # The dictionary's Key: 'div.ID'
    # The dictionary's Value: a 'div' object at some memory location
    _page_cache : Dict[METS_PAGE_DIV_ATTRIBUTE, Dict[str, ET.Element]]
    # Cache for the files (mets:file) - two nested dictionaries
    # The outer dictionary's Key: 'fileGrp.USE'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'file.ID'
    # The inner dictionary's Value: a 'file' object at some memory location
    _file_cache : Dict[str, Dict[str, ET.Element]]
    # Cache for the file pointers (mets:fptr) - two nested dictionaries
    # The outer dictionary's Key: 'div.ID'
    # The outer dictionary's Value: Inner dictionary
    # The inner dictionary's Key: 'fptr.FILEID'
    # The inner dictionary's Value: a 'fptr' object at some memory location
    _fptr_cache : Dict[str, Dict[str, ET.Element]]

    @staticmethod
    def empty_mets(now=None, cache_flag=False):
        """
        Create an empty METS file from bundled template.
        """
        if not now:
            now = datetime.now().isoformat()
        tpl = METS_XML_EMPTY
        tpl = tpl.replace('{{ VERSION }}', VERSION)
        tpl = tpl.replace('{{ NOW }}', '%s' % now)
        return OcrdMets(content=tpl.encode('utf-8'), cache_flag=cache_flag)

    def __init__(self, **kwargs):
        """
        """
        super(OcrdMets, self).__init__(**kwargs)

        # XXX If the environment variable OCRD_METS_CACHING is set to "true",
        # then enable caching, if "false", disable caching, overriding the
        # kwarg to the constructor
        if config.is_set('OCRD_METS_CACHING'):
            getLogger('ocrd.models.ocrd_mets').debug('METS Caching %s because OCRD_METS_CACHING is %s',
                    'enabled' if config.OCRD_METS_CACHING else 'disabled', config.raw_value('OCRD_METS_CACHING'))
            self._cache_flag = config.OCRD_METS_CACHING


        # If cache is enabled
        if self._cache_flag:
            self._initialize_caches()
            self._refresh_caches()

    def __str__(self):
        """
        String representation
        """
        return 'OcrdMets[cached=%s,fileGrps=%s,files=%s]' % (
        self._cache_flag, self.file_groups, list(self.find_files()))

    def _fill_caches(self):
        """
        Fills the caches with fileGrps and FileIDs
        """

        tree_root = self._tree.getroot()

        # Fill with files
        el_fileSec = tree_root.find("mets:fileSec", NS)
        if el_fileSec is None:
            return

        log = getLogger('ocrd.models.ocrd_mets._fill_caches-files')

        for el_fileGrp in el_fileSec.findall('mets:fileGrp', NS):
            fileGrp_use = el_fileGrp.get('USE')

            # Assign an empty dictionary that will hold the files of the added fileGrp
            self._file_cache[fileGrp_use] = {}

            for el_file in el_fileGrp:
                file_id = el_file.get('ID')
                self._file_cache[fileGrp_use].update({file_id: el_file})
                # log.info("File added to the cache: %s" % file_id)

        # Fill with pages
        el_div_list = tree_root.findall(".//mets:div[@TYPE='page']", NS)
        if len(el_div_list) == 0:
            return
        log = getLogger('ocrd.models.ocrd_mets._fill_caches-pages')

        for el_div in el_div_list:
            div_id = el_div.get('ID')
            log.debug("DIV_ID: %s" % el_div.get('ID'))

            for attr in METS_PAGE_DIV_ATTRIBUTE:
                self._page_cache[attr][str(el_div.get(attr.name))] = el_div
                # Scrutinizer issue (Comprehensibility, Best Practice): The variable str does not seem to be defined.

            # Assign an empty dictionary that will hold the fptr of the added page (div)
            self._fptr_cache[div_id] = {}

            # log.info("Page_id added to the cache: %s" % div_id)

            for el_fptr in el_div:
                self._fptr_cache[div_id].update({el_fptr.get('FILEID'): el_fptr})
                # log.info("Fptr added to the cache: %s" % el_fptr.get('FILEID'))

        # log.info("Len of page_cache: %s" % len(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]))
        # log.info("Len of fptr_cache: %s" % len(self._fptr_cache))

    def _initialize_caches(self):
        self._file_cache = {}
        # NOTE we can only guarantee uniqueness for @ID and @ORDER
        self._page_cache = {k : {} for k in METS_PAGE_DIV_ATTRIBUTE}
        self._fptr_cache = {}

    def _refresh_caches(self):
        if self._cache_flag:
            self._initialize_caches()

            # Note, if the empty_mets() function is used to instantiate OcrdMets
            # Then the cache is empty even after this operation
            self._fill_caches()

    @property
    def unique_identifier(self):
        """
        Get the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for t in IDENTIFIER_PRIORITY:
            found = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if found is not None:
                return found.text

    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Set the unique identifier by looking through ``mods:identifier``
        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        id_el = None
        for t in IDENTIFIER_PRIORITY:
            id_el = self._tree.getroot().find('.//mods:identifier[@type="%s"]' % t, NS)
            if id_el is not None:
                break
        if id_el is None:
            mods = self._tree.getroot().find('.//mods:mods', NS)
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl

    @property
    def agents(self):
        """
        List all :py:class:`ocrd_models.ocrd_agent.OcrdAgent`s
        """
        return [OcrdAgent(el_agent) for el_agent in self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)]

    def add_agent(self, *args, **kwargs):
        """
        Add an :py:class:`ocrd_models.ocrd_agent.OcrdAgent` to the list of agents in the ``metsHdr``.
        """
        el_metsHdr = self._tree.getroot().find('.//mets:metsHdr', NS)
        if el_metsHdr is None:
            el_metsHdr = ET.Element(TAG_METS_METSHDR)
            self._tree.getroot().insert(0, el_metsHdr)
        #  assert(el_metsHdr is not None)
        el_agent = ET.Element(TAG_METS_AGENT)
        try:
            el_agent_last = next(el_metsHdr.iterchildren(tag=TAG_METS_AGENT, reversed=True))
            el_agent_last.addnext(el_agent)
        except StopIteration:
            el_metsHdr.insert(0, el_agent)
        return OcrdAgent(el_agent, *args, **kwargs)

    @property
    def file_groups(self):
        """
        List the `@USE` of all `mets:fileGrp` entries.
        """

        # WARNING: Actually we cannot return strings in place of elements!
        if self._cache_flag:
            return list(self._file_cache.keys())

        return [el.get('USE') for el in self._tree.getroot().findall('.//mets:fileGrp', NS)]

    def find_all_files(self, *args, **kwargs):
        """
        Like :py:meth:`find_files` but return a list of all results.
        Equivalent to ``list(self.find_files(...))``
        """
        return list(self.find_files(*args, **kwargs))

    # pylint: disable=multiple-statements
    def find_files(
        self,
        ID=None,
        fileGrp=None,
        pageId=None,
        mimetype=None,
        url=None,
        local_filename : Optional[str] = None,
        local_only=False,
        include_fileGrp=None,
        exclude_fileGrp=None,
    ):
        """
        Search ``mets:file`` entries in this METS document and yield results.
        The :py:attr:`ID`, :py:attr:`pageId`, :py:attr:`fileGrp`,
        :py:attr:`url` and :py:attr:`mimetype` parameters can each be either a
        literal string, or a regular expression if the string starts with
        ``//`` (double slash).
        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. If it is a literal string, comparison
        is done with string equality.
        The :py:attr:`pageId` parameter supports the numeric range operator ``..``. For
        example, to find all files in pages ``PHYS_0001`` to ``PHYS_0003``,
        ``PHYS_0001..PHYS_0003`` will be expanded to ``PHYS_0001,PHYS_0002,PHYS_0003``.
        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` remote/original URL of ``mets:FLocat`` of ``mets:file``
            local_filename (string) : ``@xlink:href`` local/cached filename of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem
            include_fileGrp (list[str]) : Whitelist of allowed file groups
            exclude_fileGrp (list[str]) : Blacklist of disallowed file groups
        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations
        """
        pageId_list = []
275
        if pageId:
276
            # returns divs instead of strings of ids
277
            physical_pages = self.get_physical_pages(for_pageIds=pageId, return_divs=True)
278
            for div in physical_pages:
279
                if self._cache_flag:
280
                    pageId_list += self._fptr_cache[div.get('ID')]
281
                else:
282
                    pageId_list += [fptr.get('FILEID') for fptr in div.findall('mets:fptr', NS)]
283
284
        if ID and ID.startswith(REGEX_PREFIX):
285
            ID = re.compile(ID[REGEX_PREFIX_LEN:])
286
        if fileGrp and fileGrp.startswith(REGEX_PREFIX):
287
            fileGrp = re.compile(fileGrp[REGEX_PREFIX_LEN:])
288
        if mimetype and mimetype.startswith(REGEX_PREFIX):
289
            mimetype = re.compile(mimetype[REGEX_PREFIX_LEN:])
290
        if url and url.startswith(REGEX_PREFIX):
291
            url = re.compile(url[REGEX_PREFIX_LEN:])
292
293
        candidates = []
294
        if self._cache_flag:
295
            if fileGrp:
296
                if isinstance(fileGrp, str):
297
                    candidates += self._file_cache.get(fileGrp, {}).values()
298
                else:
299
                    candidates = [x for fileGrp_needle, el_file_list in self._file_cache.items() if
300
                                  fileGrp.match(fileGrp_needle) for x in el_file_list.values()]
301
            else:
302
                candidates = [el_file for id_to_file in self._file_cache.values() for el_file in id_to_file.values()]
303
        else:
304
            candidates = self._tree.getroot().xpath('//mets:file', namespaces=NS)
305
306
        for cand in candidates:
307
            if ID:
308
                if isinstance(ID, str):
309
                    if not ID == cand.get('ID'): continue
310
                else:
311
                    if not ID.fullmatch(cand.get('ID')): continue
312
313
            if pageId is not None and cand.get('ID') not in pageId_list:
314
                continue
315
316
            if not self._cache_flag and fileGrp:
317
                if isinstance(fileGrp, str):
318
                    if cand.getparent().get('USE') != fileGrp: continue
319
                else:
320
                    if not fileGrp.fullmatch(cand.getparent().get('USE')): continue
321
322
            if mimetype:
323
                if isinstance(mimetype, str):
324
                    if cand.get('MIMETYPE') != mimetype: continue
325
                else:
326
                    if not mimetype.fullmatch(cand.get('MIMETYPE') or ''): continue
327
328
            if url:
329
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="URL"]', namespaces=NS)
330
                if cand_locat is None:
331
                    continue
332
                cand_url = cand_locat.get('{%s}href' % NS['xlink'])
333
                if isinstance(url, str):
334
                    if cand_url != url: continue
335
                else:
336
                    if not url.fullmatch(cand_url): continue
337
338
            if local_filename:
339
                cand_locat = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"]', namespaces=NS)
340
                if cand_locat is None:
341
                    continue
342
                cand_local_filename = cand_locat.get('{%s}href' % NS['xlink'])
343
                if isinstance(local_filename, str):
344
                    if cand_local_filename != local_filename: continue
345
                else:
346
                    if not local_filename.fullmatch(cand_local_filename): continue
347
348
            if local_only:
349
                # deprecation_warning("'local_only' is deprecated, use 'local_filename=\"//.+\"' instead")
350
                is_local = cand.find('mets:FLocat[@LOCTYPE="OTHER"][@OTHERLOCTYPE="FILE"][@xlink:href]', namespaces=NS)
351
                if is_local is None:
352
                    continue
353
354
            ret = OcrdFile(cand, mets=self)
355
356
            # XXX include_fileGrp is redundant to fileGrp but for completeness
357
            if exclude_fileGrp and ret.fileGrp in exclude_fileGrp:
358
                continue
359
            if include_fileGrp and ret.fileGrp not in include_fileGrp:
360
                continue
361
362
            yield ret
363
    def add_file_group(self, fileGrp):
        """
        Add a new ``mets:fileGrp``.
        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.
        """
        if ',' in fileGrp:
            raise ValueError('fileGrp must not contain commas')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            el_fileSec = ET.SubElement(self._tree.getroot(), TAG_METS_FILESEC)
        el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if el_fileGrp is None:
            el_fileGrp = ET.SubElement(el_fileSec, TAG_METS_FILEGRP)
            el_fileGrp.set('USE', fileGrp)

            if self._cache_flag:
                # Assign an empty dictionary that will hold the files of the added fileGrp
                self._file_cache[fileGrp] = {}

        return el_fileGrp

    def rename_file_group(self, old, new):
        """
        Rename a ``mets:fileGrp`` by changing the ``@USE`` from :py:attr:`old` to :py:attr:`new`.
        """
        el_fileGrp = self._tree.getroot().find('mets:fileSec/mets:fileGrp[@USE="%s"]' % old, NS)
        if el_fileGrp is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        el_fileGrp.set('USE', new)

        if self._cache_flag:
            self._file_cache[new] = self._file_cache.pop(old)

    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a ``mets:fileGrp`` (single fixed ``@USE`` or multiple regex ``@USE``)
        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                use = re.compile(USE[REGEX_PREFIX_LEN:])
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    if use.fullmatch(cand.get('USE')):
                        self.remove_file_group(cand, recursive=recursive)
                return
            else:
                el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
        if el_fileGrp is None:  # pylint: disable=len-as-condition
            msg = "No such fileGrp: %s" % USE
            if force:
                log.warning(msg)
                return
            raise Exception(msg)

        # The cache should also be used here
        if self._cache_flag:
            files = self._file_cache.get(el_fileGrp.get('USE'), {}).values()
        else:
            files = el_fileGrp.findall('mets:file', NS)

        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            for f in list(files):
                self.remove_one_file(ID=f.get('ID'), fileGrp=f.getparent().get('USE'))

        if self._cache_flag:
            # Note: Since the files inside the group are removed
            # with the 'remove_one_file' method above,
            # we should not take care of that again.
            # We just remove the fileGrp.
            del self._file_cache[el_fileGrp.get('USE')]

        el_fileGrp.getparent().remove(el_fileGrp)

    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None,
                 ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string):
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)
        log = getLogger('ocrd.models.ocrd_mets.add_file')

        el_fileGrp = self.add_file_group(fileGrp)
        if not ignore:
            mets_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)
            if mets_file:
                if mets_file.fileGrp == fileGrp and \
                        mets_file.pageId == pageId and \
                        mets_file.mimetype == mimetype:
                    if not force:
                        raise FileExistsError(
                            f"A file with ID=={ID} already exists {mets_file} and neither force nor ignore are set")
                    self.remove_file(ID=ID, fileGrp=fileGrp)
                else:
                    raise FileExistsError(
                        f"A file with ID=={ID} already exists {mets_file} but unrelated - cannot mitigate")

        # To get rid of Python's FutureWarning - checking if v is not None
        kwargs = {k: v for k, v in locals().items() if
                  k in ['url', 'ID', 'mimetype', 'pageId', 'local_filename'] and v is not None}
        # This separation is needed to reuse the same el_mets_file element in the caching if block
        el_mets_file = ET.SubElement(el_fileGrp, TAG_METS_FILE)
        # The caching of the physical page is done in the OcrdFile constructor
        mets_file = OcrdFile(el_mets_file, mets=self, **kwargs)

        if self._cache_flag:
            # Add the file to the file cache
            self._file_cache[fileGrp].update({ID: el_mets_file})

        return mets_file

    def remove_file(self, *args, **kwargs):
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            else:
                return files[0]  # for backwards-compatibility
        # Inspect the filter values (not the keyword names) for the regex prefix
        if any(1 for kwarg in kwargs.values()
               if isinstance(kwarg, str) and kwarg.startswith(REGEX_PREFIX)):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))

    def remove_one_file(self, ID, fileGrp=None):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.
        Arguments:
            ID (string|OcrdFile): ``@ID`` of the ``mets:file`` to delete. Can also be an :py:class:`ocrd_models.ocrd_file.OcrdFile` to avoid search via ``ID``.
            fileGrp (string): ``@USE`` of the ``mets:fileGrp`` containing the ``mets:file``. Used only for optimization.
        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.
        """
        log = getLogger('ocrd.models.ocrd_mets.remove_one_file')
        log.debug("remove_one_file(%s %s)" % (ID, fileGrp))
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID, fileGrp=fileGrp), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s (fileGrp=%s)" % (ID, fileGrp))

        # Collect the physical page refs (mets:fptr) pointing to this file
        fptrs = []
        if self._cache_flag:
            for page in self._fptr_cache.keys():
                if ID in self._fptr_cache[page]:
                    fptrs.append(self._fptr_cache[page][ID])
        else:
            fptrs = self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS)

        # Delete the physical page ref
        for fptr in fptrs:
            log.debug("Delete fptr element %s for file '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # Remove the fptr from the cache as well
            if self._cache_flag:
                del self._fptr_cache[page_div.get('ID')][ID]
            # delete empty pages
            if not page_div.getchildren():
                log.debug("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)
                # Delete the empty pages from caches as well
                if self._cache_flag:
                    for attr in METS_PAGE_DIV_ATTRIBUTE:
                        if attr.name in page_div.attrib:
                            del self._page_cache[attr][page_div.attrib[attr.name]]

        # Delete the file reference from the cache
        if self._cache_flag:
            parent_use = ocrd_file._el.getparent().get('USE')
            del self._file_cache[parent_use][ocrd_file.ID]

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file

    @property
    def physical_pages(self):
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``)
        """
        if self._cache_flag:
            return list(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].keys())

        return [str(x) for x in self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
            namespaces=NS)]

591
    def get_physical_pages(self, for_fileIds : Optional[str] = None, for_pageIds : Optional[str] = None, return_divs : bool = False):
592
        """
593
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
594
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`,
595
        or for a subset selector expression (comma-separated, range, and/or regex) :py:attr:`for_pageIds`.
596
        If return_divs is set, returns div memory objects instead of strings of ids
597
        """
598
        if for_fileIds is None and for_pageIds is None:
599
            return self.physical_pages
600
        # log = getLogger('ocrd.models.ocrd_mets.get_physical_pages')
601
        if for_pageIds is not None:
602
            ret = []
603
            page_attr_patterns = []
604
            page_attr_patterns_raw = re.split(r',', for_pageIds)
605
            for pageId_token in page_attr_patterns_raw:
606
                if pageId_token.startswith(REGEX_PREFIX):
607
                    page_attr_patterns.append((None, re.compile(pageId_token[REGEX_PREFIX_LEN:])))
608
                elif '..' in pageId_token:
609
                    val_range = generate_range(*pageId_token.split('..', 1))
610
                    page_attr_patterns.append(val_range)
611
                else:
612
                    page_attr_patterns.append(pageId_token)
613
            if not page_attr_patterns:
614
                return []
615
            range_patterns_first_last = [(x[0], x[-1]) if isinstance(x, list) else None for x in page_attr_patterns]
616
            page_attr_patterns_copy = list(page_attr_patterns)
617
            if self._cache_flag:
618
                for pat in page_attr_patterns:
619
                    try:
620
                        attr : METS_PAGE_DIV_ATTRIBUTE
621
                        if isinstance(pat, str):
622
                            attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if pat in self._page_cache[a])
623
                            cache_keys = [pat]
624
                        elif isinstance(pat, list):
625
                            attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if any(x in self._page_cache[a] for x in pat))
626
                            cache_keys = [v for v in pat if v in self._page_cache[attr]]
627
                            for k in cache_keys:
628
                                pat.remove(k)
629
                        elif isinstance(pat, tuple):
630
                            _, re_pat = pat
631
                            attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) for v in self._page_cache[a] if re_pat.fullmatch(v))
632
                            cache_keys = [v for v in self._page_cache[attr] if re_pat.fullmatch(v)]
633
                        else:
634
                            raise ValueError
635
                        if return_divs:
636
                            ret += [self._page_cache[attr][v] for v in cache_keys]
637
                        else:
638
                            ret += [self._page_cache[attr][v].get('ID') for v in cache_keys]
639
                    except StopIteration:
640
                        raise ValueError(f"{pat} matches none of the keys of any of the _page_caches.")
            else:
                page_attr_patterns_matched = []
                for page in self._tree.getroot().xpath(
                        'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                        namespaces=NS):
                    patterns_exhausted = []
                    for pat_idx, pat in enumerate(page_attr_patterns):
                        try:
                            if isinstance(pat, str):
                                attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if pat == page.get(a.name))
                                ret.append(page if return_divs else page.get('ID'))
                                patterns_exhausted.append(pat)
                            elif isinstance(pat, list):
                                if not isinstance(pat[0], METS_PAGE_DIV_ATTRIBUTE):
                                    pat.insert(0, next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if any(x == page.get(a.name) for x in pat)))
                                attr_val = page.get(pat[0].name)
                                if attr_val in pat:
                                    pat.remove(attr_val)
                                    ret.append(page if return_divs else page.get('ID'))
                                if len(pat) == 1:
                                    patterns_exhausted.append(pat)
                            elif isinstance(pat, tuple):
                                attr, re_pat = pat
                                if not attr:
                                    attr = next(a for a in list(METS_PAGE_DIV_ATTRIBUTE) if re_pat.fullmatch(page.get(a.name) or ''))
                                    page_attr_patterns[pat_idx] = (attr, re_pat)
                                if re_pat.fullmatch(page.get(attr.name) or ''):
                                    ret.append(page if return_divs else page.get('ID'))
                            else:
                                raise ValueError
                            page_attr_patterns_matched.append(pat)
                        except StopIteration:
                            continue
                    for p in patterns_exhausted:
                        page_attr_patterns.remove(p)
                unmatched = [x for x in page_attr_patterns_copy if x not in page_attr_patterns_matched]
                if unmatched:
                    raise ValueError(f"Patterns {unmatched} match none of the pages")

            ranges_without_start_match = []
            ranges_without_last_match = []
            for idx, pat in enumerate(page_attr_patterns_copy):
                if isinstance(pat, list):
                    start, last = range_patterns_first_last[idx]
                    # a range start still left in the pattern list was never matched
                    if start in pat:
                        ranges_without_start_match.append(page_attr_patterns_raw[idx])
                    # if last in pat:
                    #     ranges_without_last_match.append(page_attr_patterns_raw[idx])
            if ranges_without_start_match:
                raise ValueError(f"Start of range patterns {ranges_without_start_match} not matched - invalid range")
            # if ranges_without_last_match:
            #     raise ValueError(f"End of range patterns {ranges_without_last_match} not matched - invalid range")
            return ret

        assert for_fileIds # at this point we know for_fileIds is set, assert to convince pyright
        ret = [None] * len(for_fileIds)
        if self._cache_flag:
            for pageId in self._fptr_cache.keys():
                for fptr in self._fptr_cache[pageId].keys():
                    if fptr in for_fileIds:
                        index = for_fileIds.index(fptr)
                        if return_divs:
                            ret[index] = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
                        else:
                            ret[index] = pageId
        else:
            for page in self._tree.getroot().xpath(
                    'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                    namespaces=NS):
                for fptr in page.findall('mets:fptr', NS):
                    if fptr.get('FILEID') in for_fileIds:
                        index = for_fileIds.index(fptr.get('FILEID'))
                        if return_divs:
                            ret[index] = page
                        else:
                            ret[index] = page.get('ID')
        return ret

    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Set the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`, creating all structures if necessary.
        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use
            orderlabel (string): ``@ORDERLABEL`` to use
        """

        # delete any page mapping for this file.ID
        candidates = []
        if self._cache_flag:
            for page_id in self._fptr_cache.keys():
                if ocrd_file.ID in self._fptr_cache[page_id].keys():
                    if self._fptr_cache[page_id][ocrd_file.ID] is not None:
                        candidates.append(self._fptr_cache[page_id][ocrd_file.ID])
        else:
            candidates = self._tree.getroot().findall(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
                ocrd_file.ID, namespaces=NS)

        for el_fptr in candidates:
            if self._cache_flag:
                del self._fptr_cache[el_fptr.getparent().get('ID')][ocrd_file.ID]
            el_fptr.getparent().remove(el_fptr)

        # find/construct as necessary
        el_structmap = self._tree.getroot().find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if el_structmap is None:
            el_structmap = ET.SubElement(self._tree.getroot(), TAG_METS_STRUCTMAP)
            el_structmap.set('TYPE', 'PHYSICAL')
        el_seqdiv = el_structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if el_seqdiv is None:
            el_seqdiv = ET.SubElement(el_structmap, TAG_METS_DIV)
            el_seqdiv.set('TYPE', 'physSequence')

        el_pagediv = None
        if self._cache_flag:
            # the page cache is keyed by attribute first, so look up the ID sub-cache
            if pageId in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID]:
                el_pagediv = self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId]
        else:
            el_pagediv = el_seqdiv.find('mets:div[@ID="%s"]' % pageId, NS)

        if el_pagediv is None:
            el_pagediv = ET.SubElement(el_seqdiv, TAG_METS_DIV)
            el_pagediv.set('TYPE', 'page')
            el_pagediv.set('ID', pageId)
            if order:
                el_pagediv.set('ORDER', order)
            if orderlabel:
                el_pagediv.set('ORDERLABEL', orderlabel)
            if self._cache_flag:
                # Create a new entry in the page cache
                self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId] = el_pagediv
                # Create a new entry in the fptr cache and
                # assign an empty dictionary to hold the fileids
                self._fptr_cache[pageId] = {}

        el_fptr = ET.SubElement(el_pagediv, TAG_METS_FPTR)
        el_fptr.set('FILEID', ocrd_file.ID)

        if self._cache_flag:
            # Assign the ocrd fileID to the pageId in the cache
            self._fptr_cache[el_pagediv.get('ID')].update({ocrd_file.ID: el_fptr})

    def update_physical_page_attributes(self, page_id, **kwargs):
        """
        Set or remove attributes of the physical page ``mets:div`` with ``@ID`` :py:attr:`page_id`.

        Keyword Args:
            **kwargs: attribute names (must be members of ``METS_PAGE_DIV_ATTRIBUTE``) mapped
                to their new values; a falsy value removes the attribute
        """
        invalid_keys = list(k for k in kwargs.keys() if k not in METS_PAGE_DIV_ATTRIBUTE.names())
        if invalid_keys:
            raise ValueError(f"Invalid attribute {invalid_keys}. Allowed values: {METS_PAGE_DIV_ATTRIBUTE.names()}")

        page_div = self.get_physical_pages(for_pageIds=page_id, return_divs=True)
        if not page_div:
            raise ValueError(f"Could not find mets:div[@ID=={page_id}]")
        page_div = page_div[0]

        for k, v in kwargs.items():
            if not v:
                # tolerate attributes that are already absent
                page_div.attrib.pop(k, None)
            else:
                page_div.attrib[k] = v

    def get_physical_page_for_file(self, ocrd_file):
        """
        Get the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        corresponding to the ``mets:file`` :py:attr:`ocrd_file`.
        """
        ret = []
        if self._cache_flag:
            for pageId in self._fptr_cache.keys():
                if ocrd_file.ID in self._fptr_cache[pageId].keys():
                    ret.append(self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][pageId].get('ID'))
        else:
            ret = self._tree.getroot().xpath(
                '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
                ocrd_file.ID, namespaces=NS)

        # Test the length explicitly to avoid Python's FutureWarning
        if len(ret):
            return ret[0]
        return None

    def remove_physical_page(self, ID):
        """
        Delete page (physical ``mets:structMap`` ``mets:div`` entry ``@ID``) :py:attr:`ID`.
        """
        mets_div = None
        if self._cache_flag:
            if ID in self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID].keys():
                mets_div = [self._page_cache[METS_PAGE_DIV_ATTRIBUTE.ID][ID]]
        else:
            mets_div = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
                namespaces=NS)
        if mets_div:
            mets_div_attrib = {**mets_div[0].attrib}
            mets_div[0].getparent().remove(mets_div[0])
            if self._cache_flag:
                for attr in METS_PAGE_DIV_ATTRIBUTE:
                    if attr.name in mets_div_attrib:
                        del self._page_cache[attr][mets_div_attrib[attr.name]]
                del self._fptr_cache[ID]

    def remove_physical_page_fptr(self, fileId):
        """
        Delete all ``mets:fptr[@FILEID == fileId]`` pointing to ``mets:file[@ID == fileId]``
        for :py:attr:`fileId` from all ``mets:div`` entries in the physical ``mets:structMap``.
        Returns:
            List of pageIds that mets:fptrs were deleted from
        """

        # Question: What is the reason to keep a list of mets_fptrs?
        # Do we have a situation in which the fileId is the same for different pageIds?
        # From the examples I have seen inside 'assets' that is not the case
        # and the mets_fptrs list will always contain a single element.
        # If that's the case then we do not need two loops, just one.
        mets_fptrs = []
        if self._cache_flag:
            for page_id in self._fptr_cache.keys():
                if fileId in self._fptr_cache[page_id].keys():
                    mets_fptrs.append(self._fptr_cache[page_id][fileId])
        else:
            mets_fptrs = self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
                namespaces=NS)
        ret = []
        for mets_fptr in mets_fptrs:
            mets_div = mets_fptr.getparent()
            ret.append(mets_div.get('ID'))
            if self._cache_flag:
                del self._fptr_cache[mets_div.get('ID')][mets_fptr.get('FILEID')]
            mets_div.remove(mets_fptr)
        return ret

    @property
    def physical_pages_labels(self):
        """
        Map all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``) to their
        ``@ORDER``, ``@ORDERLABEL`` and ``@LABEL`` attributes, if any.
        """
        divs = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
            namespaces=NS)
        return {div.get('ID'): (div.get('ORDER', None), div.get('ORDERLABEL', None), div.get('LABEL', None))
                for div in divs}
885
886
    def merge(self, other_mets, force=False, fileGrp_mapping=None, fileId_mapping=None, pageId_mapping=None,
887
              after_add_cb=None, **kwargs):
888
        """
889
        Add all files from other_mets.
890
        Accepts the same kwargs as :py:func:`find_files`
891
        Keyword Args:
892
            force (boolean): Whether to :py:meth:`add_file`s with force (overwriting existing ``mets:file``s)
893
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
894
            fileId_mapping (dict): Map :py:attr:`other_mets` file ID to file ID in this METS
895
            pageId_mapping (dict): Map :py:attr:`other_mets` page ID to page ID in this METS
896
            after_add_cb (function): Callback received after file is added to the METS
897
        """
898
        if not fileGrp_mapping:
899
            fileGrp_mapping = {}
900
        if not fileId_mapping:
901
            fileId_mapping = {}
902
        if not pageId_mapping:
903
            pageId_mapping = {}
904
        for f_src in other_mets.find_files(**kwargs):
905
            f_dest = self.add_file(
906
                fileGrp_mapping.get(f_src.fileGrp, f_src.fileGrp),
907
                mimetype=f_src.mimetype,
908
                url=f_src.url,
909
                local_filename=f_src.local_filename,
910
                ID=fileId_mapping.get(f_src.ID, f_src.ID),
911
                pageId=pageId_mapping.get(f_src.pageId, f_src.pageId),
912
                force=force)
913
            # FIXME: merge metsHdr, amdSec, dmdSec as well
914
            # FIXME: merge structMap logical and structLink as well
915
            if after_add_cb:
916
                after_add_cb(f_dest)
917
918