Passed
Push — master ( 061bae...c20a66 )
by Konstantin
03:02
created

ocrd_models.ocrd_mets.OcrdMets.add_file()   B

Complexity

Conditions 8

Size

Total Lines 38
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 38
rs 7.3333
c 0
b 0
f 0
cc 8
nop 10

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch, search
6
from lxml import etree as ET
7
8
from ocrd_utils import (
9
    is_local_filename,
10
    getLogger,
11
    generate_range,
12
    VERSION,
13
    REGEX_PREFIX,
14
    REGEX_FILE_ID
15
)
16
17
from .constants import (
18
    NAMESPACES as NS,
19
    TAG_METS_AGENT,
20
    TAG_METS_DIV,
21
    TAG_METS_FILE,
22
    TAG_METS_FILEGRP,
23
    TAG_METS_FILESEC,
24
    TAG_METS_FPTR,
25
    TAG_METS_METSHDR,
26
    TAG_METS_STRUCTMAP,
27
    IDENTIFIER_PRIORITY,
28
    TAG_MODS_IDENTIFIER,
29
    METS_XML_EMPTY,
30
)
31
32
from .ocrd_xml_base import OcrdXmlDocument, ET
33
from .ocrd_file import OcrdFile
34
from .ocrd_agent import OcrdAgent
35
36
# Length of the regex marker prefix, used to strip it from search patterns
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
37
38
class OcrdMets(OcrdXmlDocument):
39
    """
40
    API to a single METS file
41
    """
42
43
    @staticmethod
    def empty_mets(now=None):
        """
        Build a fresh :py:class:`OcrdMets` from the bundled empty-METS template.

        Keyword Args:
            now: value substituted for the ``{{ NOW }}`` placeholder. When falsy,
                the current local time in ISO 8601 format is used instead.
        """
        timestamp = now if now else datetime.now().isoformat()
        template = METS_XML_EMPTY.decode('utf-8')
        substitutions = (
            ('{{ VERSION }}', VERSION),
            ('{{ NOW }}', '%s' % timestamp),
        )
        for placeholder, value in substitutions:
            template = template.replace(placeholder, value)
        return OcrdMets(content=template.encode('utf-8'))
54
55
    def __init__(self, **kwargs):
        """
        Initialize by forwarding all keyword arguments to the parent class constructor.
        """
        super().__init__(**kwargs)
60
61
    def __str__(self):
        """
        Summarize the document as its fileGrp ``@USE`` values and all of its files.
        """
        groups = self.file_groups
        all_files = [*self.find_files()]
        return 'OcrdMets[fileGrps=%s,files=%s]' % (groups, all_files)
66
67
    @property
    def unique_identifier(self):
        """
        Return the text of the first ``mods:identifier`` found, trying each
        ``@type`` in :py:data:`IDENTIFIER_PRIORITY` order; ``None`` if none exists.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            match = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if match is None:
                continue
            return match.text
        return None
78
79
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Store :py:attr:`purl` as the text of the highest-priority existing
        ``mods:identifier``; if there is none, append a new one with
        ``@type="purl"`` to the ``mods:mods`` element.

        See `specs <https://ocr-d.de/en/spec/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        # Lazily probe each identifier type so the search stops at the first hit
        candidates = (
            root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY
        )
        id_el = next((el for el in candidates if el is not None), None)
        if id_el is None:
            mods_el = root.find('.//mods:mods', NS)
            id_el = ET.SubElement(mods_el, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl
96
97
    @property
    def agents(self):
        """
        Return one :py:class:`ocrd_models.ocrd_agent.OcrdAgent` per
        ``mets:agent`` element in the ``mets:metsHdr``.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
