| Total Complexity | 75 |
| Total Lines | 579 |
| Duplicated Lines | 12.61 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like tests.cli.test_workspace often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | from os.path import join, exists |
||
| 2 | from pathlib import Path |
||
| 3 | from filecmp import dircmp |
||
| 4 | from shutil import copytree |
||
| 5 | from tempfile import TemporaryDirectory |
||
| 6 | from io import StringIO |
||
| 7 | from contextlib import contextmanager |
||
| 8 | import sys |
||
| 9 | |||
| 10 | from click.testing import CliRunner |
||
| 11 | import pytest |
||
| 12 | |||
| 13 | # pylint: disable=import-error, no-name-in-module |
||
| 14 | from tests.base import CapturingTestCase as TestCase, assets, copy_of_directory, main |
||
| 15 | |||
| 16 | from ocrd_utils import initLogging, pushd_popd, setOverrideLogLevel, disableLogging |
||
| 17 | from ocrd.cli.workspace import workspace_cli |
||
| 18 | from ocrd import Resolver |
||
| 19 | |||
@contextmanager
def mock_stdin(inp):
    """Temporarily replace ``sys.stdin`` with a ``StringIO`` over *inp*.

    Args:
        inp: Text that reads from ``sys.stdin`` should return while the
            context is active.

    The previous ``sys.stdin`` object is put back when the context exits,
    including when the body of the ``with`` statement raises.
    """
    old_stdin = sys.stdin
    sys.stdin = StringIO(inp)
    try:
        yield
    finally:
        # Restore even on error so a failing test cannot leak the fake
        # stdin into the tests that run after it.
        sys.stdin = old_stdin
