Passed
Pull Request — master (#582)
by Konstantin
01:59
created

ocrd_models.ocrd_mets.OcrdMets.remove_file()   A

Complexity

Conditions 5

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 16
rs 9.3333
c 0
b 0
f 0
cc 5
nop 3
1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
from re import fullmatch
6
from lxml import etree as ET
7
8
from ocrd_utils import is_local_filename, getLogger, VERSION, REGEX_PREFIX
9
10
from .constants import (
11
    NAMESPACES as NS,
12
    TAG_METS_AGENT,
13
    TAG_METS_DIV,
14
    TAG_METS_FILE,
15
    TAG_METS_FILEGRP,
16
    TAG_METS_FILESEC,
17
    TAG_METS_FPTR,
18
    TAG_METS_METSHDR,
19
    TAG_METS_STRUCTMAP,
20
    IDENTIFIER_PRIORITY,
21
    TAG_MODS_IDENTIFIER,
22
    METS_XML_EMPTY,
23
    REGEX_FILE_ID
24
)
25
26
from .ocrd_xml_base import OcrdXmlDocument, ET
27
from .ocrd_mets_filter import OcrdMetsFilter
28
from .ocrd_file import OcrdFile
29
from .ocrd_agent import OcrdAgent
30
31
# Logger used by this module, registered under the name 'ocrd_models.ocrd_mets'.
log = getLogger('ocrd_models.ocrd_mets')
32
# Length of REGEX_PREFIX, computed once; used to slice the prefix off a
# ``USE`` value before the remainder is matched as a regular expression.
REGEX_PREFIX_LEN = len(REGEX_PREFIX)
33
34
class OcrdMets(OcrdXmlDocument):
35
    """
36
    API to a single METS file
37
    """
38
39
    @staticmethod
    def empty_mets(now=None):
        """
        Build a fresh, empty METS document from the bundled template.

        Arguments:
            now: Value substituted for the ``{{ NOW }}`` placeholder of the
                template. When falsy, ``datetime.now().isoformat()`` is used.

        Returns:
            A new ``OcrdMets`` constructed from the filled-in template.
        """
        timestamp = now if now else datetime.now().isoformat()
        template = (
            METS_XML_EMPTY.decode('utf-8')
            .replace('{{ VERSION }}', VERSION)
            .replace('{{ NOW }}', '%s' % timestamp)
        )
        return OcrdMets(content=template.encode('utf-8'))
50
51
    def __init__(self, **kwargs):
        """
        Initialise the document.

        All keyword arguments are passed through unchanged to the
        constructor of the parent class.
        """
        super().__init__(**kwargs)
56
57
    def __str__(self):
        """
        Short textual summary listing the file groups and the files found.
        """
        groups = self.file_groups
        files = self.find_files()
        return 'OcrdMets[fileGrps=%s,files=%s]' % (groups, files)
62
63
    @property
    def unique_identifier(self):
        """
        Text of the first ``mods:identifier`` found when trying the types
        listed in ``IDENTIFIER_PRIORITY`` in order; ``None`` if none exists.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        # Lazy generator: lookups stop as soon as one identifier is found.
        candidates = (
            self.etree_find('.//mods:identifier[@type="%s"]' % id_type)
            for id_type in IDENTIFIER_PRIORITY
        )
        first = next((el for el in candidates if el is not None), None)
        return None if first is None else first.text
74
75
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Store ``purl`` as the text of the unique identifier.

        The first existing ``mods:identifier`` (types tried in the order of
        ``IDENTIFIER_PRIORITY``) is overwritten. If there is none, a new
        ``mods:identifier`` with ``type="purl"`` is appended to ``mods:mods``.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        for id_type in IDENTIFIER_PRIORITY:
            target = self.etree_find('.//mods:identifier[@type="%s"]' % id_type)
            if target is not None:
                break
        else:
            # No identifier of any known type: create one below mods:mods.
            parent = self.etree_find('.//mods:mods')
            target = ET.SubElement(parent, TAG_MODS_IDENTIFIER)
            target.set('type', 'purl')
        target.text = purl
92
93
    @property
    def agents(self):
        """
        All ``mets:agent`` elements of the ``mets:metsHdr``, each wrapped as
        an `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_
        """
        agent_elements = self.etree_findall('mets:metsHdr/mets:agent')
        return list(map(OcrdAgent, agent_elements))
99
100
    def add_agent(self, *args, **kwargs):
        """
        Append a new ``mets:agent`` to the ``mets:metsHdr`` and return it as
        an `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_.

        Positional and keyword arguments are handed on to ``OcrdAgent``.
        """
        header = self.etree_find('.//mets:metsHdr')
        if header is None:
            # No header yet: create one as the first child of the root.
            header = ET.Element(TAG_METS_METSHDR)
            self.etree_root.insert(0, header)
        new_agent = ET.SubElement(header, TAG_METS_AGENT)
        return OcrdAgent(new_agent, *args, **kwargs)
112
113
    @property
    def file_groups(self):
        """
        ``USE`` attribute of every ``mets:fileGrp`` in the document, as a list.
        """
        use_values = []
        for group in self.etree_findall('.//mets:fileGrp'):
            use_values.append(group.get('USE'))
        return use_values
119
120
    def find_files(self, **kwargs):
        """
        Search this METS for files.

        The keyword arguments are used to construct an ``OcrdMetsFilter``,
        whose ``find_files`` is then applied to this document.
        """
        mets_filter = OcrdMetsFilter(**kwargs)
        return mets_filter.find_files(self)
122
123
    def add_file_group(self, fileGrp):
        """
        Return the ``mets:fileGrp`` with the given ``USE``, creating it (and
        the ``mets:fileSec``, if missing) when it does not exist yet.

        Arguments:
            fileGrp (string): ``USE`` attribute of the filegroup.

        Raises:
            Exception: if ``fileGrp`` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        file_sec = self.etree_find('mets:fileSec')
        if file_sec is None:
            file_sec = ET.SubElement(self.etree_root, TAG_METS_FILESEC)
        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
140
141
    def remove_file_group(self, USE, recursive=False):
        """
        Remove a fileGrp (fixed ``USE``) or fileGrps (regex ``USE``)

        Arguments:
            USE (string or element): ``USE`` attribute of the fileGrp to
                delete. Treated as a regex (matched in full against each
                ``USE``) if it starts with ``REGEX_PREFIX``. A non-string
                value is taken to be the ``mets:fileGrp`` element itself.
            recursive (boolean): Whether to recursively delete all files in the group

        Raises:
            Exception: if there is no ``mets:fileSec``, if no fileGrp with a
                fixed ``USE`` exists, or if the fileGrp still contains files
                and ``recursive`` is not set.
        """
        el_fileSec = self.etree_find('mets:fileSec')
        if el_fileSec is None:
            raise Exception("No fileSec!")
        if isinstance(USE, str):
            if USE.startswith(REGEX_PREFIX):
                pattern = USE[REGEX_PREFIX_LEN:]
                for cand in el_fileSec.findall('mets:fileGrp', NS):
                    cand_use = cand.get('USE')
                    # A fileGrp without @USE cannot match any pattern; skip
                    # it instead of passing None to fullmatch (TypeError).
                    if cand_use is not None and fullmatch(pattern, cand_use):
                        self.remove_file_group(cand, recursive=recursive)
                return
            el_fileGrp = el_fileSec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        else:
            el_fileGrp = USE
        if el_fileGrp is None:
            raise Exception("No such fileGrp: %s" % USE)
        files = el_fileGrp.findall('mets:file', NS)
        if files:
            if not recursive:
                raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
            for f in files:
                self.remove_one_file(f.get('ID'))
        el_fileGrp.getparent().remove(el_fileGrp)
171
172
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, ignore=False, **kwargs):
        """
        Add a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        Arguments:
            fileGrp (string): Add file to ``mets:fileGrp`` with this ``USE`` attribute
            mimetype (string): assigned to the file's ``mimetype``
            url (string): assigned to the file's ``url``
            ID (string): ID of the ``mets:file``; mandatory, must match ``REGEX_FILE_ID``
            pageId (string): assigned to the file's ``pageId``
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``ID`` already exists.
                In that case the existing file is reused and its attributes are overwritten.
            ignore (boolean): Don't look for existing files. Shift responsibility for preventing errors from duplicate ID to the user.
            local_filename (string): assigned to the file's ``local_filename``
            **kwargs: accepted but not used by this method.

        Returns:
            The new (or reused) ``OcrdFile``.

        Raises:
            Exception: if ``ID`` is missing or malformed, or if a file with
                this ``ID`` already exists and ``force`` is not set.
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        if not REGEX_FILE_ID.fullmatch(ID):
            raise Exception("Invalid syntax for mets:file/@ID %s" % ID)
        el_fileGrp = self.etree_find(".//mets:fileGrp[@USE='%s']" % fileGrp)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)
        # Search for an existing file only once, and not at all with ``ignore``.
        existing = [] if ignore else self.find_files(ID=ID)
        if existing != []:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file = existing[0]
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)
        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        return mets_file
207
208
    def remove_file(self, *args, **kwargs):
        """
        Delete every file matching the query; takes the same keyword
        arguments as ``OcrdMets.find_files``.

        Returns:
            The list of removed files if more than one matched, otherwise
            the single removed file.

        Raises:
            FileNotFoundError: if nothing matches the query.
        """
        # XXX must be retained for backwards-compatibility:
        # a first positional argument is interpreted as the ID
        if args:
            kwargs['ID'] = args[0]
        matches = self.find_files(**kwargs)
        if not matches:
            raise FileNotFoundError("File not found: %s %s" % (args, kwargs))
        for match in matches:
            self.remove_one_file(match)
        # a lone match is returned bare, for backwards-compatibility
        return matches if len(matches) > 1 else matches[0]
224
225
    def remove_one_file(self, ID):
        """
        Delete a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        Arguments:
            ID (string or OcrdFile): the file to delete, or its ID.

        Returns:
            The ``OcrdFile`` that was removed.

        Raises:
            FileNotFoundError: if no file with this ID is found.
        """
        log.info("remove_one_file(%s)", ID)
        if isinstance(ID, OcrdFile):
            ocrd_file = ID
            ID = ocrd_file.ID
        else:
            ocrd_file = self.find_files(ID=ID)
            if ocrd_file:
                ocrd_file = ocrd_file[0]

        if not ocrd_file:
            raise FileNotFoundError("File not found: %s" % ID)

        # Delete the physical page ref
        for fptr in self.etree_findall('.//mets:fptr[@FILEID="%s"]' % ID):
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages; len() counts the remaining children and
            # replaces the deprecated getchildren()
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        return ocrd_file
256
257
    @property
    def physical_pages(self):
        """
        ``ID`` attributes of all page divs in the physical structMap.
        """
        query = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        return self.etree_xpath(query)
263
264
    def get_physical_pages(self, for_fileIds=None):
        """
        List all page IDs (optionally for a subset of file IDs)

        Arguments:
            for_fileIds (list): file IDs to look up; when ``None``, all page
                IDs are returned. Assumes the IDs are hashable (strings).

        Returns:
            Without ``for_fileIds``: the value of ``physical_pages``.
            Otherwise a list as long as ``for_fileIds`` holding, per
            position, the ID of a page referencing that file, or ``None``
            where no page references it.
        """
        if for_fileIds is None:
            return self.physical_pages
        ret = [None] * len(for_fileIds)
        # Map each file ID to its first position (same choice list.index
        # makes for duplicates), so each fptr costs one dict lookup instead
        # of two linear scans over for_fileIds.
        first_pos = {}
        for pos, file_id in enumerate(for_fileIds):
            first_pos.setdefault(file_id, pos)
        for page in self.etree_xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]'):
            for fptr in page.findall('mets:fptr', NS):
                file_id = fptr.get('FILEID')
                if file_id in first_pos:
                    ret[first_pos[file_id]] = page.get('ID')
        return ret
277
278
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Assign a file to a physical page.

        Any existing page mapping of the file is dropped first. The physical
        structMap, the ``physSequence`` div and the page div are created if
        they are missing; ``order`` and ``orderlabel`` are only applied when
        the page div is newly created.

        Arguments:
            pageId (string): ``ID`` of the page div
            ocrd_file: file whose ``ID`` is referenced from the page
            order: value for ``ORDER`` of a newly created page (if truthy)
            orderlabel: value for ``ORDERLABEL`` of a newly created page (if truthy)
        """
        # drop every existing page mapping of this file
        stale_query = (
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]'
            % ocrd_file.ID)
        for stale in self.etree_findall(stale_query):
            stale.getparent().remove(stale)

        # find or build: structMap > physSequence div > page div
        struct_map = self.etree_find('mets:structMap[@TYPE="PHYSICAL"]')
        if struct_map is None:
            struct_map = ET.SubElement(self.etree_root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        sequence = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence is None:
            sequence = ET.SubElement(struct_map, TAG_METS_DIV)
            sequence.set('TYPE', 'physSequence')

        page = sequence.find('mets:div[@ID="%s"]' % pageId, NS)
        if page is None:
            page = ET.SubElement(sequence, TAG_METS_DIV)
            page.set('TYPE', 'page')
            page.set('ID', pageId)
            for attr, value in (('ORDER', order), ('ORDERLABEL', orderlabel)):
                if value:
                    page.set(attr, value)

        ET.SubElement(page, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
309
310
    def get_physical_page_for_file(self, ocrd_file):
        """
        Return the ID of the first physical page referencing ``ocrd_file``
        (by its ``ID``), or ``None`` if no page references it.
        """
        query = (
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID'
            % ocrd_file.ID)
        page_ids = self.etree_xpath(query)
        return page_ids[0] if page_ids else None
319
320
    def remove_physical_page(self, ID):
        """
        Remove the first physical page div with the given ``ID`` from the
        physical structMap; do nothing if there is no such page.
        """
        hits = self.etree_xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID)
        if not hits:
            return
        page = hits[0]
        page.getparent().remove(page)
325