Passed
Pull Request — master (#582)
by Konstantin
01:59
created

ocrd_models.ocrd_mets.OcrdMets.add_file()   B

Complexity

Conditions 8

Size

Total Lines 35
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 19
dl 0
loc 35
rs 7.3333
c 0
b 0
f 0
cc 8
nop 10

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch
6
from lxml import etree as ET
7
8
from ocrd_utils import is_local_filename, getLogger, VERSION, REGEX_PREFIX
9
10
from .constants import (
11
    NAMESPACES as NS,
12
    TAG_METS_AGENT,
13
    TAG_METS_DIV,
14
    TAG_METS_FILE,
15
    TAG_METS_FILEGRP,
16
    TAG_METS_FILESEC,
17
    TAG_METS_FPTR,
18
    TAG_METS_METSHDR,
19
    TAG_METS_STRUCTMAP,
20
    IDENTIFIER_PRIORITY,
21
    TAG_MODS_IDENTIFIER,
22
    METS_XML_EMPTY,
23
    REGEX_FILE_ID
24
)
25
26
from .ocrd_xml_base import OcrdXmlDocument, ET
27
from .ocrd_mets_filter import OcrdMetsFilter
28
from .ocrd_file import OcrdFile
29
from .ocrd_agent import OcrdAgent
30
31
# Logger for this module
log = getLogger('ocrd_models.ocrd_mets')
32
# Length of REGEX_PREFIX, precomputed so the prefix can be sliced off regex-style arguments
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
33
34
class OcrdMets(OcrdXmlDocument):
35
    """
36
    API to a single METS file
37
    """
38
39
    @staticmethod
    def empty_mets(now=None):
        """
        Build an ``OcrdMets`` from the bundled empty METS template.

        The ``{{ VERSION }}`` placeholder of the template is replaced with
        ``VERSION`` and the ``{{ NOW }}`` placeholder with the timestamp.

        Arguments:
            now: Value substituted for ``{{ NOW }}`` (formatted with ``%s``).
                Any falsy value selects ``datetime.now().isoformat()``.
        """
        timestamp = now if now else datetime.now().isoformat()
        template = METS_XML_EMPTY.decode('utf-8')
        substitutions = (
            ('{{ VERSION }}', VERSION),
            ('{{ NOW }}', '%s' % timestamp),
        )
        for placeholder, value in substitutions:
            template = template.replace(placeholder, value)
        return OcrdMets(content=template.encode('utf-8'))
50
51
    def __init__(self, **kwargs):
        """
        Initialize the document by handing all keyword arguments unchanged to
        the ``OcrdXmlDocument`` constructor.
        """
        super().__init__(**kwargs)
56
57
    def __str__(self):
        """
        Summarize the document as its fileGrp ``USE`` values plus the result
        of an unfiltered ``find_files()`` call.
        """
        summary = (self.file_groups, self.find_files())
        return 'OcrdMets[fileGrps=%s,files=%s]' % summary
62
63
    @property
    def unique_identifier(self):
        """
        Text of the first ``mods:identifier`` found when trying the ``type``
        values of ``IDENTIFIER_PRIORITY`` in order, or ``None`` if no such
        element exists.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for id_type in IDENTIFIER_PRIORITY:
            el_identifier = self.etree_find('.//mods:identifier[@type="%s"]' % id_type)
            if el_identifier is None:
                continue
            return el_identifier.text
        return None
74
75
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Store ``purl`` as the text of the unique identifier element.

        The first ``mods:identifier`` whose ``type`` is listed in
        ``IDENTIFIER_PRIORITY`` (tried in order) is overwritten. If there is
        none, a new ``mods:identifier`` with ``type="purl"`` is appended to
        the ``mods:mods`` element.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for id_type in IDENTIFIER_PRIORITY:
            el_identifier = self.etree_find('.//mods:identifier[@type="%s"]' % id_type)
            if el_identifier is not None:
                break
        else:
            # No identifier of any prioritized type exists yet: create one
            el_mods = self.etree_find('.//mods:mods')
            el_identifier = ET.SubElement(el_mods, TAG_MODS_IDENTIFIER)
            el_identifier.set('type', 'purl')
        el_identifier.text = purl
92
93
    @property
    def agents(self):
        """
        List of `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_,
        one per element matching ``mets:metsHdr/mets:agent``.
        """
        agent_elements = self.etree_findall('mets:metsHdr/mets:agent')
        return list(map(OcrdAgent, agent_elements))
99
100
    def add_agent(self, *args, **kwargs):
        """
        Append a new ``mets:agent`` to the metsHdr and return it wrapped in an
        `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_.

        If the document has no ``mets:metsHdr``, one is inserted at index 0 of
        the root element first. All positional and keyword arguments are
        passed to the ``OcrdAgent`` constructor after the new element.
        """
        el_header = self.etree_find('.//mets:metsHdr')
        if el_header is None:
            el_header = ET.Element(TAG_METS_METSHDR)
            self.etree_root.insert(0, el_header)
        return OcrdAgent(ET.SubElement(el_header, TAG_METS_AGENT), *args, **kwargs)
112
113
    @property
    def file_groups(self):
        """
        List with the ``USE`` attribute of every ``mets:fileGrp`` found.
        """
        use_values = []
        for el_fileGrp in self.etree_findall('.//mets:fileGrp'):
            use_values.append(el_fileGrp.get('USE'))
        return use_values
119
120
    def find_files(self, **kwargs):
        """
        Search this document with an ``OcrdMetsFilter`` constructed from the
        given keyword arguments and return what its ``find_files`` returns.
        """
        file_filter = OcrdMetsFilter(**kwargs)
        return file_filter.find_files(self)
122
123
    def add_file_group(self, fileGrp):
        """
        Return the ``mets:fileGrp`` with the given ``USE``, creating it (and
        the ``mets:fileSec``, if missing) when it does not exist yet.

        Arguments:
            fileGrp (string): ``USE`` attribute of the filegroup.

        Raises:
            Exception: if ``fileGrp`` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        el_fileSec = self.etree_find('mets:fileSec')
        if el_fileSec is None:
            el_fileSec = ET.SubElement(self.etree_root, TAG_METS_FILESEC)
        existing = el_fileSec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(el_fileSec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
140
141
    def remove_file_group(self, USE, recursive=False):
        """
        Remove a fileGrp (fixed ``USE``) or fileGrps (regex ``USE``)

        Arguments:
            USE (string): USE attribute of the fileGrp to delete. Treated as a
                regex (matched with ``fullmatch``) if it starts with
                ``REGEX_PREFIX``. A non-string value is used directly as the
                fileGrp element to remove.
            recursive (boolean): Whether to recursively delete all files in the group

        Raises:
            Exception: if there is no ``mets:fileSec``, if no fileGrp matches a
                fixed ``USE``, or if the fileGrp contains files and
                ``recursive`` is not set.
        """
        el_fileSec = self.etree_find('mets:fileSec')
        if el_fileSec is None:
            raise Exception("No fileSec!")

        is_string = isinstance(USE, str)
        if is_string and USE.startswith(REGEX_PREFIX):
            # Regex mode: delegate every matching group element to this method
            pattern = USE[REGEX_PREFIX_LEN:]
            for candidate in el_fileSec.findall('mets:fileGrp', NS):
                if fullmatch(pattern, candidate.get('USE')):
                    self.remove_file_group(candidate, recursive=recursive)
            return

        el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS) if is_string else USE
        if el_fileGrp is None:
            raise Exception("No such fileGrp: %s" % USE)

        contained = el_fileGrp.findall('mets:file', NS)
        if contained and not recursive:
            raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
        for el_file in contained:
            self.remove_one_file(el_file.get('ID'))
        el_fileGrp.getparent().remove(el_fileGrp)
171
172
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Add a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        Arguments:
            fileGrp (string): Add file to ``mets:fileGrp`` with this ``USE`` attribute.
                The fileGrp is created if it does not exist yet.
            mimetype (string): Assigned to the file's ``mimetype``.
            url (string): Assigned to the file's ``url``.
            ID (string): Required ``mets:file/@ID``; must match ``REGEX_FILE_ID``.
            pageId (string): Assigned to the file's ``pageId``.
            force (boolean): If a ``mets:file`` with the same ``ID`` already exists,
                reuse that file and overwrite its properties instead of raising.
            local_filename (string): Assigned to the file's ``local_filename``.
            ignore (boolean): Don't look for existing files. Shift responsibility
                for preventing errors from duplicate ID to the user.
            **kwargs: Accepted but not used by this method.

        Returns:
            The new or reused ``OcrdFile``.

        Raises:
            Exception: if ``ID`` is empty or syntactically invalid, or if a file
                with this ``ID`` exists and neither ``force`` nor ``ignore`` is set.
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise Exception("Invalid syntax for mets:file/@ID %s" % ID)
        el_fileGrp = self.etree_find(".//mets:fileGrp[@USE='%s']" % fileGrp)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # Search only once and reuse the result for both the duplicate check
        # and the ``force`` case
        existing = [] if ignore else self.find_files(ID=ID)
        if existing:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file = existing[0]
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)
        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        return mets_file