||
| 26 | |||
class TestCli(TestCase):
    """Tests for the ``ocrd workspace`` command line interface.

    Commands are run either through ``click``'s ``CliRunner``
    (``self.runner.invoke``) or through ``self.invoke_cli`` inherited from
    the base test case, which returns an ``(exit_code, out, err)`` triple.
    """

    # Fixture values shared by all tests that add a single file.
    ADD_FILE_ID = 'foo123file'
    ADD_PAGE_ID = 'foo123page'
    ADD_FILE_GRP = 'TEST_GROUP'
    ADD_MIMETYPE = 'image/tiff'

    def setUp(self):
        """Create a fresh resolver and CLI runner for every test."""
        super().setUp()
        disableLogging()
        self.maxDiff = None
        self.resolver = Resolver()
        self.runner = CliRunner(mix_stderr=False)

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def _add_args(self, directory, target, *, checked=False):
        """Build the argument list for ``-d DIRECTORY add ... TARGET``.

        Args:
            directory: Value passed to ``-d``.
            target: Final positional argument (file path or URL).
            checked: When true, insert ``-C`` right after ``add``.

        Returns:
            A new list of CLI arguments using the shared ``ADD_*`` fixtures.
        """
        args = ['-d', directory, 'add']
        if checked:
            args.append('-C')
        args += [
            '--file-grp', self.ADD_FILE_GRP,
            '--page-id', self.ADD_PAGE_ID,
            '--file-id', self.ADD_FILE_ID,
            '--mimetype', self.ADD_MIMETYPE,
            target,
        ]
        return args

    def _init_add_remove(self, tempdir, remove_option):
        """Run ``init``, ``add`` and ``remove REMOVE_OPTION`` in *tempdir*.

        Each of the three invocations is asserted to exit with code 0.

        Args:
            tempdir: Directory used both as workspace and as the location
                of the file to add.
            remove_option: Flag passed to ``remove`` before the file ID
                (``--keep-file`` or ``--force``).

        Returns:
            Path of the file that was added and then removed.
        """
        content_file = join(tempdir, 'testfile')
        with open(content_file, 'w') as f:
            f.write('x')

        result = self.runner.invoke(workspace_cli, ['init', tempdir])
        self.assertEqual(result.exit_code, 0)

        result = self.runner.invoke(workspace_cli, self._add_args(tempdir, content_file))
        self.assertEqual(result.exit_code, 0)

        result = self.runner.invoke(workspace_cli, [
            '-d', tempdir,
            'remove',
            remove_option,
            self.ADD_FILE_ID,
        ])
        self.assertEqual(result.exit_code, 0)
        return content_file

    # ------------------------------------------------------------------
    # add / remove
    # ------------------------------------------------------------------

    def test_add(self):
        """
        Ensure that `ocrd workspace add` does the right thing
        """
        content = 'x'
        local_filename = join(self.ADD_FILE_GRP, 'foo.xml')

        # Add the file through the Python API ...
        with TemporaryDirectory() as tempdir:
            ws_api = self.resolver.workspace_from_nothing(directory=tempdir)
            ws_api.add_file(
                self.ADD_FILE_GRP,
                file_id=self.ADD_FILE_ID,
                content=content,
                page_id=self.ADD_PAGE_ID,
                mimetype=self.ADD_MIMETYPE,
                local_filename=local_filename
            )
            ws_api.save_mets()

        # ... and through the CLI.
        with TemporaryDirectory() as tempdir:
            # Return value unused; the call is kept for whatever it sets up
            # in tempdir (presumably the METS the CLI opens -- confirm).
            self.resolver.workspace_from_nothing(directory=tempdir)
            content_file = join(tempdir, 'testfile')
            with open(content_file, 'w') as f:
                f.write(content)
            result = self.runner.invoke(workspace_cli, self._add_args(tempdir, content_file))
            # TODO compare the METS produced via API and via CLI; the
            # serialisations were too complex to compare directly.
            self.assertEqual(result.exit_code, 0)

    def test_add_remove(self):
        """``remove --keep-file`` must leave the file on disk."""
        with TemporaryDirectory() as tempdir:
            content_file = self._init_add_remove(tempdir, '--keep-file')
            # File should still exist
            self.assertTrue(exists(content_file))

    def test_add_remove_force(self):
        """``remove --force`` must delete the file from disk."""
        with TemporaryDirectory() as tempdir:
            tempdir = str(Path(tempdir).resolve())
            content_file = self._init_add_remove(tempdir, '--force')
            # File should have been deleted
            self.assertFalse(exists(content_file))

    def test_add_url(self):
        """Adding a remote URL stores that URL on the resulting file."""
        url = 'http://remote/file.tif'
        with TemporaryDirectory() as tempdir:
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            result = self.runner.invoke(workspace_cli, self._add_args(tempdir, url))
            self.assertEqual(result.exit_code, 0)
            ws.reload_mets()
            f = ws.mets.find_all_files()[0]
            self.assertEqual(f.url, url)

    def test_add_nonexisting_checked(self):
        """``add -C`` on a missing file exits with 1 and reports the file."""
        with pushd_popd(tempdir=True) as tempdir:
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            exit_code, _, err = self.invoke_cli(
                workspace_cli,
                self._add_args(tempdir, 'does-not-exist.xml', checked=True))
            self.assertEqual(exit_code, 1)
            self.assertIn("File 'does-not-exist.xml' does not exist, halt execution!", err)

    def test_add_519(self):
        """
        https://github.com/OCR-D/core/issues/519
        """
        with TemporaryDirectory() as tempdir:
            wsdir = Path(tempdir, "workspace")
            wsdir.mkdir()
            srcdir = Path(tempdir, "source")
            srcdir.mkdir()
            srcfile = Path(srcdir, "srcfile.jpg")
            srcfile_content = 'foo'
            srcfile.write_text(srcfile_content)
            with pushd_popd(str(wsdir)):
                self.invoke_cli(workspace_cli, ['init'])
                exit_code, _, _ = self.invoke_cli(workspace_cli, [
                    'add',
                    '-m', 'image/jpg',
                    '-G', 'MAX',
                    '-i', 'IMG_MAX_1818975',
                    '-C',
                    str(srcfile)
                ])
                self.assertEqual(exit_code, 0)
                added = Path(wsdir, 'MAX', 'srcfile.jpg')
                self.assertTrue(added.exists())
                self.assertEqual(added.read_text(), srcfile_content)

    def test_add_existing_checked(self):
        """``add -C`` on an existing file records it relative to the workspace."""
        with TemporaryDirectory() as tempdir:
            tempdir = str(Path(tempdir).resolve())
            content_file = join(tempdir, 'test.tif')
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            with open(content_file, 'w') as f:
                f.write('x')
            result = self.runner.invoke(
                workspace_cli,
                self._add_args(tempdir, content_file, checked=True))
            self.assertEqual(result.exit_code, 0)
            ws.reload_mets()
            f = ws.mets.find_all_files()[0]
            self.assertEqual(f.url, 'test.tif')

    # ------------------------------------------------------------------
    # find / prune
    # ------------------------------------------------------------------

    def test_find_all_files(self):
        """``find -G ... -k fileGrp`` prints one line per matching file."""
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            with pushd_popd(wsdir):
                result = self.runner.invoke(workspace_cli, ['find', '-G', 'OCR-D-IMG-BIN', '-k', 'fileGrp'])
                self.assertEqual(result.output, 'OCR-D-IMG-BIN\nOCR-D-IMG-BIN\n')
                self.assertEqual(result.exit_code, 0)

    def test_find_all_files_outputfield(self):
        """Several ``-k`` options yield tab-separated columns."""
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            with pushd_popd(wsdir):
                result = self.runner.invoke(workspace_cli,
                        ['find', '-G', 'OCR-D-IMG-BIN', '-k',
                            'file_grp', '-k', 'file_id', '-k', 'page_id'])
                self.assertEqual(result.exit_code, 0)
                self.assertEqual(result.output, 'OCR-D-IMG-BIN\tFILE_0001_IMAGE_BIN\tPHYS_0001\n'
                                                'OCR-D-IMG-BIN\tFILE_0002_IMAGE_BIN\tPHYS_0002\n')

    def test_prune_files(self):
        """``prune-files`` reduces the asset's 35 file entries to 29."""
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            mets_path = join(wsdir, 'mets.xml')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)

            ws1 = self.resolver.workspace_from_url(mets_path)
            self.assertEqual(len(ws1.mets.find_all_files()), 35)

            result = self.runner.invoke(workspace_cli, ['-d', wsdir, 'prune-files'])
            self.assertEqual(result.exit_code, 0)

            ws2 = self.resolver.workspace_from_url(mets_path)
            self.assertEqual(len(ws2.mets.find_all_files()), 29)

    # ------------------------------------------------------------------
    # clone / remove-group
    # ------------------------------------------------------------------

    def test_clone_into_nonexisting_dir(self):
        """
        https://github.com/OCR-D/core/issues/330
        """
        with TemporaryDirectory() as tempdir:
            clone_to = join(tempdir, 'non-existing-dir')
            result = self.runner.invoke(workspace_cli, [
                'clone',
                '--download',
                assets.path_to('scribo-test/data/mets.xml'),
                clone_to
            ])
            self.assertEqual(result.exit_code, 0)

    def test_remove_file_group(self):
        """
        Test removal of filegrp
        """
        with TemporaryDirectory() as tempdir:
            tempdir = str(Path(tempdir).resolve())
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            file_group = 'OCR-D-GT-PAGE'
            file_path = Path(tempdir, 'ws', file_group, 'FILE_0002_FULLTEXT.xml')
            self.assertTrue(file_path.exists())

            workspace = self.resolver.workspace_from_url(join(wsdir, 'mets.xml'))
            self.assertEqual(workspace.directory, wsdir)

            # Without recursive/force a non-empty group must be refused ...
            with self.assertRaisesRegex(Exception, "not empty"):
                workspace.remove_file_group(file_group)

            # ... and nothing may have changed.
            self.assertTrue(file_path.exists())
            self.assertEqual(len(workspace.mets.file_groups), 17)
            self.assertEqual(len(workspace.mets.find_all_files()), 35)

            workspace.remove_file_group(file_group, recursive=True, force=True)

            self.assertEqual(len(workspace.mets.file_groups), 16)
            self.assertEqual(len(workspace.mets.find_all_files()), 33)
            self.assertFalse(file_path.exists())

            # TODO ensure empty dirs are removed
            # self.assertFalse(file_path.parent.exists())

    def test_clone_relative(self):
        """Cloning from a METS path relative to the cwd works (see #319)."""
        # Create a relative path to make sure #319 is gone
        src_path = str(Path(assets.path_to('kant_aufklaerung_1784/data/mets.xml')).relative_to(Path.cwd()))
        with TemporaryDirectory() as tempdir:
            result = self.runner.invoke(workspace_cli, ['clone', '-a', src_path, tempdir])
            self.assertEqual(result.exit_code, 0)
            self.assertTrue(exists(join(tempdir, 'OCR-D-GT-PAGE/PAGE_0017_PAGE.xml')))

    def test_copy_vs_clone(self):
        """Compare a shallow clone and a full clone against a plain copy."""
        src_dir = assets.path_to('kant_aufklaerung_1784/data')
        src_mets = join(src_dir, 'mets.xml')
        with TemporaryDirectory() as tempdir:
            # cloned without download
            shallowcloneddir = join(tempdir, 'cloned-shallow')
            # cloned with download
            fullcloneddir = join(tempdir, 'cloned-all')
            # copied
            copieddir = join(tempdir, 'copied')

            Path(fullcloneddir).mkdir()
            Path(shallowcloneddir).mkdir()

            result = self.runner.invoke(workspace_cli, ['clone', src_mets, shallowcloneddir])
            self.assertEqual(result.exit_code, 0)

            result = self.runner.invoke(workspace_cli, ['clone', '--download', src_mets, fullcloneddir])
            self.assertEqual(result.exit_code, 0)

            with copy_of_directory(src_dir, copieddir):
                shallow_vs_copied = dircmp(shallowcloneddir, copieddir)
                assert set(shallow_vs_copied.right_only) == {'OCR-D-GT-ALTO', 'OCR-D-GT-PAGE', 'OCR-D-IMG'}

                full_vs_copied = dircmp(fullcloneddir, copieddir)
                # XXX mets.xml will not have the exact same content because
                # URLs that are actually files will be marked up as such with
                # @LOCTYPE/@OTHERLOCTYPE, so diff_files is not compared:
                # self.assertEqual(full_vs_copied.diff_files, [])
                self.assertEqual(full_vs_copied.left_only, [])
                self.assertEqual(full_vs_copied.right_only, [])

    def test_find_all_files_multiple_physical_pages_for_fileids(self):
        """``find --page-id`` accepts a comma-separated list of page IDs."""
        with copy_of_directory(assets.path_to('SBB0000F29300010000/data')) as tempdir:
            result = self.runner.invoke(workspace_cli, ['-d', tempdir, 'find', '--page-id', 'PHYS_0005,PHYS_0005', '-k', 'local_filename'])
            self.assertEqual(result.stdout, 'OCR-D-IMG/FILE_0005_IMAGE.tif\n')
            self.assertEqual(result.exit_code, 0)
            result = self.runner.invoke(workspace_cli, ['-d', tempdir, 'find', '--page-id', 'PHYS_0005,PHYS_0001', '-k', 'local_filename'])
            self.assertEqual(len(result.stdout.split('\n')), 19)

    # ------------------------------------------------------------------
    # --mets / --mets-basename / --directory handling
    # ------------------------------------------------------------------

    def test_mets_basename(self):
        """``-m foo.xml init`` writes ``foo.xml`` and no ``mets.xml``."""
        with TemporaryDirectory() as tempdir:
            with pushd_popd(tempdir):
                result = self.runner.invoke(workspace_cli, ['-m', 'foo.xml', 'init'])
                self.assertEqual(result.exit_code, 0)
                self.assertTrue(exists('foo.xml'))
                self.assertFalse(exists('mets.xml'))

    def test_mets_basename_and_mets(self):
        """Passing both ``-m`` and ``-M`` raises ``ValueError``."""
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, "Use either --mets or --mets-basename, not both"):
                self.invoke_cli(workspace_cli, ['-m', 'foo.xml', '-M', 'not-foo.xml', 'init'])

    def test_mets_basename_and_not_mets(self):
        """``-M`` alone still works but emits a deprecation message."""
        with pushd_popd(tempdir=True) as tempdir:
            _, out, err = self.invoke_cli(workspace_cli, ['-d', 'foo', '-M', 'not-foo.xml', 'init'])
            self.assertEqual(out, join(tempdir, 'foo') + '\n')
            self.assertIn('--mets-basename is deprecated', err)

    def test_mets_get_id_set_id(self):
        """``get-id`` prints the ID previously stored with ``set-id``."""
        with pushd_popd(tempdir=True):
            self.invoke_cli(workspace_cli, ['init'])
            disableLogging()
            mets_id = 'foo123'
            self.invoke_cli(workspace_cli, ['set-id', mets_id])
            disableLogging()
            _, out, _ = self.invoke_cli(workspace_cli, ['get-id'])
            self.assertEqual(out, mets_id + '\n')

    def test_mets_directory_incompatible(self):
        """A ``-m`` path that disagrees with ``-d`` raises ``ValueError``."""
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, "inconsistent with --directory"):
                self.invoke_cli(workspace_cli, ['-d', 'foo', '-m', '/somewhere/else', 'init'])

    def test_mets_directory_http(self):
        """An http(s) ``-m`` without ``-d`` raises ``ValueError``."""
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, r"--mets is an http\(s\) URL but no --directory was given"):
                self.invoke_cli(workspace_cli, ['-m', 'https://foo.bar/bla', 'init'])

    # ------------------------------------------------------------------
    # bulk-add
    # ------------------------------------------------------------------

    def test_bulk_add0(self):
        """Bulk-add 100 images and 100 PAGE files selected by a glob."""
        no_files = 100
        with TemporaryDirectory() as srcdir:
            for grp, ext in (('OCR-D-IMG', 'tif'), ('OCR-D-PAGE', 'xml')):
                Path(srcdir, grp).mkdir()
                for i in range(no_files):
                    Path(srcdir, grp, "page_%04d.%s" % (i, ext)).write_text('')
            with pushd_popd(tempdir=True) as wsdir:
                ws = self.resolver.workspace_from_nothing(directory=wsdir)
                self.invoke_cli(workspace_cli, [
                    'bulk-add',
                    '--ignore',
                    '--regex', r'^.*/(?P<fileGrp>[^/]+)/page_(?P<pageid>.*)\.(?P<ext>[^\.]*)$',
                    '--local-filename', '{{ fileGrp }}/FILE_{{ pageid }}.{{ ext }}',
                    '--file-id', 'FILE_{{ fileGrp }}_{{ pageid }}',
                    '--page-id', 'PHYS_{{ pageid }}',
                    '--file-grp', '{{ fileGrp }}',
                    '%s/*/*' % srcdir
                ])
                ws.reload_mets()
                assert len(ws.mets.file_groups) == 2
                assert len(ws.mets.find_all_files()) == 2 * no_files
                assert len(ws.mets.find_all_files(mimetype='image/tiff')) == no_files
                assert len(ws.mets.find_all_files(ID='//FILE_OCR-D-IMG_000.*')) == 10
                assert len(ws.mets.find_all_files(ID='//FILE_.*_000.*')) == 20
                assert len(ws.mets.find_all_files(pageId='PHYS_0001')) == 2
                assert ws.mets.find_all_files(ID='FILE_OCR-D-PAGE_0001')[0].local_filename == 'OCR-D-PAGE/FILE_0001.xml'

    def test_bulk_add_missing_param(self):
        """Omitting ``--page-id`` makes ``bulk-add`` raise ``ValueError``."""
        with pushd_popd(tempdir=True) as wsdir:
            # Return value unused; the call only prepares wsdir.
            self.resolver.workspace_from_nothing(directory=wsdir)
            # pytest.raises itself fails the test if nothing is raised.
            with pytest.raises(ValueError, match=r"OcrdFile attribute 'page_id' unset"):
                self.invoke_cli(workspace_cli, [
                    'bulk-add',
                    '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<fileid>.*) (?P<src>.*) (?P<url>.*) (?P<mimetype>.*)',
                    '-G', '{{ filegrp }}',
                    # '-g', '{{ pageid }}', # XXX skip --page-id
                    '-i', '{{ fileid }}',
                    '-m', '{{ mimetype }}',
                    '-u', "{{ url }}",
                    'a b c d e f', '1 2 3 4 5 6'])

    def test_bulk_add_gen_id(self):
        """Without ``--file-id`` an ID is generated for the added file."""
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'c.ext').write_text('')
            self.invoke_cli(workspace_cli, [
                'bulk-add',
                '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<src>.*) (?P<local_filename>.*) (?P<mimetype>.*)',
                '-G', '{{ filegrp }}',
                '-g', '{{ pageid }}',
                '-S', '{{ src }}',
                # '-i', '{{ fileid }}',  # XXX skip --file-id
                '-m', '{{ mimetype }}',
                '-l', "{{ local_filename }}",
                '-u', "https://host/{{ filegrp }}/{{ local_filename }}",
                'a b c.ext d e'])
            ws.reload_mets()
            assert next(ws.mets.find_files()).ID == 'b_c'
            assert next(ws.mets.find_files()).local_filename == 'd'
            assert next(ws.mets.find_files()).url == 'https://host/b/d'

    def test_bulk_add_derive_local_filename(self):
        """Without ``--local-filename`` it is derived from the source path."""
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'srcdir').mkdir()
            Path(wsdir, 'srcdir', 'src.xml').write_text('')
            self.invoke_cli(workspace_cli, [
                'bulk-add',
                '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<src>.*)',
                '-G', '{{ filegrp }}',
                '-g', '{{ pageid }}',
                '-S', '{{ src }}',
                # '-l', "{{ local_filename }}", # XXX skip --local-filename
                'p0001 SEG srcdir/src.xml'])
            ws.reload_mets()
            assert next(ws.mets.find_files()).local_filename == 'srcdir/src.xml'

    def test_bulk_add_stdin(self):
        """``bulk-add -`` reads the file descriptions from standard input."""
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'BIN').mkdir()
            for name in ('FILE_0001_BIN.IMG-wolf.png',
                         'FILE_0002_BIN.IMG-wolf.png',
                         'FILE_0001_BIN.xml',
                         'FILE_0002_BIN.xml'):
                Path(wsdir, 'BIN', name).write_text('', encoding='UTF-8')
            with mock_stdin(
                    'PHYS_0001 BIN FILE_0001_BIN.IMG-wolf BIN/FILE_0001_BIN.IMG-wolf.png BIN/FILE_0001_BIN.IMG-wolf.png image/png\n'
                    'PHYS_0002 BIN FILE_0002_BIN.IMG-wolf BIN/FILE_0002_BIN.IMG-wolf.png BIN/FILE_0002_BIN.IMG-wolf.png image/png\n'
                    'PHYS_0001 BIN FILE_0001_BIN BIN/FILE_0001_BIN.xml BIN/FILE_0001_BIN.xml application/vnd.prima.page+xml\n'
                    'PHYS_0002 BIN FILE_0002_BIN BIN/FILE_0002_BIN.xml BIN/FILE_0002_BIN.xml application/vnd.prima.page+xml\n'):
                assert len(ws.mets.file_groups) == 0
                self.invoke_cli(workspace_cli, [
                    'bulk-add',
                    '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<fileid>.*) (?P<src>.*) (?P<dest>.*) (?P<mimetype>.*)',
                    '-G', '{{ filegrp }}',
                    '-g', '{{ pageid }}',
                    '-i', '{{ fileid }}',
                    '-m', '{{ mimetype }}',
                    '-l', "{{ dest }}",
                    '-u', "https://host/{{ fileid }}/{{ dest }}",
                    '-'])
            ws.reload_mets()
            assert len(ws.mets.file_groups) == 1
            assert len(list(ws.mets.find_files())) == 4
            f = next(ws.mets.find_files())
            assert f.mimetype == 'image/png'
            assert f.ID == 'FILE_0001_BIN.IMG-wolf'
            assert f.local_filename == 'BIN/FILE_0001_BIN.IMG-wolf.png'
            assert f.url == 'https://host/FILE_0001_BIN.IMG-wolf/BIN/FILE_0001_BIN.IMG-wolf.png'

    # ------------------------------------------------------------------
    # list-page
    # ------------------------------------------------------------------

    def test_list_page(self):
        """Check ``list-page`` output formats, ranges and chunking."""
        from json import dumps, loads

        def _call(args):
            _, out, _ = self.invoke_cli(workspace_cli, ['list-page', *args])
            return out.rstrip('\n')

        # The fixture workspace has pages 1..29 except 7 and 21.
        all_pages = ['PHYS_%04d' % i for i in range(1, 30) if i not in (7, 21)]
        csv = ['-f', 'comma-separated']
        chunked = ['-r', 'PHYS_0001..PHYS_0010', '-D', '3']

        with pushd_popd(Path(__file__).parent.parent / 'data/list-page-workspace'):
            assert _call([]) == '\n'.join(all_pages)
            assert _call(csv) == ','.join(all_pages)
            # json.dumps' default separators give the '[[["A"], ["B"]]]' layout
            assert _call(['-f', 'json']) == dumps([[[page] for page in all_pages]])
            assert _call(csv + ['-R', '5..5']) == 'PHYS_0005'
            assert _call(csv + ['-R', '6..8']) == 'PHYS_0006,PHYS_0008,PHYS_0009'
            assert _call(csv + ['-r', '1..5']) == 'PHYS_0001,PHYS_0002,PHYS_0003,PHYS_0004,PHYS_0005'
            assert _call(csv + ['-r', '2..3']) == 'PHYS_0002,PHYS_0003'
            assert _call(csv + ['-r', 'page 2..page 3']) == 'PHYS_0002,PHYS_0003'
            assert _call(csv + ['-r', 'PHYS_0006..PHYS_0009']) == 'PHYS_0006,PHYS_0008,PHYS_0009'
            assert _call(csv + chunked) == 'PHYS_0001,PHYS_0002,PHYS_0003\nPHYS_0004,PHYS_0005,PHYS_0006\nPHYS_0008,PHYS_0009,PHYS_0010'
            assert _call(csv + chunked + ['-C', '2']) == 'PHYS_0008,PHYS_0009,PHYS_0010'
            assert loads(_call(['-f', 'json'] + chunked + ['-C', '2'])) == [[['PHYS_0008'], ['PHYS_0009'], ['PHYS_0010']]]
            assert loads(_call(['-f', 'json', '-r', 'PHYS_0001..PHYS_0010', '-k', 'ID', '-k', 'ORDERLABEL', '-D', '3', '-C', '2'])) == \
                [[['PHYS_0008', 'page 7'], ['PHYS_0009', 'page 8'], ['PHYS_0010', 'page 9']]]
| 576 | |||
# Allow running this test module directly as a script.
if __name__ == "__main__":
    main(__file__)
||
| 579 |