103
104
    def add_agent(self, *args, **kwargs):
        """
        Append a new ``mets:agent`` to the ``mets:metsHdr`` (creating the header
        as first child of the root if it is missing) and wrap it in an
        :py:class:`ocrd_models.ocrd_agent.OcrdAgent`.

        All positional and keyword arguments are forwarded to the ``OcrdAgent`` constructor.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        agent_element = ET.SubElement(header, TAG_METS_AGENT)
        return OcrdAgent(agent_element, *args, **kwargs)
116
117
    @property
    def file_groups(self):
        """
        Return the ``@USE`` attribute of every ``mets:fileGrp`` in the document.
        """
        groups = self._tree.getroot().findall('.//mets:fileGrp', NS)
        return [grp.get('USE') for grp in groups]
123
124
    def find_all_files(self, *args, **kwargs):
        """
        Run :py:meth:`find_files` with the given arguments and collect every
        result into a list.

        Equivalent to ``list(self.find_files(...))``
        """
        return [*self.find_files(*args, **kwargs)]
131
132
    # pylint: disable=multiple-statements
133
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search ``mets:file`` entries in this METS document and yield results.

        The :py:attr:`ID`, :py:attr:`fileGrp`, :py:attr:`url` and :py:attr:`mimetype`
        parameters can each be either a literal string, or a regular expression if
        the string starts with ``//`` (double slash).

        If it is a regex, the leading ``//`` is removed and candidates are matched
        against the regex with `re.fullmatch`. A missing attribute is matched as
        the empty string. If it is a literal string, comparison is done with
        string equality.

        The :py:attr:`pageId` parameter is a comma-separated list of page IDs and
        supports the numeric range operator ``..``. For example, to find all files
        in pages ``PHYS_0001`` to ``PHYS_0003``, ``PHYS_0001..PHYS_0003`` will be
        expanded to ``PHYS_0001,PHYS_0002,PHYS_0003``.

        Keyword Args:
            ID (string) : ``@ID`` of the ``mets:file``
            fileGrp (string) : ``@USE`` of the ``mets:fileGrp`` to list files of
            pageId (string) : ``@ID`` of the corresponding physical ``mets:structMap`` entry (physical page)
            url (string) : ``@xlink:href`` (URL or path) of ``mets:FLocat`` of ``mets:file``
            mimetype (string) : ``@MIMETYPE`` of ``mets:file``
            local_only (boolean) : Whether to restrict results to local files in the filesystem

        Yields:
            :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations

        Raises:
            Exception: if :py:attr:`pageId` is a regex (starts with ``//``).
        """
        def matches(pattern, value):
            # Regex patterns are matched against '' when the attribute is absent,
            # so incomplete entries are skipped instead of raising a TypeError.
            if pattern.startswith(REGEX_PREFIX):
                return fullmatch(pattern[REGEX_PREFIX_LEN:], value or '') is not None
            return pattern == value

        root = self._tree.getroot()

        # IDs of the files linked from the requested pages. ``None`` means "no
        # page filter"; an empty (but not None) pageId matches no file, as before.
        page_file_ids = None if pageId is None else set()
        if pageId:
            if pageId.startswith(REGEX_PREFIX):
                raise Exception("find_files does not support regex search for pageId")
            wanted_pages = set()
            for token in pageId.split(','):
                wanted_pages.add(token)
                if '..' in token:
                    # maxsplit=1 guarantees exactly (start, end) for generate_range
                    wanted_pages.update(generate_range(*token.split('..', 1)))
            for page in root.xpath('//mets:div[@TYPE="page"]', namespaces=NS):
                if page.get('ID') in wanted_pages:
                    page_file_ids.update(
                        fptr.get('FILEID') for fptr in page.findall('mets:fptr', NS))

        for cand in root.xpath('//mets:file', namespaces=NS):
            if ID and not matches(ID, cand.get('ID')):
                continue

            if page_file_ids is not None and cand.get('ID') not in page_file_ids:
                continue

            if fileGrp and not matches(fileGrp, cand.getparent().get('USE')):
                continue

            if mimetype and not matches(mimetype, cand.get('MIMETYPE')):
                continue

            if url:
                cand_locat = cand.find('mets:FLocat', namespaces=NS)
                if cand_locat is None:
                    continue
                if not matches(url, cand_locat.get('{%s}href' % NS['xlink'])):
                    continue

            f = OcrdFile(cand, mets=self)

            # If only local resources should be returned and f is not a file path: skip the file
            if local_only and not is_local_filename(f.url):
                continue
            yield f