207
208
    def remove_file(self, *args, **kwargs):
        """
        Delete all files matching the query. Same arguments as ``OcrdMets.find_files``

        Returns:
            The single removed file if exactly one matched, otherwise the
            collection of all removed files.

        Raises:
            FileNotFoundError: if no file matches the query.
        """
        # XXX must be retained for backwards-compatibility:
        # a first positional argument is interpreted as the file ID
        if args:
            kwargs['ID'] = args[0]
        matches = self.find_files(**kwargs)
        if not matches:
            raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
        for match in matches:
            self.remove_one_file(match)
        # a single match is returned unwrapped for backwards-compatibility
        return matches if len(matches) > 1 else matches[0]
224
225
    def remove_one_file(self, ID):
        """
        Delete a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        Also removes every ``mets:fptr`` referencing the file and any parent
        element that has no children left afterwards.

        Arguments:
            ID (string|OcrdFile): ID of the file to delete, or the ``OcrdFile`` itself.

        Returns:
            The removed ``OcrdFile``.

        Raises:
            FileNotFoundError: if no file with this ID is found.
        """
        # Lazy %-style arguments, consistent with the other log calls below
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = self.find_files(ID=ID)
            if ocrd_file:
                ocrd_file = ocrd_file[0]

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self.etree_findall('.//mets:fptr[@FILEID="%s"]' % ID):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages; len() counts child nodes and replaces the
            # deprecated getchildren()
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
256
257
    @property
    def physical_pages(self):
        """
        ``ID`` attributes of all ``mets:div[@TYPE="page"]`` below the
        ``physSequence`` of the ``PHYSICAL`` structMap.
        """
        page_id_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        return self.etree_xpath(page_id_xpath)
