Passed
Push — master ( 3aae01...0df3c5 )
by Konstantin
03:07
created

ocrd.workspace_bagger.WorkspaceBagger.bag()   B

Complexity

Conditions 6

Size

Total Lines 65
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 65
rs 8.2266
c 0
b 0
f 0
cc 6
nop 9

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, i.e. moving a coherent part of the body into its own, well-named method.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

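There are several approaches to avoid long parameter lists, for example Introduce Parameter Object, Preserve Whole Object, and Replace Parameter with Method Call.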

1
from datetime import datetime
2
from os import makedirs, chdir, walk
3
from os.path import join, isdir, basename as os_path_basename, exists, relpath
4
from pathlib import Path
5
from shutil import make_archive, rmtree, copyfile, move
6
from tempfile import mkdtemp, TemporaryDirectory
7
import re
8
import tempfile
9
import sys
10
from bagit import Bag, make_manifests, _load_tag_file, _make_tag_file, _make_tagmanifest_file  # pylint: disable=no-name-in-module
11
from distutils.dir_util import copy_tree
12
from pkg_resources import get_distribution
13
14
from ocrd_utils import (
15
    pushd_popd,
16
    getLogger,
17
    MIME_TO_EXT,
18
    is_local_filename,
19
    unzip_file_to_dir,
20
21
    MIMETYPE_PAGE,
22
    VERSION,
23
)
24
from ocrd_validators.constants import BAGIT_TXT, TMP_BAGIT_PREFIX, OCRD_BAGIT_PROFILE_URL
25
from ocrd_modelfactory import page_from_file
26
from ocrd_models.ocrd_page import to_xml
27
28
from .workspace import Workspace
29
30
# Override the stdlib default directory for temporary files. This is a
# process-wide side effect of importing this module: every later
# mkdtemp()/TemporaryDirectory() call without an explicit ``dir`` uses /tmp.
tempfile.tempdir = '/tmp' # TODO hard-coded
31
32
# Path under /tmp derived from TMP_BAGIT_PREFIX.
# NOTE(review): not referenced anywhere in the visible code -- presumably a
# location for backups; confirm it is still needed.
BACKUPDIR = join('/tmp', TMP_BAGIT_PREFIX + 'backup')
33
34
class WorkspaceBagger():
35
    """
36
    Serialize/De-serialize from OCRD-ZIP to workspace and back.
37
    """
38
39
    def __init__(self, resolver, strict=False):
40
        self.resolver = resolver
41
        self.strict = strict
42
43
    def _serialize_bag(self, workspace, bagdir, dest, skip_zip):
44
        if skip_zip:
45
            move(bagdir, dest)
46
        else:
47
            make_archive(dest.replace('.zip', ''), 'zip', bagdir)
48
49
            # Remove temporary bagdir
50
            rmtree(bagdir)
51
52
    def _log_or_raise(self, msg):
        """
        Report a problem according to the strictness setting.

        In strict mode the message is raised as an ``Exception``; otherwise
        it is logged at INFO level on the 'ocrd.workspace_bagger' logger.

        Arguments:
            msg (string): the message to raise or log
        """
        logger = getLogger('ocrd.workspace_bagger')
        if not self.strict:
            logger.info(msg)
            return
        raise Exception(msg)
58
59
    def _bag_mets_files(self, workspace, bagdir, ocrd_mets, processes):
        """
        Copy all files referenced by the workspace METS into the bag payload.

        Every file found in the METS is downloaded into
        ``<bagdir>/data/<fileGrp>/``, its ``local_filename`` is rewritten to
        that relative location, and the METS is saved as
        ``<bagdir>/data/<ocrd_mets>``. Afterwards ``Page/@imageFilename`` of
        the bagged PAGE files is adapted to the new locations and the payload
        manifests are generated.

        Arguments:
            workspace (ocrd.Workspace): workspace being bagged
            bagdir (string): root directory of the bag under construction
            ocrd_mets (string): basename of the METS file inside ``data``
            processes (integer): number of parallel checksumming processes

        Returns:
            tuple: ``(total_bytes, total_files)`` as returned by
            ``make_manifests``
        """
        log = getLogger('ocrd.workspace_bagger')
        mets = workspace.mets
        # maps the location of each file before bagging to its path
        # relative to the bag's data directory
        changed_local_filenames = {}

        # TODO allow filtering by fileGrp@USE and such

        with pushd_popd(workspace.directory):
            for ocrd_file in mets.find_files():
                log.info("Handling OcrdFile %s", ocrd_file)

                file_grp_dir = Path(bagdir, 'data', ocrd_file.fileGrp)
                if not file_grp_dir.is_dir():
                    file_grp_dir.mkdir()

                # prefer the local copy, fall back to the URL
                attr = 'local_filename' if ocrd_file.local_filename else 'url'
                source = getattr(ocrd_file, attr)
                basename = ocrd_file.basename or \
                    f"{ocrd_file.ID}{MIME_TO_EXT.get(ocrd_file.mimetype, '.xml')}"
                _relpath = join(ocrd_file.fileGrp, basename)
                self.resolver.download_to_directory(file_grp_dir, source, basename=basename)
                changed_local_filenames[str(source)] = _relpath
                ocrd_file.local_filename = _relpath

            # save mets.xml
            with open(join(bagdir, 'data', ocrd_mets), 'wb') as mets_out:
                mets_out.write(workspace.mets.to_xml())

        # Walk through bagged workspace and fix the PAGE
        # Page/@imageFilename and
        # AlternativeImage/@filename
        bag_workspace = Workspace(self.resolver, directory=join(bagdir, 'data'), mets_basename=ocrd_mets)
        with pushd_popd(bag_workspace.directory):
            for page_file in bag_workspace.mets.find_files(mimetype=MIMETYPE_PAGE):
                pcgts = page_from_file(page_file)
                changed = False
                for old, new in changed_local_filenames.items():
                    if pcgts.get_Page().imageFilename == old:
                        pcgts.get_Page().imageFilename = new
                        changed = True
                    # TODO replace AlternativeImage, recursively...
                if changed:
                    # Write explicitly as UTF-8 so the result does not depend
                    # on the locale's preferred encoding.
                    with open(page_file.local_filename, 'w', encoding='utf-8') as out:
                        out.write(to_xml(pcgts))

            with pushd_popd(bagdir):
                total_bytes, total_files = make_manifests('data', processes, algorithms=['sha512'])
            log.info("New vs. old: %s", changed_local_filenames)
        return total_bytes, total_files
108
109
    def _set_bag_info(self, bag, total_bytes, total_files, ocrd_identifier, ocrd_base_version_checksum, ocrd_mets='mets.xml'):
        """
        Fill in the ``bag-info.txt`` entries of a bag.

        Arguments:
            bag (bagit.Bag): bag whose ``info`` mapping is updated in place
            total_bytes: payload size, first part of ``Payload-Oxum``
            total_files: payload file count, second part of ``Payload-Oxum``
            ocrd_identifier (string): value for ``Ocrd-Identifier``
            ocrd_base_version_checksum (string): value for
                ``Ocrd-Base-Version-Checksum``; the entry is left out if falsy
            ocrd_mets (string): value for ``Ocrd-Mets``; the entry is only
                written when it differs from the default 'mets.xml'
        """
        info = bag.info
        info['BagIt-Profile-Identifier'] = OCRD_BAGIT_PROFILE_URL

        bagit_version = get_distribution('bagit').version
        bagit_profile_version = get_distribution('bagit_profile').version
        cmdline = ' '.join(sys.argv)
        agent_parts = (
            VERSION, # TODO
            bagit_version,
            bagit_profile_version,
            cmdline)
        info['Bag-Software-Agent'] = 'ocrd/core %s (bagit.py %s, bagit_profile %s) [cmdline: "%s"]' % agent_parts

        info['Ocrd-Identifier'] = ocrd_identifier
        if ocrd_base_version_checksum:
            info['Ocrd-Base-Version-Checksum'] = ocrd_base_version_checksum
        info['Bagging-Date'] = str(datetime.now())
        info['Payload-Oxum'] = '%s.%s' % (total_bytes, total_files)
        if not ocrd_mets == 'mets.xml':
            info['Ocrd-Mets'] = ocrd_mets
124
125
    def bag(self,
            workspace,
            ocrd_identifier,
            dest=None,
            ocrd_mets='mets.xml',
            ocrd_base_version_checksum=None,
            processes=1,
            skip_zip=False,
            tag_files=None
           ):
        """
        Bag a workspace

        See https://ocr-d.github.com/ocrd_zip#packing-a-workspace-as-ocrd-zip

        Arguments:
            workspace (ocrd.Workspace): workspace to bag
            ocrd_identifier (string): Ocrd-Identifier in bag-info.txt
            dest (string): Path of the generated OCRD-ZIP. Defaults to the
                workspace directory plus '.ocrd.zip' (or '.ocrd' if
                ``skip_zip`` is set).
            ocrd_mets (string): Ocrd-Mets in bag-info.txt
            ocrd_base_version_checksum (string): Ocrd-Base-Version-Checksum in bag-info.txt
            processes (integer): Number of parallel processes checksumming
            skip_zip (boolean): Whether to leave directory unzipped
            tag_files (list<string>): Path names of additional tag files to be bagged at the root of the bag

        Returns:
            string: path of the created bag (``dest``)
        """
        if tag_files is None:
            tag_files = []

        if dest is None:
            if not skip_zip:
                dest = '%s.ocrd.zip' % workspace.directory
            else:
                dest = '%s.ocrd' % workspace.directory

        # create bagdir
        bagdir = mkdtemp(prefix=TMP_BAGIT_PREFIX)

        log = getLogger('ocrd.workspace_bagger')
        log.info("Bagging %s to %s (temp dir %s)", workspace.directory, dest, bagdir)

        try:
            # create data dir
            makedirs(join(bagdir, 'data'))

            # create bagit.txt
            with open(join(bagdir, 'bagit.txt'), 'wb') as f:
                f.write(BAGIT_TXT.encode('utf-8'))

            # create manifests
            total_bytes, total_files = self._bag_mets_files(workspace, bagdir, ocrd_mets, processes)

            # create bag-info.txt
            bag = Bag(bagdir)
            self._set_bag_info(bag, total_bytes, total_files, ocrd_identifier, ocrd_base_version_checksum, ocrd_mets=ocrd_mets)

            for tag_file in tag_files:
                copyfile(tag_file, join(bagdir, os_path_basename(tag_file)))

            # save bag
            bag.save()

            # ZIP it (or move it, if skip_zip); this consumes bagdir
            self._serialize_bag(workspace, bagdir, dest, skip_zip)
        except BaseException:
            # Do not leave a half-built bag behind in the temp directory;
            # the original error is re-raised unchanged.
            rmtree(bagdir, ignore_errors=True)
            raise

        log.info('Created bag at %s', dest)
        return dest
190
191
    def spill(self, src, dest):
        """
        Spill a workspace, i.e. unpack it and turn it into a workspace.

        See https://ocr-d.github.com/ocrd_zip#unpacking-ocrd-zip-to-a-workspace

        Arguments:
            src (string): Path to OCRD-ZIP
            dest (string): Path to directory to unpack data folder to. If it
                is an existing directory, the workspace is created in a
                subdirectory named after ``src`` (without '.ocrd.zip'/'.zip').

        Returns:
            ocrd.Workspace: workspace on the unpacked data directory

        Raises:
            Exception: if ``dest`` exists but is not a directory, or if the
                derived subdirectory already exists
        """
        log = getLogger('ocrd.workspace_bagger')

        if exists(dest) and not isdir(dest):
            raise Exception("Not a directory: %s" % dest)

        # If dest is an existing directory, try to derive its name from src
        if isdir(dest):
            workspace_name = re.sub(r'(\.ocrd)?\.zip$', '', os_path_basename(src))
            new_dest = join(dest, workspace_name)
            if exists(new_dest):
                raise Exception("Directory exists: %s" % new_dest)
            dest = new_dest

        log.info("Spilling %s to %s", src, dest)

        # The context manager drops the temp dir even if unpacking or copying
        # fails, so no extracted bag is left behind on errors.
        with TemporaryDirectory(prefix=TMP_BAGIT_PREFIX) as bagdir:
            unzip_file_to_dir(src, bagdir)
            bag_info = _load_tag_file(join(bagdir, "bag-info.txt"))

            datadir = join(bagdir, 'data')
            for root, _, files in walk(datadir):
                if not files:
                    # directories without files are not recreated
                    continue
                destdir = join(dest, relpath(root, datadir))
                makedirs(destdir, exist_ok=True)
                for f in files:
                    srcfile = join(root, f)
                    destfile = join(destdir, f)
                    log.debug("Copy %s -> %s", srcfile, destfile)
                    copyfile(srcfile, destfile)

            # TODO copy allowed tag files if present

            # TODO validate bagit

        # Create workspace
        mets_basename = bag_info.get("Ocrd-Mets", "mets.xml")
        workspace = Workspace(self.resolver, directory=dest, mets_basename=mets_basename)

        # TODO validate workspace

        return workspace
245
246
    def validate(self, bag):
        """
        Check a bag against BagIt and the OCR-D BagIt profile.

        Not implemented yet: the method currently does nothing.

        References:
            - https://ocr-d.github.io/ocrd_zip
            - https://ocr-d.github.io/bagit-profile.json
            - https://ocr-d.github.io/bagit-profile.yml
        """
        return None
256
257
    def recreate_checksums(self, src, dest=None, overwrite=False):
        """
        (Re)creates the files containing the checksums of a bag

        This function uses bagit.py to create new files: manifest-sha512.txt and
        tagmanifest-sha512.txt for the bag. Also 'Payload-Oxum' in bag-info.txt will be set to the
        appropriate value.

        Arguments:
            src (string):    Path to Bag. May be a zipped or unzipped bag
            dest (string):   Path to where the result should be stored. Not needed if overwrite is
                             set
            overwrite(bool): Replace bag with newly created bag

        Raises:
            Exception: if both or neither of ``dest`` and ``overwrite`` are
                given, or if ``src`` does not exist
            FileNotFoundError: if the bag has no ``data`` directory
        """
        if overwrite and dest:
            raise Exception("Setting 'dest' and 'overwrite' is a contradiction")
        if not overwrite and not dest:
            raise Exception("For checksum recreation 'dest' must be provided")
        src_path = Path(src)
        if not src_path.exists():
            raise Exception("Path to bag not existing")
        # a regular file is taken to be a zipped bag, a directory an unzipped one
        is_zipped = src_path.is_file()

        with TemporaryDirectory() as tempdir:
            if is_zipped:
                unzip_file_to_dir(src, tempdir)
                path_to_bag = Path(tempdir)
                if not path_to_bag.joinpath("data").exists():
                    raise FileNotFoundError("data directory of bag not found")
            else:
                path_to_bag = src_path if overwrite else Path(dest)
                if not src_path.joinpath("data").exists():
                    raise FileNotFoundError(f"data directory of bag not found at {src}")
                if not overwrite:
                    # work on a copy so that the source bag stays untouched
                    path_to_bag.mkdir(parents=True, exist_ok=True)
                    copy_tree(src, dest)

            with pushd_popd(path_to_bag):
                n_bytes, n_files = make_manifests("data", 1, ["sha512"])

                bag_infos = _load_tag_file("bag-info.txt")
                bag_infos["Payload-Oxum"] = f"{n_bytes}.{n_files}"
                _make_tag_file("bag-info.txt", bag_infos)
                _make_tagmanifest_file("sha512", ".")

            if is_zipped:
                name = src_path.name
                if name.endswith(".zip"):
                    name = name[:-4]
                # Build the archive in a scratch directory of its own. A bare
                # base name would make make_archive() write into the current
                # working directory, where it could clobber an unrelated file
                # or fail if that directory is not writable. It must not live
                # inside path_to_bag either, or the archive would contain itself.
                with TemporaryDirectory() as archive_dir:
                    zip_path = make_archive(str(Path(archive_dir, name)), "zip", path_to_bag)
                    move(zip_path, src if overwrite else dest)
308