214
215
    def add_file_group(self, fileGrp):
        """
        Return the ``mets:fileGrp`` with the given ``@USE``, creating it (and the
        ``mets:fileSec`` that holds it) if it does not exist yet.

        Arguments:
            fileGrp (string): ``@USE`` of the new ``mets:fileGrp``.

        Raises:
            Exception: if :py:attr:`fileGrp` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
232
233
    def rename_file_group(self, old, new):
        """
        Change the ``@USE`` of the ``mets:fileGrp`` named :py:attr:`old` to :py:attr:`new`.

        Raises:
            FileNotFoundError: if no ``mets:fileGrp`` with ``@USE`` :py:attr:`old` exists.
        """
        query = 'mets:fileSec/mets:fileGrp[@USE="%s"]' % old
        target = self._tree.getroot().find(query, NS)
        if target is None:
            raise FileNotFoundError("No such fileGrp '%s'" % old)
        target.set('USE', new)
241
242
    def remove_file_group(self, USE, recursive=False, force=False):
        """
        Delete one ``mets:fileGrp`` (literal ``@USE``) or several (regex ``@USE``).

        Arguments:
            USE (string): ``@USE`` of the ``mets:fileGrp`` to delete. Can be a regex if prefixed with ``//``.
                A non-string value is treated as the ``mets:fileGrp`` element itself.
            recursive (boolean): Whether to recursively delete each ``mets:file`` in the group
            force (boolean): Do not raise an exception if ``mets:fileGrp`` does not exist

        Raises:
            Exception: if there is no ``mets:fileSec``, if the group does not exist
                (unless :py:attr:`force`), or if it still contains files and
                :py:attr:`recursive` is not set.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_file_group')
        file_sec = self._tree.getroot().find('mets:fileSec', NS)
        if file_sec is None:
            raise Exception("No fileSec!")

        if not isinstance(USE, str):
            # Already an element (this is how the regex branch below recurses)
            group = USE
        elif USE.startswith(REGEX_PREFIX):
            pattern = USE[REGEX_PREFIX_LEN:]
            for candidate in file_sec.findall('mets:fileGrp', NS):
                if fullmatch(pattern, candidate.get('USE')):
                    self.remove_file_group(candidate, recursive=recursive)
            return
        else:
            group = file_sec.find('mets:fileGrp[@USE="%s"]' % USE, NS)

        if group is None:
            message = "No such fileGrp: %s" % USE
            if not force:
                raise Exception(message)
            log.warning(message)
            return

        members = group.findall('mets:file', NS)
        if members and not recursive:
            raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
        for member in members:
            self.remove_one_file(member.get('ID'))
        group.getparent().remove(group)
