Completed
Pull Request — master (#409)
by Konstantin
02:45 queued 16s
created

ocrd_models.ocrd_mets.OcrdMets.add_file()   B

Complexity

Conditions 6

Size

Total Lines 34
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 34
rs 8.5666
c 0
b 0
f 0
cc 6
nop 9

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
6
from ocrd_utils import is_local_filename, getLogger, VERSION
7
8
from .constants import (
9
    NAMESPACES as NS,
10
    TAG_METS_AGENT,
11
    TAG_METS_DIV,
12
    TAG_METS_FILE,
13
    TAG_METS_FILEGRP,
14
    TAG_METS_FILESEC,
15
    TAG_METS_FPTR,
16
    TAG_METS_METSHDR,
17
    TAG_METS_STRUCTMAP,
18
    IDENTIFIER_PRIORITY,
19
    TAG_MODS_IDENTIFIER,
20
    METS_XML_EMPTY,
21
)
22
23
from .ocrd_xml_base import OcrdXmlDocument, ET
24
from .ocrd_file import OcrdFile
25
from .ocrd_agent import OcrdAgent
26
27
# Module-level logger, registered under the name 'ocrd_models.ocrd_mets'.
log = getLogger('ocrd_models.ocrd_mets')
28
29
class OcrdMets(OcrdXmlDocument):
30
    """
31
    API to a single METS file
32
    """
33
34
    @staticmethod
    def empty_mets(now=None):
        """
        Build a fresh ``OcrdMets`` from the bundled empty-METS template.

        Arguments:
            now: Value substituted for the ``{{ NOW }}`` placeholder of the
                template. Any falsy value (the default) is replaced by the
                current time as returned by ``datetime.now().isoformat()``.

        Returns:
            A new ``OcrdMets`` whose content is the filled-in template.
        """
        timestamp = now or datetime.now().isoformat()
        filled = (
            METS_XML_EMPTY.decode('utf-8')
            .replace('{{ VERSION }}', VERSION)
            .replace('{{ NOW }}', '%s' % timestamp)
        )
        return OcrdMets(content=filled.encode('utf-8'))
45
46
    def __init__(self, file_by_id=None, **kwargs):
        """
        Set up the document and its file cache.

        Arguments:
            file_by_id (dict): Cache mapping file ID to OcrdFile. A new, empty
                dict is used when ``None`` is passed.
            **kwargs: Forwarded unchanged to the parent class constructor.
        """
        super(OcrdMets, self).__init__(**kwargs)
        self._file_by_id = {} if file_by_id is None else file_by_id
56
57
    def __str__(self):
        """
        Summarize the document by its file groups and all of its files.
        """
        groups = self.file_groups
        files = self.find_files()
        return 'OcrdMets[fileGrps=%s,files=%s]' % (groups, files)
62
63
    @property
    def unique_identifier(self):
        """
        Get the unique identifier by looking through ``mods:identifier``

        The identifier types in ``IDENTIFIER_PRIORITY`` are tried in order; the
        text of the first ``mods:identifier`` found is returned, or ``None``
        if no type matches.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        # Lazy generator: the tree is only searched until the first hit.
        candidates = (
            root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            for id_type in IDENTIFIER_PRIORITY
        )
        match = next((el for el in candidates if el is not None), None)
        return None if match is None else match.text
74
75
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Set the unique identifier by looking through ``mods:identifier``

        The first existing ``mods:identifier`` (searched by type in
        ``IDENTIFIER_PRIORITY`` order) gets its text overwritten. If none
        exists, a new ``mods:identifier`` with ``type="purl"`` is appended to
        the first ``mods:mods`` element.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            target = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if target is not None:
                break
        else:
            # No identifier of any known type: create one below mods:mods.
            parent = root.find('.//mods:mods', NS)
            target = ET.SubElement(parent, TAG_MODS_IDENTIFIER)
            target.set('type', 'purl')
        target.text = purl
92
93
    @property
    def agents(self):
        """
        List all `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_

        One ``OcrdAgent`` is created per ``mets:agent`` found directly below
        ``mets:metsHdr``.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
99
100
    def add_agent(self, *args, **kwargs):
        """
        Add an `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_ to the list of agents in the metsHdr.

        A ``mets:metsHdr`` is inserted as the first child of the root if the
        document has none. All arguments are passed on to ``OcrdAgent``.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        new_agent = ET.SubElement(header, TAG_METS_AGENT)
        return OcrdAgent(new_agent, *args, **kwargs)
112
113
    @property
    def file_groups(self):
        """
        List the ``USE`` attributes of all ``mets:fileGrp``.
        """
        uses = []
        for el_fileGrp in self._tree.getroot().findall('.//mets:fileGrp', NS):
            uses.append(el_fileGrp.get('USE'))
        return uses
119
120
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search ``mets:file`` in this METS document.

        All criteria that are not ``None`` must match. Results are served
        from / stored in the ``_file_by_id`` cache.

        Args:
            ID (string) : ID of the file
            fileGrp (string) : USE of the fileGrp to list files of
            pageId (string) : ID of physical page manifested by matching files
            mimetype (string) : MIMETYPE of matching files
            url (string) : @xlink:href of mets:FLocat of mets:file
            local_only (boolean) : Whether to restrict results to local files

        Return:
            List of files.
        """
        root = self._tree.getroot()

        fileGrp_clause = '' if fileGrp is None else '[@USE="%s"]' % fileGrp
        file_clause = ''
        if ID is not None:
            file_clause += '[@ID="%s"]' % ID
        if mimetype is not None:
            file_clause += '[@MIMETYPE="%s"]' % mimetype
        if url is not None:
            file_clause += '[mets:FLocat[@xlink:href = "%s"]]' % url
        # TODO: local_only is not part of the XPath query because lxml rejected
        # the starts-with() predicate as invalid; it is applied in the loop below.

        # Search
        file_ids = root.xpath("//mets:fileGrp%s/mets:file%s/@ID" % (fileGrp_clause, file_clause), namespaces=NS)
        if pageId is not None:
            # Keep the order of the page's fptr entries; a set makes the
            # membership test constant-time.
            matching_ids = set(file_ids)
            by_pageid = root.xpath('//mets:div[@TYPE="page"][@ID="%s"]/mets:fptr/@FILEID' % pageId, namespaces=NS)
            file_ids = [i for i in by_pageid if i in matching_ids]

        # instantiate / get from cache
        ret = []
        for file_id in file_ids:
            if file_id not in self._file_by_id:
                # Only search the tree for the element on a cache miss.
                el = root.find('.//mets:file[@ID="%s"]' % file_id, NS)
                self._file_by_id[file_id] = OcrdFile(el, mets=self)
            ocrd_file = self._file_by_id[file_id]

            # If only local resources should be returned and file is not a file path: skip the file
            if local_only and not is_local_filename(ocrd_file.url):
                continue
            ret.append(ocrd_file)
        return ret
165
166
    def add_file_group(self, fileGrp):
        """
        Add a new ``mets:fileGrp``.

        A ``mets:fileSec`` is created below the root if missing. If a
        ``mets:fileGrp`` with the requested ``USE`` already exists in the
        fileSec, it is returned instead of creating a duplicate.

        Arguments:
            fileGrp (string): ``USE`` attribute of the new filegroup.

        Raises:
            Exception: If ``fileGrp`` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')

        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)

        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing

        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
