Passed
Pull Request — master (#759)
by Konstantin
01:48
created

ocrd_models.ocrd_mets.OcrdMets.add_file()   C

Complexity

Conditions 9

Size

Total Lines 40
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 40
rs 6.6666
c 0
b 0
f 0
cc 9
nop 10

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch, search
6
from lxml import etree as ET
7
8
from ocrd_utils import (
9
    is_local_filename,
10
    getLogger,
11
    generate_range,
12
    VERSION,
13
    REGEX_PREFIX,
14
    REGEX_FILE_ID
15
)
16
17
from .constants import (
18
    NAMESPACES as NS,
19
    TAG_METS_AGENT,
20
    TAG_METS_DIV,
21
    TAG_METS_FILE,
22
    TAG_METS_FILEGRP,
23
    TAG_METS_FILESEC,
24
    TAG_METS_FPTR,
25
    TAG_METS_METSHDR,
26
    TAG_METS_STRUCTMAP,
27
    IDENTIFIER_PRIORITY,
28
    TAG_MODS_IDENTIFIER,
29
    METS_XML_EMPTY,
30
)
31
32
from .ocrd_xml_base import OcrdXmlDocument, ET
33
from .ocrd_file import OcrdFile
34
from .ocrd_agent import OcrdAgent
35
36
# Length of the regex marker prefix, used to slice the marker off query strings before matching
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
37
38
class OcrdMets(OcrdXmlDocument):
39
    """
40
    API to a single METS file
41
    """
42
43
    @staticmethod
    def empty_mets(now=None):
        """
        Build a new, empty :py:class:`OcrdMets` from the bundled METS template.

        Keyword Args:
            now: Value substituted for the ``{{ NOW }}`` placeholder of the template.
                If falsy, ``datetime.now().isoformat()`` is used.

        Returns:
            An :py:class:`OcrdMets` created from the filled-in template.
        """
        timestamp = now or datetime.now().isoformat()
        template = METS_XML_EMPTY.decode('utf-8')
        substitutions = (
            ('{{ VERSION }}', VERSION),
            ('{{ NOW }}', '%s' % timestamp),
        )
        for placeholder, value in substitutions:
            template = template.replace(placeholder, value)
        return OcrdMets(content=template.encode('utf-8'))
54
55
    def __init__(self, **kwargs):
        """
        Set up the METS wrapper.

        All keyword arguments are forwarded unchanged to the
        ``OcrdXmlDocument`` constructor.
        """
        super().__init__(**kwargs)
60
61
    def __str__(self):
62
        """
63
        String representation
64
        """
65
        return 'OcrdMets[fileGrps=%s,files=%s]' % (self.file_groups, list(self.find_files()))
66
67
    @property
    def unique_identifier(self):
        """
        Get the unique identifier by looking through ``mods:identifier``

        The ``@type`` values in ``IDENTIFIER_PRIORITY`` are tried in order; the text of the
        first ``mods:identifier`` found is returned, or ``None`` if there is no match.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            identifier = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if identifier is None:
                continue
            return identifier.text
        return None
78
79
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Set the unique identifier.

        The first ``mods:identifier`` whose ``@type`` matches an entry of
        ``IDENTIFIER_PRIORITY`` (tried in order) receives :py:attr:`purl` as its text.
        If there is none, a new ``mods:identifier`` with ``@type="purl"`` is appended
        to the first ``mods:mods`` element.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        # Lazy lookup: stops searching at the first identifier type that is present
        candidates = (
            root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY
        )
        target = next((el for el in candidates if el is not None), None)
        if target is None:
            mods = root.find('.//mods:mods', NS)
            target = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            target.set('type', 'purl')
        target.text = purl
96
97
    @property
    def agents(self):
        """
        All ``mets:agent`` entries of the ``mets:metsHdr``, each wrapped as an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return [OcrdAgent(agent_el) for agent_el in agent_elements]
103
104
    def add_agent(self, *args, **kwargs):
        """
        Append a new ``mets:agent`` to the ``metsHdr`` and return it wrapped as an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        If no ``mets:metsHdr`` is found, one is created as the first child of the root.
        Positional and keyword arguments are passed on to the
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent` constructor.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        agent_el = ET.SubElement(header, TAG_METS_AGENT)
        return OcrdAgent(agent_el, *args, **kwargs)
116
117
    @property
    def file_groups(self):
        """
        The ``@USE`` attribute of every ``mets:fileGrp`` in the document, as a list.
        """
        group_elements = self._tree.getroot().findall('.//mets:fileGrp', NS)
        return [group.get('USE') for group in group_elements]
123
124
    def find_all_files(self, *args, **kwargs):
        """
        Eager variant of :py:meth:`find_files`: collect every result into a list.

        Takes the same arguments as :py:meth:`find_files`.
        """
        results = self.find_files(*args, **kwargs)
        return [*results]
131
132
    # pylint: disable=multiple-statements
133
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search ``mets:file`` entries in this METS document and yield results.

        The :py:attr:`ID`, :py:attr:`fileGrp`, :py:attr:`url` and :py:attr:`mimetype`
        parameters can each be either a literal string, or a regular expression if
        the string starts with ``//`` (double slash).

        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. If it is a literal string, comparison
        is done with string equality. A missing attribute is matched as the empty
        string in regex mode.

        The :py:attr:`pageId` parameter is a comma-separated list of page IDs and
        supports the numeric range operator ``..``. For example, to find all files
        in pages ``PHYS_0001`` to ``PHYS_0003``, ``PHYS_0001..PHYS_0003`` will be
        expanded to ``PHYS_0001,PHYS_0002,PHYS_0003``.

        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` (URL or path) of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem

        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations

        Raises:
            Exception: if :py:attr:`pageId` starts with the regex prefix. As this is a
                generator, the error surfaces when iteration starts.
        """
        def _matches(query, value):
            # Regex queries must match the whole value; ``value or ''`` keeps
            # ``fullmatch`` from receiving ``None`` when the attribute is absent.
            if query.startswith(REGEX_PREFIX):
                return fullmatch(query[REGEX_PREFIX_LEN:], value or '') is not None
            return query == value

        root = self._tree.getroot()

        # ``None`` means "no page filter". Any other value, including the empty
        # string, filters by the collected file IDs (an empty string matches nothing).
        page_file_ids = None if pageId is None else set()
        if pageId:
            if pageId.startswith(REGEX_PREFIX):
                raise Exception("find_files does not support regex search for pageId")
            wanted_pages = set()
            for token in pageId.split(','):
                wanted_pages.add(token)
                if '..' in token:
                    # maxsplit=1 so that generate_range always receives exactly (start, end)
                    wanted_pages.update(generate_range(*token.split('..', 1)))
            for page in root.xpath('//mets:div[@TYPE="page"]', namespaces=NS):
                if page.get('ID') in wanted_pages:
                    page_file_ids.update(
                        fptr.get('FILEID') for fptr in page.findall('mets:fptr', NS))

        for cand in root.xpath('//mets:file', namespaces=NS):
            if ID and not _matches(ID, cand.get('ID')):
                continue

            if page_file_ids is not None and cand.get('ID') not in page_file_ids:
                continue

            if fileGrp and not _matches(fileGrp, cand.getparent().get('USE')):
                continue

            if mimetype and not _matches(mimetype, cand.get('MIMETYPE')):
                continue

            if url:
                cand_locat = cand.find('mets:FLocat', namespaces=NS)
                if cand_locat is None:
                    continue
                if not _matches(url, cand_locat.get('{%s}href' % NS['xlink'])):
                    continue

            f = OcrdFile(cand, mets=self)

            # If only local resources should be returned and f is not a file path: skip the file
            if local_only and not is_local_filename(f.url):
                continue
            yield f
214
215
    def add_file_group(self, fileGrp):
        """
        Return the ``mets:fileGrp`` with the given ``@USE``, creating it if needed.

        A missing ``mets:fileSec`` is appended to the root element first.

        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.

        Returns:
            The existing or newly created ``mets:fileGrp`` element.

        Raises:
            Exception: if :py:attr:`fileGrp` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
232
233
    def rename_file_group(self, old, new):
        """
        Change the ``@USE`` of the ``mets:fileGrp`` currently named :py:attr:`old` to :py:attr:`new`.

        Raises:
            FileNotFoundError: if no ``mets:fileGrp`` with ``@USE`` :py:attr:`old` exists in the ``mets:fileSec``.
        """
        query = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        group = self._tree.getroot().find(query, NS)
        if group is not None:
            group.set('USE', new)
            return
        raise FileNotFoundError("No such fileGrp '%s'" % old)
241
242
    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Remove a ``mets:fileGrp`` (single fixed ``@USE`` or multiple regex ``@USE``)

        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``.
                A ``mets:fileGrp`` element may be passed instead of a string (used internally for regex matches).
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist

        Raises:
            Exception: if there is no ``mets:fileSec``, if the group does not exist and
                :py:attr:`force` is not set, or if the group still contains files and
                :py:attr:`recursive` is not set.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_file_group')
        el_fileSec = self._tree.getroot().find('mets:fileSec', NS)
        if el_fileSec is None:
            raise Exception("No fileSec!")
        # Name used in messages; for element input this is the element's @USE, not its repr
        use_name = USE
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                pattern = USE[REGEX_PREFIX_LEN:]
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    # ``or ''`` guards against a fileGrp without @USE (fullmatch rejects None)
                    if fullmatch(pattern, cand.get('USE') or ''):
                        self.remove_file_group(cand, recursive=recursive)
                return
            el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
            if el_fileGrp is not None:
                use_name = el_fileGrp.get('USE')
        if el_fileGrp is None:
            msg = "No such fileGrp: %s" % use_name
            if force:
                log.warning(msg)
                return
            raise Exception(msg)
        files = el_fileGrp.findall('mets:file', NS)
        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % use_name)
            for f in files:
                self.remove_one_file(f.get('ID'))
        el_fileGrp.getparent().remove(el_fileGrp)
278
279
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
                In that case the existing entry is updated in place.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): passed on to the :py:class:`ocrd_models.ocrd_file.OcrdFile`
            **kwargs: accepted for backwards-compatibility; not used.

        Returns:
            The new or updated :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Raises:
            ValueError: if :py:attr:`ID` or :py:attr:`fileGrp` is missing or does not match ``REGEX_FILE_ID``.
            Exception: if a file with the same ``@ID`` exists and neither :py:attr:`force` nor :py:attr:`ignore` is set.
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s (not an xs:ID)" % ID)
        if not REGEX_FILE_ID.fullmatch(fileGrp):
            # Report the offending fileGrp (the original message interpolated the file ID here)
            raise ValueError("Invalid syntax for mets:fileGrp/@USE %s (not an xs:ID)" % fileGrp)
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # With ``ignore`` the search for an existing file is skipped entirely
        mets_file = None if ignore else next(self.find_files(ID=ID), None)
        if mets_file:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file.url = url
            mets_file.mimetype = mimetype
            mets_file.ID = ID
            mets_file.pageId = pageId
            mets_file.local_filename = local_filename
        else:
            # Only pass on the attributes that were actually given (truthy)
            candidates = (
                ('url', url),
                ('ID', ID),
                ('mimetype', mimetype),
                ('pageId', pageId),
                ('local_filename', local_filename),
            )
            file_attrs = {key: value for key, value in candidates if value}
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self, **file_attrs)

        return mets_file
319
320
    def remove_file(self, *args, **kwargs):
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The removed file if exactly one matched (kept for backwards-compatibility),
            the list of removed files if several matched, or an empty list if nothing
            matched and at least one search criterion was a regex.

        Raises:
            FileNotFoundError: if nothing matched and no criterion was a regex.
        """
        files = list(self.find_files(*args, **kwargs))
        for f in files:
            self.remove_one_file(f)
        if len(files) > 1:
            return files
        if files:
            return files[0] # for backwards-compatibility
        # Inspect the criteria VALUES (positional and keyword); keyword names never carry the regex prefix
        criteria = list(args) + list(kwargs.values())
        if any(isinstance(value, str) and value.startswith(REGEX_PREFIX) for value in criteria):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
337
338
    def remove_one_file(self, ID):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        All ``mets:fptr`` elements pointing to the file are removed as well, and any
        parent element left without children by that removal is deleted too.

        Arguments:
            ID (string): ``@ID`` of the ``mets:file`` to delete. An
                :py:class:`ocrd_models.ocrd_file.OcrdFile` may be passed instead.

        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.

        Raises:
            FileNotFoundError: if no file with that ``@ID`` is found.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_one_file')
        given_file = isinstance(ID, OcrdFile)
        if given_file:
            ocrd_file = ID
            ID = ocrd_file.ID
        # Log the resolved ID (not an object repr), with lazy formatting
        log.info("remove_one_file(%s)", ID)
        if not given_file:
            ocrd_file = next(self.find_files(ID=ID), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            page_div = fptr.getparent()
            log.info("Delete fptr element %s for page '%s'", fptr, page_div.get('ID'))
            page_div.remove(fptr)
            # delete empty pages (len() counts children; getchildren() is deprecated in lxml)
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
374
375
    @property
    def physical_pages(self):
        """
        The ``@ID`` of every page ``mets:div`` in the physical ``mets:structMap``.
        """
        page_id_query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        root = self._tree.getroot()
        return root.xpath(page_id_query, namespaces=NS)
383
384
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`.

        Keyword Args:
            for_fileIds (list): file IDs to look up. If ``None``, :py:attr:`physical_pages` is returned.

        Returns:
            With :py:attr:`for_fileIds`, a list of the same length holding the page ID
            for each file ID, or ``None`` where no page references the file. If an ID
            occurs more than once in :py:attr:`for_fileIds`, only its first position is filled.
        """
        if for_fileIds is None:
            return self.physical_pages
        ret = [None] * len(for_fileIds)
        # Map each file ID to its first position once, instead of scanning the list per fptr
        first_position = {}
        for position, file_id in enumerate(for_fileIds):
            first_position.setdefault(file_id, position)
        for page in self._tree.getroot().xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                namespaces=NS):
            page_id = page.get('ID')
            for fptr in page.findall('mets:fptr', NS):
                position = first_position.get(fptr.get('FILEID'))
                if position is not None:
                    ret[position] = page_id
        return ret
399
400
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link the ``mets:file`` :py:attr:`ocrd_file` to the physical page :py:attr:`pageId`
        (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry), creating all
        structures if necessary. Existing page links of the file are removed first.

        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use (only applied when the page entry is newly created)
            orderlabel (string): ``@ORDERLABEL`` to use (only applied when the page entry is newly created)
        """
        root = self._tree.getroot()

        # Drop every existing page mapping for this file ID
        stale_fptrs = root.findall(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
            ocrd_file.ID, namespaces=NS)
        for stale in stale_fptrs:
            stale.getparent().remove(stale)

        # Find or build structMap -> physSequence div -> page div
        struct_map = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if struct_map is None:
            struct_map = ET.SubElement(root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        sequence_div = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence_div is None:
            sequence_div = ET.SubElement(struct_map, TAG_METS_DIV)
            sequence_div.set('TYPE', 'physSequence')

        page_div = sequence_div.find('mets:div[@ID="%s"]' % pageId, NS)
        if page_div is None:
            page_div = ET.SubElement(sequence_div, TAG_METS_DIV)
            page_div.set('TYPE', 'page')
            page_div.set('ID', pageId)
            if order:
                page_div.set('ORDER', order)
            if orderlabel:
                page_div.set('ORDERLABEL', orderlabel)

        new_fptr = ET.SubElement(page_div, TAG_METS_FPTR)
        new_fptr.set('FILEID', ocrd_file.ID)
439
440
    def get_physical_page_for_file(self, ocrd_file):
        """
        Look up the physical page ID (``@ID`` of the physical ``mets:structMap`` ``mets:div`` entry)
        that references the ``mets:file`` :py:attr:`ocrd_file`.

        Returns:
            The first matching page ID, or ``None`` if no page references the file.
        """
        page_ids = self._tree.getroot().xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID, namespaces=NS)
        return page_ids[0] if page_ids else None
450
451
    def remove_physical_page(self, ID):
        """
        Remove the page (physical ``mets:structMap`` ``mets:div`` entry) whose ``@ID`` is :py:attr:`ID`.

        Only the first match is removed; nothing happens if there is no such page.
        """
        matches = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
            namespaces=NS)
        if not matches:
            return
        page_div = matches[0]
        page_div.getparent().remove(page_div)
460
461
    def remove_physical_page_fptr(self, fileId):
        """
        Remove every ``mets:fptr`` with ``@FILEID`` equal to :py:attr:`fileId` from the
        page ``mets:div`` entries in the physical ``mets:structMap``.

        Returns:
            List of pageIds that mets:fptrs were deleted from
        """
        fptrs = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
            namespaces=NS)
        affected_pages = []
        for fptr in fptrs:
            page_div = fptr.getparent()
            affected_pages.append(page_div.get('ID'))
            page_div.remove(fptr)
        return affected_pages
476
477
    def merge(self, other_mets, fileGrp_mapping=None, after_add_cb=None, **kwargs):
        """
        Copy files from :py:attr:`other_mets` into this METS via :py:meth:`add_file`.

        The files to copy are selected with ``other_mets.find_files(**kwargs)``, so the
        same kwargs as :py:func:`find_files` are accepted.

        Keyword Args:
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS.
                File groups without an entry keep their name.
            after_add_cb (function): Callback invoked with each newly added file
        """
        group_names = fileGrp_mapping or {}
        for source_file in other_mets.find_files(**kwargs):
            target_group = group_names.get(source_file.fileGrp, source_file.fileGrp)
            added_file = self.add_file(
                target_group,
                mimetype=source_file.mimetype,
                url=source_file.url,
                ID=source_file.ID,
                pageId=source_file.pageId)
            if after_add_cb:
                after_add_cb(added_file)
498
499