278
279
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Instantiate and add a new :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Arguments:
            fileGrp (string): ``@USE`` of ``mets:fileGrp`` to add to
        Keyword Args:
            mimetype (string): ``@MIMETYPE`` of the ``mets:file`` to use
            url (string): ``@xlink:href`` (URL or path) of the ``mets:file`` to use
            ID (string): ``@ID`` of the ``mets:file`` to use
            pageId (string): ``@ID`` in the physical ``mets:structMap`` to link to
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``@ID`` already exists.
                In that case the existing entry is updated in place and returned.
            ignore (boolean): Do not look for existing files at all. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): passed on to the ``OcrdFile`` as its ``local_filename``
            **kwargs: accepted for call compatibility but not used.

        Returns:
            The new (or, with :py:attr:`force`, the updated) ``OcrdFile``.

        Raises:
            ValueError: if :py:attr:`ID` or :py:attr:`fileGrp` is empty, or
                :py:attr:`ID` does not match ``REGEX_FILE_ID``.
            Exception: if a file with this ``@ID`` exists and neither
                :py:attr:`force` nor :py:attr:`ignore` is set.
        """
        if not ID:
            raise ValueError("Must set ID of the mets:file")
        if not fileGrp:
            raise ValueError("Must set fileGrp of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise ValueError("Invalid syntax for mets:file/@ID %s" % ID)
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)

        # With ``ignore`` the duplicate lookup is skipped entirely: its result
        # would be discarded anyway, and it costs a scan over all mets:file entries.
        mets_file = None if ignore else next(self.find_files(ID=ID), None)
        if mets_file:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file.url = url
            mets_file.mimetype = mimetype
            mets_file.ID = ID
            mets_file.pageId = pageId
            mets_file.local_filename = local_filename
            return mets_file

        # Only pass on attributes that were actually given (truthy values)
        candidate_attrs = (
            ('url', url),
            ('ID', ID),
            ('mimetype', mimetype),
            ('pageId', pageId),
            ('local_filename', local_filename),
        )
        file_attrs = {name: value for name, value in candidate_attrs if value}
        return OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self, **file_attrs)
317
318
    def remove_file(self, *args, **kwargs):
        """
        Delete each ``mets:file`` matching the query. Same arguments as :py:meth:`find_files`

        Returns:
            The single removed file if exactly one matched (for backwards
            compatibility), the list of removed files if several matched, or an
            empty list if nothing matched and a search criterion was a regex.

        Raises:
            FileNotFoundError: if nothing matched and no criterion was a regex.
        """
        files = list(self.find_files(*args, **kwargs))
        if files:
            for f in files:
                self.remove_one_file(f)
            if len(files) > 1:
                return files
            return files[0] # for backwards-compatibility

        # Inspect the criteria *values* (positional and keyword); the keyword
        # names themselves can never carry the regex prefix.
        criteria = list(args) + list(kwargs.values())
        if any(isinstance(value, str) and value.startswith(REGEX_PREFIX) for value in criteria):
            # allow empty results if filter criteria involve a regex
            return []
        raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
335
336
    def remove_one_file(self, ID):
        """
        Delete an existing :py:class:`ocrd_models.ocrd_file.OcrdFile`.

        Besides the ``mets:file`` itself, every ``mets:fptr`` pointing to it is
        removed, along with any ``mets:div`` that is left without children.

        Arguments:
            ID (string): ``@ID`` of the ``mets:file`` to delete, or the ``OcrdFile`` itself

        Returns:
            The old :py:class:`ocrd_models.ocrd_file.OcrdFile` reference.

        Raises:
            FileNotFoundError: if no file with this ``@ID`` exists.
        """
        log = getLogger('ocrd_models.ocrd_mets.remove_one_file')
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = next(self.find_files(ID=ID), None)

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages (len() counts child elements; getchildren() is deprecated)
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
372
373
    @property
    def physical_pages(self):
        """
        Return the ``@ID`` of every page ``mets:div`` in the physical ``mets:structMap``.
        """
        root = self._tree.getroot()
        page_ids = root.xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
            namespaces=NS)
        return page_ids
381
382
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (the ``@ID`` of each physical ``mets:structMap`` ``mets:div``),
        optionally for a subset of ``mets:file`` ``@ID`` :py:attr:`for_fileIds`.

        Returns:
            All page IDs if :py:attr:`for_fileIds` is ``None``. Otherwise a list
            parallel to :py:attr:`for_fileIds` holding the page ID each file is
            linked from, or ``None`` where no page references it. If an ID
            occurs more than once, only its first position is filled.
        """
        if for_fileIds is None:
            return self.physical_pages

        # Map each file ID to its first position once, instead of scanning the
        # list for every mets:fptr.
        first_position = {}
        for position, file_id in enumerate(for_fileIds):
            first_position.setdefault(file_id, position)

        ret = [None] * len(for_fileIds)
        for page in self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]',
                namespaces=NS):
            page_id = page.get('ID')
            for fptr in page.findall('mets:fptr', NS):
                position = first_position.get(fptr.get('FILEID'))
                if position is not None:
                    ret[position] = page_id
        return ret
