Completed
Pull Request — master (#409)
by Konstantin
02:45 queued 16s
created

ocrd_models.ocrd_mets.OcrdMets.unique_identifier()   A

Complexity

Conditions 4

Size

Total Lines 11
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 11
rs 9.8
c 0
b 0
f 0
cc 4
nop 2
1
"""
2
API to METS
3
"""
4
from datetime import datetime
5
6
from ocrd_utils import is_local_filename, getLogger, VERSION
7
8
from .constants import (
9
    NAMESPACES as NS,
10
    TAG_METS_AGENT,
11
    TAG_METS_DIV,
12
    TAG_METS_FILE,
13
    TAG_METS_FILEGRP,
14
    TAG_METS_FILESEC,
15
    TAG_METS_FPTR,
16
    TAG_METS_METSHDR,
17
    TAG_METS_STRUCTMAP,
18
    IDENTIFIER_PRIORITY,
19
    TAG_MODS_IDENTIFIER,
20
    METS_XML_EMPTY,
21
)
22
23
from .ocrd_xml_base import OcrdXmlDocument, ET
24
from .ocrd_file import OcrdFile
25
from .ocrd_agent import OcrdAgent
26
27
# Module-level logger, registered under the name 'ocrd_models.ocrd_mets'.
log = getLogger('ocrd_models.ocrd_mets')
28
29
class OcrdMets(OcrdXmlDocument):
30
    """
31
    API to a single METS file
32
    """
33
34
    @staticmethod
    def empty_mets(now=None):
        """
        Build a fresh ``OcrdMets`` from the bundled empty-METS template.

        Arguments:
            now: Value substituted for the ``{{ NOW }}`` placeholder. Any falsy
                value is replaced by ``datetime.now().isoformat()``.

        Return:
            A new ``OcrdMets`` constructed from the filled-in template.
        """
        timestamp = now or datetime.now().isoformat()
        xml = (
            METS_XML_EMPTY.decode('utf-8')
            .replace('{{ VERSION }}', VERSION)
            .replace('{{ NOW }}', '%s' % timestamp)
        )
        return OcrdMets(content=xml.encode('utf-8'))
45
46
    def __init__(self, file_by_id=None, **kwargs):
        """
        Set up the document and its per-instance file cache.

        Arguments:
            file_by_id (dict): Cache mapping file ID to OcrdFile. A new empty
                dict is used when this is ``None``.
            **kwargs: Passed through unchanged to the parent constructor.
        """
        super(OcrdMets, self).__init__(**kwargs)
        self._file_by_id = {} if file_by_id is None else file_by_id
56
57
    def __str__(self):
        """
        Summarise the document as its file groups plus all of its files.
        """
        groups = self.file_groups
        files = self.find_files()
        return 'OcrdMets[fileGrps=%s,files=%s]' % (groups, files)
62
63
    @property
    def unique_identifier(self):
        """
        Get the unique identifier by looking through ``mods:identifier``

        The identifier types in ``IDENTIFIER_PRIORITY`` are tried in order and
        the text of the first matching element is returned. If no type
        matches, the result is ``None``.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            match = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if match is None:
                continue
            return match.text
        return None
74
75
    @unique_identifier.setter
    def unique_identifier(self, purl):
        """
        Set the unique identifier stored in ``mods:identifier``

        The first existing ``mods:identifier`` whose ``type`` is listed in
        ``IDENTIFIER_PRIORITY`` (tried in order) has its text replaced. If
        there is none, a new ``mods:identifier`` with ``type="purl"`` is
        appended to the first ``mods:mods`` element.

        See `specs <https://ocr-d.github.io/mets#unique-id-for-the-document-processed>`_ for details.

        Arguments:
            purl (string): New text of the identifier element.

        Raises:
            Exception: If a new identifier has to be created but the document
                contains no ``mods:mods`` element to attach it to.
        """
        root = self._tree.getroot()
        for id_type in IDENTIFIER_PRIORITY:
            id_el = root.find('.//mods:identifier[@type="%s"]' % id_type, NS)
            if id_el is not None:
                break
        else:
            # No identifier of any known type exists yet: create a purl one.
            mods = root.find('.//mods:mods', NS)
            if mods is None:
                # Fail with a clear message rather than passing None to SubElement.
                raise Exception("Cannot set unique identifier: no mods:mods element found")
            id_el = ET.SubElement(mods, TAG_MODS_IDENTIFIER)
            id_el.set('type', 'purl')
        id_el.text = purl
92
93
    @property
    def agents(self):
        """
        List all `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_

        One ``OcrdAgent`` is built for every ``mets:agent`` found directly
        under a ``mets:metsHdr`` child of the root element.
        """
        agent_elements = self._tree.getroot().findall('mets:metsHdr/mets:agent', NS)
        return list(map(OcrdAgent, agent_elements))
99
100
    def add_agent(self, *args, **kwargs):
        """
        Add an `OcrdAgent </../../ocrd_models/ocrd_models.ocrd_agent.html>`_ to the list of agents in the metsHdr.

        A ``mets:metsHdr`` is inserted as the first child of the root element
        if the document does not have one yet. All positional and keyword
        arguments are passed on to ``OcrdAgent`` after the new element.

        Return:
            The ``OcrdAgent`` wrapping the newly created ``mets:agent``.
        """
        root = self._tree.getroot()
        header = root.find('.//mets:metsHdr', NS)
        if header is None:
            header = ET.Element(TAG_METS_METSHDR)
            root.insert(0, header)
        return OcrdAgent(ET.SubElement(header, TAG_METS_AGENT), *args, **kwargs)
112
113
    @property
    def file_groups(self):
        """
        List the ``USE`` attributes of all ``mets:fileGrp``.

        Groups without a ``USE`` attribute contribute ``None`` to the list.
        """
        uses = []
        for el_grp in self._tree.getroot().findall('.//mets:fileGrp', NS):
            uses.append(el_grp.get('USE'))
        return uses
119
120
    def find_files(self, ID=None, fileGrp=None, pageId=None, mimetype=None, url=None, local_only=False):
        """
        Search ``mets:file`` in this METS document.

        Every criterion that is not ``None`` must match. Matching files are
        wrapped in ``OcrdFile`` objects, which are cached by file ID so that
        repeated searches return the same instances.

        Args:
            ID (string) : ID of the file
            fileGrp (string) : USE of the fileGrp to list files of
            pageId (string) : ID of physical page manifested by matching files
            mimetype (string) : MIMETYPE of matching files
            url (string) : @xlink:href of mets:FLocat of mets:file
            local_only (boolean) : Whether to restrict results to local files

        Return:
            List of files.
        """
        root = self._tree.getroot()

        # Build the XPath predicates for the fileGrp and file steps.
        fileGrp_clause = '' if fileGrp is None else '[@USE="%s"]' % fileGrp
        file_clause = ''
        if ID is not None:
            file_clause += '[@ID="%s"]' % ID
        if mimetype is not None:
            file_clause += '[@MIMETYPE="%s"]' % mimetype
        if url is not None:
            file_clause += '[mets:FLocat[@xlink:href = "%s"]]' % url
        # TODO lxml says invalid predicate. I disagree
        #  if local_only:
        #      file_clause += "[mets:FLocat[starts-with(@xlink:href, 'file://')]]"

        # Search
        file_ids = root.xpath("//mets:fileGrp%s/mets:file%s/@ID" % (fileGrp_clause, file_clause), namespaces=NS)
        if pageId is not None:
            by_pageid = root.xpath('//mets:div[@TYPE="page"][@ID="%s"]/mets:fptr/@FILEID' % pageId, namespaces=NS)
            # Keep the page's fptr order; a set makes each membership test O(1).
            matching_ids = set(file_ids)
            file_ids = [i for i in by_pageid if i in matching_ids]

        # instantiate / get from cache
        ret = []
        for file_id in file_ids:
            if file_id not in self._file_by_id:
                # Only look the element up when the file is not cached yet.
                el = root.find('.//mets:file[@ID="%s"]' % file_id, NS)
                self._file_by_id[file_id] = OcrdFile(el, mets=self)
            ocrd_file = self._file_by_id[file_id]

            # If only local resources should be returned and file is not a file path: skip the file
            file_url = ocrd_file.url
            if local_only and not is_local_filename(file_url):
                continue
            ret.append(ocrd_file)
        return ret
165
166
    def add_file_group(self, fileGrp):
        """
        Add a new ``mets:fileGrp``.

        The ``mets:fileSec`` is created under the root element if missing. If
        a group with the requested ``USE`` already exists in the fileSec, it
        is returned instead of a new one being created.

        Arguments:
            fileGrp (string): ``USE`` attribute of the new filegroup.

        Return:
            The existing or newly created ``mets:fileGrp`` element.

        Raises:
            Exception: If ``fileGrp`` contains a comma.
        """
        if ',' in fileGrp:
            raise Exception('fileGrp must not contain commas')
        root = self._tree.getroot()
        file_sec = root.find('mets:fileSec', NS)
        if file_sec is None:
            file_sec = ET.SubElement(root, TAG_METS_FILESEC)
        existing = file_sec.find('mets:fileGrp[@USE="%s"]' % fileGrp, NS)
        if existing is not None:
            return existing
        created = ET.SubElement(file_sec, TAG_METS_FILEGRP)
        created.set('USE', fileGrp)
        return created
183
184
    def remove_file_group(self, USE, recursive=False):
        """
        Remove a fileGrp.

        Arguments:
            USE (string): USE attribute of the fileGrp to delete
            recursive (boolean): Whether to recursively delete all files in the group

        Raises:
            Exception: If there is no ``mets:fileSec``, if no fileGrp has the
                given ``USE``, or if the group still contains files and
                ``recursive`` is not set.
        """
        file_sec = self._tree.getroot().find('mets:fileSec', NS)
        if file_sec is None:
            raise Exception("No fileSec!")
        group = file_sec.find('mets:fileGrp[@USE="%s"]' % USE, NS)
        if group is None:
            raise Exception("No such fileGrp: %s" % USE)
        members = group.findall('mets:file', NS)
        if members and not recursive:
            raise Exception("fileGrp %s is not empty and recursive wasn't set" % USE)
        # Reached with a non-empty list only when recursive is set.
        for member in members:
            self.remove_file(member.get('ID'))
        group.getparent().remove(group)
205
206
    def add_file(self, fileGrp, mimetype=None, url=None, ID=None, pageId=None, force=False, local_filename=None, **kwargs):
        """
        Add a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        The ``mets:fileGrp`` is created if it does not exist yet. The new (or,
        with ``force``, the already existing) file then has its url, mimetype,
        ID, pageId and local_filename assigned from the arguments and is stored
        in the file cache under ``ID``.

        Arguments:
            fileGrp (string): Add file to ``mets:fileGrp`` with this ``USE`` attribute
            mimetype (string): Value assigned to the file's ``mimetype``
            url (string): Value assigned to the file's ``url``
            ID (string): ID of the ``mets:file``; required
            pageId (string): Value assigned to the file's ``pageId``
            force (boolean): Whether to add the file even if a ``mets:file`` with the same ``ID`` already exists.
                In that case the existing file is reused; this method does not move it into ``fileGrp``.
            local_filename (string): Value assigned to the file's ``local_filename``
            **kwargs: Accepted but not used by this method.

        Return:
            The added or updated ``OcrdFile``.

        Raises:
            Exception: If ``ID`` is empty, or if a file with this ``ID``
                already exists and ``force`` is not set.
        """
        if not ID:
            raise Exception("Must set ID of the mets:file")
        el_fileGrp = self._tree.getroot().find(".//mets:fileGrp[@USE='%s']" % (fileGrp), NS)
        if el_fileGrp is None:
            el_fileGrp = self.add_file_group(fileGrp)

        # Look the ID up once and reuse the result for both the check and the reuse.
        existing = self.find_files(ID=ID)
        if existing:
            if not force:
                raise Exception("File with ID='%s' already exists" % ID)
            mets_file = existing[0]
        else:
            mets_file = OcrdFile(ET.SubElement(el_fileGrp, TAG_METS_FILE), mets=self)

        mets_file.url = url
        mets_file.mimetype = mimetype
        mets_file.ID = ID
        mets_file.pageId = pageId
        mets_file.local_filename = local_filename

        self._file_by_id[ID] = mets_file

        return mets_file
240
241
    def remove_file(self, ID):
        """
        Delete a `OcrdFile </../../ocrd_models/ocrd_models.ocrd_file.html>`_.

        Removes every ``mets:fptr`` that references the file (and any page
        div left without children by that), then the ``mets:file`` element
        itself, and finally the cache entry for ``ID``.

        Arguments:
            ID (string): ID of the ``mets:file`` to delete

        Return:
            The removed ``OcrdFile``.

        Raises:
            FileNotFoundError: If no file with this ``ID`` is found.
        """
        # Let the logging framework do the formatting (lazy %-args).
        log.info("remove_file(%s)", ID)
        matches = self.find_files(ID)
        if not matches:
            raise FileNotFoundError("File not found: %s" % ID)
        ocrd_file = matches[0]

        # Delete the physical page ref
        for fptr in self._tree.getroot().findall('.//mets:fptr[@FILEID="%s"]' % ID, namespaces=NS):
            # NOTE: the value logged as "page" here is the file ID.
            log.info("Delete fptr element %s for page '%s'", fptr, ID)
            page_div = fptr.getparent()
            page_div.remove(fptr)
            # delete empty pages (len() of an element is its child count;
            # getchildren() is deprecated in lxml)
            if len(page_div) == 0:
                log.info("Delete empty page %s", page_div)
                page_div.getparent().remove(page_div)

        # Delete the file reference
        # pylint: disable=protected-access
        ocrd_file._el.getparent().remove(ocrd_file._el)

        # Uncache
        if ID in self._file_by_id:
            del self._file_by_id[ID]
        return ocrd_file
269
270
    @property
    def physical_pages(self):
        """
        List all page IDs

        These are the ``ID`` attributes of the ``page`` divs inside the
        ``physSequence`` div of the ``PHYSICAL`` structMap.
        """
        page_id_xpath = 'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID'
        return self._tree.getroot().xpath(page_id_xpath, namespaces=NS)
278
279
    def set_physical_page_for_file(self, pageId, ocrd_file, order=None, orderlabel=None):
        """
        Link a file to a physical page, creating the page if necessary.

        Any existing page mapping for the file is dropped first. The
        ``PHYSICAL`` structMap, its ``physSequence`` div and the page div are
        each created only when missing.

        Arguments:
            pageId (string): ID of the page div
            ocrd_file: File whose ``ID`` is referenced from the page
            order: ``ORDER`` attribute; only applied to a newly created page div, and only if truthy
            orderlabel: ``ORDERLABEL`` attribute; only applied to a newly created page div, and only if truthy
        """
        root = self._tree.getroot()

        # Drop every fptr that currently maps this file to a page.
        stale_fptr_path = (
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/mets:fptr[@FILEID="%s"]' %
            ocrd_file.ID)
        for stale_fptr in root.findall(stale_fptr_path, namespaces=NS):
            stale_fptr.getparent().remove(stale_fptr)

        # Find or build structMap > physSequence > page, top down.
        struct_map = root.find('mets:structMap[@TYPE="PHYSICAL"]', NS)
        if struct_map is None:
            struct_map = ET.SubElement(root, TAG_METS_STRUCTMAP)
            struct_map.set('TYPE', 'PHYSICAL')

        sequence_div = struct_map.find('mets:div[@TYPE="physSequence"]', NS)
        if sequence_div is None:
            sequence_div = ET.SubElement(struct_map, TAG_METS_DIV)
            sequence_div.set('TYPE', 'physSequence')

        page_div = sequence_div.find('mets:div[@ID="%s"]' % pageId, NS)
        if page_div is None:
            page_div = ET.SubElement(sequence_div, TAG_METS_DIV)
            page_div.set('TYPE', 'page')
            page_div.set('ID', pageId)
            if order:
                page_div.set('ORDER', order)
            if orderlabel:
                page_div.set('ORDERLABEL', orderlabel)

        # Finally reference the file from the page.
        ET.SubElement(page_div, TAG_METS_FPTR).set('FILEID', ocrd_file.ID)
310
311
    def get_physical_page_for_file(self, ocrd_file):
        """
        Get the pageId for a ocrd_file

        Return:
            The ``ID`` of the first physical page div holding a ``mets:fptr``
            for the file, or ``None`` if there is no such page.
        """
        page_ids = self._tree.getroot().xpath(
            '/mets:mets/mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][./mets:fptr[@FILEID="%s"]]/@ID' %
            ocrd_file.ID, namespaces=NS)
        return page_ids[0] if page_ids else None
320
321
    def remove_physical_page(self, ID):
        """
        Remove the physical page div with the given ID.

        Only the first matching page div is removed. Nothing happens if no
        page matches.

        Arguments:
            ID (string): ``ID`` attribute of the page div to remove
        """
        page_divs = self._tree.getroot().xpath(
            'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"][@ID="%s"]' % ID,
            namespaces=NS)
        if not page_divs:
            return
        first_div = page_divs[0]
        first_div.getparent().remove(first_div)
327