263
264
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (optionally for a subset of file IDs)

        Arguments:
            for_fileIds (list): File IDs to look up. If ``None``, all page IDs
                are returned (same as ``physical_pages``).

        Returns:
            Without ``for_fileIds``, all page IDs. Otherwise a list of the same
            length as ``for_fileIds`` holding, at each position, the ID of the
            page referencing that file, or ``None`` if no page references it.
        """
        if for_fileIds is None:
            return self.physical_pages
        # Map each file ID to its first position once, instead of scanning
        # the list for every fptr. setdefault keeps the first occurrence,
        # as list.index() would.
        position_of = {}
        for position, file_id in enumerate(for_fileIds):
            position_of.setdefault(file_id, position)
        ret = [None] * len(for_fileIds)
        for page in self.etree_xpath(
                'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]'):
            for fptr in page.findall('mets:fptr', NS):
                position = position_of.get(fptr.get('FILEID'))
                if position is not None:
                    ret[position] = page.get('ID')
        return ret
277
278
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Assign a file to a physical page, creating the page if necessary.

        Existing page mappings for ``ocrd_file.ID`` are removed first. The
        ``PHYSICAL`` structMap, its ``physSequence`` div and the page div are
        created when missing.

        Arguments:
            pageId (string): ``ID`` of the page div.
            ocrd_file: File whose ``ID`` is referenced by the new ``mets:fptr``.
            order: ``ORDER`` attribute, set only on a newly created page div and only if truthy.
            orderlabel: ``ORDERLABEL`` attribute, set only on a newly created page div and only if truthy.
        """
        # delete any page mapping for this file.ID
        stale_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' % ocrd_file.ID
        for stale_fptr in self.etree_findall(stale_xpath):
            stale_fptr.getparent().remove(stale_fptr)

        # find/construct as necessary
        el_structmap = self.etree_find('mets:structMap[@TYPE="PHYSICAL"]')
        if el_structmap is None:
            el_structmap = ET.SubElement(self.etree_root, TAG_METS_STRUCTMAP)
            el_structmap.set('TYPE', 'PHYSICAL')

        el_seqdiv = el_structmap.find('mets:div[@TYPE="physSequence"]', NS)
        if el_seqdiv is None:
            el_seqdiv = ET.SubElement(el_structmap, TAG_METS_DIV)
            el_seqdiv.set('TYPE', 'physSequence')

        el_pagediv = el_seqdiv.find('mets:div[@ID="%s"]' % pageId, NS)
        if el_pagediv is None:
            el_pagediv = ET.SubElement(el_seqdiv, TAG_METS_DIV)
            el_pagediv.set('TYPE', 'page')
            el_pagediv.set('ID', pageId)
            for attribute, value in (('ORDER', order), ('ORDERLABEL', orderlabel)):
                if value:
                    el_pagediv.set(attribute, value)

        ET.SubElement(el_pagediv, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
309
310
    def get_physical_page_for_file(self, ocrd_file):
        """
        Return the ``ID`` of the first physical page whose ``mets:fptr``
        references ``ocrd_file.ID``, or ``None`` if there is no such page.
        """
        page_ids = self.etree_xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID)
        return page_ids[0] if page_ids else None
319
320
    def remove_physical_page(self, ID):
        """
        Remove the first ``mets:div[@TYPE="page"]`` with the given ``ID`` from
        the physical ``physSequence``. Does nothing if no such page exists.
        """
        matches = self.etree_xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID)
        if not matches:
            return
        el_page = matches[0]
        el_page.getparent().remove(el_page)
325