397
398
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link the ``mets:file`` :py:attr:`ocrd_file` to the physical page :py:attr:`pageId`
        (``@ID`` of a ``mets:div`` in the physical ``mets:structMap``), replacing
        any previous page link of that file and creating missing structures.

        Arguments:
            pageId (string): ``@ID`` of the physical ``mets:structMap`` entry to use
            ocrd_file (object): existing :py:class:`ocrd_models.ocrd_file.OcrdFile` object
        Keyword Args:
            order (string): ``@ORDER`` to use (only applied when the page is newly created)
            orderlabel (string): ``@ORDERLABEL`` to use (only applied when the page is newly created)
        """
        root = self._tree.getroot()

        # Drop every existing page link for this file first
        stale_fptrs = root.findall(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
            ocrd_file.ID, namespaces=NS)
        for stale in stale_fptrs:
            stale.getparent().remove(stale)

        # Walk down structMap -> physSequence -> page, creating what is missing
        structmap = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if structmap is None:
            structmap = ET.SubElement(root, TAG_METS_STRUCTMAP)
            structmap.set('TYPE', 'PHYSICAL')

        sequence = structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence is None:
            sequence = ET.SubElement(structmap, TAG_METS_DIV)
            sequence.set('TYPE', 'physSequence')

        page = sequence.find('mets:div[@ID="%s"]' % pageId, NS)
        if page is None:
            page = ET.SubElement(sequence, TAG_METS_DIV)
            page.set('TYPE', 'page')
            page.set('ID', pageId)
            for attribute, value in (('ORDER', order), ('ORDERLABEL', orderlabel)):
                if value:
                    page.set(attribute, value)

        ET.SubElement(page, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
437
438
    def get_physical_page_for_file(self, ocrd_file):
        """
        Return the ``@ID`` of the first page ``mets:div`` in the physical
        ``mets:structMap`` that references the ``mets:file`` :py:attr:`ocrd_file`,
        or ``None`` if no page references it.
        """
        page_ids = self._tree.getroot().xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID, namespaces=NS)
        return page_ids[0] if page_ids else None
448
449
    def remove_physical_page(self, ID):
        """
        Remove the page ``mets:div`` with ``@ID`` :py:attr:`ID` from the physical
        ``mets:structMap``. Does nothing if there is no such page.
        """
        hits = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
            namespaces=NS)
        if not hits:
            return
        page_div = hits[0]
        page_div.getparent().remove(page_div)
458
459
    def remove_physical_page_fptr(self, fileId):
        """
        Remove every ``mets:fptr`` with ``@FILEID`` :py:attr:`fileId` from the
        page ``mets:div`` entries of the physical ``mets:structMap``.

        Returns:
            List of pageIds that mets:fptrs were deleted from
        """
        root = self._tree.getroot()
        matching_fptrs = root.xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % fileId,
            namespaces=NS)
        page_ids = []
        for fptr in matching_fptrs:
            parent_div = fptr.getparent()
            page_ids.append(parent_div.get('ID'))
            parent_div.remove(fptr)
        return page_ids
474
475
    def merge(self, other_mets, fileGrp_mapping=None, after_add_cb=None, **kwargs):
        """
        Copy the files found in :py:attr:`other_mets` into this METS via :py:meth:`add_file`.

        Accepts the same kwargs as :py:func:`find_files`; they select which
        files of :py:attr:`other_mets` are merged.

        Keyword Args:
            fileGrp_mapping (dict): Map :py:attr:`other_mets` fileGrp to fileGrp in this METS
            after_add_cb (function): Callback called with each file after it is added to the METS
        """
        mapping = fileGrp_mapping if fileGrp_mapping else {}
        for src in other_mets.find_files(**kwargs):
            # Files whose group is not in the mapping keep their original fileGrp
            target_group = mapping.get(src.fileGrp, src.fileGrp)
            added = self.add_file(
                target_group,
                mimetype=src.mimetype,
                url=src.url,
                ID=src.ID,
                pageId=src.pageId)
            if after_add_cb:
                after_add_cb(added)
496
497