183
184
    def remove_file_group(self, USE, recursive=False):
        """
        Remove a fileGrp.

        Arguments:
            USE (string): USE attribute of the fileGrp to delete
            recursive (boolean): Whether to recursively delete all files in the group

        Raises:
            Exception: If there is no ``mets:fileSec``, no fileGrp with that
                ``USE``, or the fileGrp contains files and ``recursive`` is
                not set.
        """
        file_sec = self._tree.getroot().find('mets:fileSec', NS)
        if file_sec is None:
            raise Exception("No fileSec!")

        group = file_sec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        if group is None:
            raise Exception("No such fileGrp: %s" % USE)

        members = group.findall('mets:file', NS)
        if members and not recursive:
            raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
        # `members` is a list snapshot, so removing files while looping is safe.
        for member in members:
            self.remove_file(member.get('ID'))

        group.getparent().remove(group)
205
206
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, **kwargs):
        """
        Add a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        The ``mets:fileGrp`` is created if it does not exist yet. The new or
        reused file is also stored in the file cache under its ``ID``.

        Arguments:
            fileGrp (string): Add file to ``mets:fileGrp`` with this ``USE`` attribute
            mimetype (string): Assigned to the file's ``mimetype``
            url (string): Assigned to the file's ``url``
            ID (string): ID of the ``mets:file``; required
            pageId (string): Assigned to the file's ``pageId``
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``ID`` already exists.
                In that case the existing file is reused in place and its attributes are overwritten.
            local_filename (string): Assigned to the file's ``local_filename``
            **kwargs: Accepted but not used by this method.

        Returns:
            The added (or, with ``force``, reused) ``OcrdFile``.

        Raises:
            Exception: If ``ID`` is empty, or a file with this ``ID`` already
                exists and ``force`` is not set.
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)

        # Search once and reuse the result for both the check and the reuse.
        existing = self.find_files(ID=ID)
        if existing:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file = existing[0]
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)

        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        self._file_by_id[ID] = mets_file

        return mets_file
240
241
    def remove_file(self, ID):
        """
        Delete a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        Besides the ``mets:file`` itself, all ``mets:fptr`` pointing to it are
        removed, as are parent divs left without children by that removal.
        The file is also dropped from the file cache.

        Arguments:
            ID (string): ID of the ``mets:file`` to delete

        Returns:
            The removed ``OcrdFile``.

        Raises:
            FileNotFoundError: If no file with this ``ID`` exists.
        """
        log.info("remove_file(%s)", ID)
        matches = self.find_files(ID)
        if not matches:
            raise FileNotFoundError("File not found: %s" % ID)
        ocrd_file = matches[0]

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages; len() counts children and replaces the
            # deprecated getchildren()
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        # Uncache
        self._file_by_id.pop(ID, None)
        return ocrd_file
269
270
    @property
    def physical_pages(self):
        """
        List all page IDs

        These are the ``ID`` attributes of the page divs in the physical
        sequence of the ``PHYSICAL`` structMap.
        """
        root = self._tree.getroot()
        page_id_query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        return root.xpath(page_id_query, namespaces=NS)
278
279
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Assign a file to a physical page, creating the page if necessary.

        Any ``mets:fptr`` in the physical sequence that already points to the
        file is removed first. The ``PHYSICAL`` structMap, the
        ``physSequence`` div and the page div are created when missing.

        Arguments:
            pageId (string): ``ID`` of the page div
            ocrd_file: File whose ``ID`` becomes the ``FILEID`` of the new ``mets:fptr``
            order: ``ORDER`` attribute, only set on a newly created page div
            orderlabel: ``ORDERLABEL`` attribute, only set on a newly created page div
        """
        root = self._tree.getroot()

        # delete any page mapping for this file.ID
        stale_fptrs = root.findall(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
            ocrd_file.ID, namespaces=NS)
        for stale in stale_fptrs:
            stale.getparent().remove(stale)

        # find/construct as necessary
        struct_map = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if struct_map is None:
            struct_map = ET.SubElement(root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        sequence = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence is None:
            sequence = ET.SubElement(struct_map, TAG_METS_DIV)
            sequence.set('TYPE', 'physSequence')

        page = sequence.find('mets:div[@ID="%s"]' % pageId, NS)
        if page is None:
            page = ET.SubElement(sequence, TAG_METS_DIV)
            page.set('TYPE', 'page')
            page.set('ID', pageId)
            for attribute, value in (('ORDER', order), ('ORDERLABEL', orderlabel)):
                if value:
                    page.set(attribute, value)

        ET.SubElement(page, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
310
311
    def get_physical_page_for_file(self, ocrd_file):
        """
        Get the pageId for a ocrd_file

        Returns the ``ID`` of the first page div in the physical sequence that
        has a ``mets:fptr`` pointing to the file, or ``None`` if there is none.
        """
        root = self._tree.getroot()
        page_ids = root.xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID, namespaces=NS)
        return page_ids[0] if page_ids else None
320
321
    def remove_physical_page(self, ID):
        """
        Remove the physical page div with the given ``ID``.

        Only the first matching page div of the physical sequence is removed;
        nothing happens if no page div matches.
        """
        root = self._tree.getroot()
        matches = root.xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
            namespaces=NS)
        if not matches:
            return
        page_div = matches[0]
        page_div.getparent().remove(page_div)
327