Total Complexity | 75 |
Total Lines | 579 |
Duplicated Lines | 12.61 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like tests.cli.test_workspace often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | from os.path import join, exists |
||
2 | from pathlib import Path |
||
3 | from filecmp import dircmp |
||
4 | from shutil import copytree |
||
5 | from tempfile import TemporaryDirectory |
||
6 | from io import StringIO |
||
7 | from contextlib import contextmanager |
||
8 | import sys |
||
9 | |||
10 | from click.testing import CliRunner |
||
11 | import pytest |
||
12 | |||
13 | # pylint: disable=import-error, no-name-in-module |
||
14 | from tests.base import CapturingTestCase as TestCase, assets, copy_of_directory, main |
||
15 | |||
16 | from ocrd_utils import initLogging, pushd_popd, setOverrideLogLevel, disableLogging |
||
17 | from ocrd.cli.workspace import workspace_cli |
||
18 | from ocrd import Resolver |
||
19 | |||
@contextmanager
def mock_stdin(inp):
    """
    Temporarily replace ``sys.stdin`` with an in-memory stream.

    Args:
        inp (str): text the replacement stdin yields when read.

    The previous ``sys.stdin`` is restored when the ``with`` block exits,
    including when the block raises, so that a failing test cannot leak
    the fake stdin into subsequent tests.
    """
    old_stdin = sys.stdin
    sys.stdin = StringIO(inp)
    try:
        yield
    finally:
        # Restore unconditionally; without this an exception inside the
        # ``with`` body would leave the StringIO installed globally.
        sys.stdin = old_stdin
||
26 | |||
class TestCli(TestCase):
    """
    Tests for the ``ocrd workspace`` command line interface.

    Commands are run either through click's ``CliRunner`` (``self.runner``)
    or through ``self.invoke_cli`` inherited from the base test case, which
    returns an ``(exit_code, out, err)`` tuple.
    """

    def setUp(self):
        """Reset logging and create a fresh resolver and CLI runner."""
        super().setUp()
        disableLogging()
        self.maxDiff = None
        self.resolver = Resolver()
        self.runner = CliRunner(mix_stderr=False)

    def _add_then_remove(self, tempdir, remove_option):
        """
        Init a workspace in ``tempdir``, add a file, then remove it again.

        Args:
            tempdir (str): directory to hold both the workspace and the file.
            remove_option (str): option passed to ``remove`` in front of the
                file ID (``--keep-file`` or ``--force``).

        Returns:
            str: path of the content file that was added, so the caller can
            check whether it survived the removal.
        """
        ID = 'foo123file'
        page_id = 'foo123page'
        file_grp = 'TEST_GROUP'
        content = 'x'
        mimetype = 'image/tiff'

        content_file = join(tempdir, 'testfile')
        with open(content_file, 'w') as f:
            f.write(content)

        result = self.runner.invoke(workspace_cli, ['init', tempdir])
        self.assertEqual(result.exit_code, 0)

        result = self.runner.invoke(workspace_cli, [
            '-d', tempdir,
            'add',
            '--file-grp', file_grp,
            '--page-id', page_id,
            '--file-id', ID,
            '--mimetype', mimetype,
            content_file
        ])
        self.assertEqual(result.exit_code, 0)

        result = self.runner.invoke(workspace_cli, [
            '-d',
            tempdir,
            'remove',
            remove_option,
            ID
        ])
        self.assertEqual(result.exit_code, 0)
        return content_file

    def test_add(self):
        """
        Ensure that `ocrd workspace add` does the right thing
        """
        ID = 'foo123file'
        page_id = 'foo123page'
        file_grp = 'TEST_GROUP'
        content = 'x'
        mimetype = 'image/tiff'
        local_filename = join(file_grp, 'foo.xml')

        # Same operation through the Python API; only checks that it
        # does not raise.
        with TemporaryDirectory() as tempdir:
            ws_api = self.resolver.workspace_from_nothing(directory=tempdir)
            ws_api.add_file(
                file_grp,
                file_id=ID,
                content=content,
                page_id=page_id,
                mimetype=mimetype,
                local_filename=local_filename
            )
            ws_api.save_mets()

        with TemporaryDirectory() as tempdir:
            self.resolver.workspace_from_nothing(directory=tempdir)
            content_file = join(tempdir, 'testfile')
            with open(content_file, 'w') as f:
                f.write(content)
            result = self.runner.invoke(workspace_cli, [
                '-d', tempdir,
                'add',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', ID,
                '--mimetype', mimetype,
                content_file
            ])
            # TODO: the METS produced via API and via CLI are too complex
            # to compare directly, so only the exit code is checked.
            self.assertEqual(result.exit_code, 0)

    def test_add_remove(self):
        """``remove --keep-file`` must leave the file on disk."""
        with TemporaryDirectory() as tempdir:
            content_file = self._add_then_remove(tempdir, '--keep-file')

            # File should still exist
            self.assertTrue(exists(content_file))

    def test_add_remove_force(self):
        """``remove --force`` must delete the file from disk."""
        with TemporaryDirectory() as tempdir:
            tempdir = str(Path(tempdir).resolve())
            content_file = self._add_then_remove(tempdir, '--force')

            # File should have been deleted
            self.assertFalse(exists(content_file))

    def test_add_url(self):
        """Adding a remote URL stores that URL on the METS file entry."""
        ID = 'foo123file'
        page_id = 'foo123page'
        file_grp = 'TEST_GROUP'
        mimetype = 'image/tiff'
        url = 'http://remote/file.tif'
        with TemporaryDirectory() as tempdir:
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            result = self.runner.invoke(workspace_cli, [
                '-d', tempdir,
                'add',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', ID,
                '--mimetype', mimetype,
                url])
            self.assertEqual(result.exit_code, 0)
            ws.reload_mets()
            f = ws.mets.find_all_files()[0]
            self.assertEqual(f.url, url)

    def test_add_nonexisting_checked(self):
        """``add -C`` on a missing file exits with 1 and an error message."""
        ID = 'foo123file'
        page_id = 'foo123page'
        file_grp = 'TEST_GROUP'
        mimetype = 'image/tiff'
        with pushd_popd(tempdir=True) as tempdir:
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            exit_code, _, err = self.invoke_cli(workspace_cli, [
                '-d', tempdir,
                'add',
                '-C',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', ID,
                '--mimetype', mimetype,
                'does-not-exist.xml'])
            self.assertEqual(exit_code, 1)
            self.assertIn("File 'does-not-exist.xml' does not exist, halt execution!", err)

    def test_add_519(self):
        """
        https://github.com/OCR-D/core/issues/519
        """
        with TemporaryDirectory() as tempdir:
            wsdir = Path(tempdir, "workspace")
            wsdir.mkdir()
            srcdir = Path(tempdir, "source")
            srcdir.mkdir()
            srcfile = Path(srcdir, "srcfile.jpg")
            srcfile_content = 'foo'
            srcfile.write_text(srcfile_content)
            with pushd_popd(str(wsdir)):
                self.invoke_cli(workspace_cli, ['init'])
                exit_code, _, _ = self.invoke_cli(workspace_cli, [
                    'add',
                    '-m', 'image/jpg',
                    '-G', 'MAX',
                    '-i', 'IMG_MAX_1818975',
                    '-C',
                    str(srcfile)
                ])
                self.assertEqual(exit_code, 0)
                # The file from outside the workspace must end up below
                # the file group directory with its content intact.
                self.assertTrue(Path(wsdir, 'MAX', 'srcfile.jpg').exists())
                self.assertEqual(Path(wsdir, 'MAX', 'srcfile.jpg').read_text(), srcfile_content)

    def test_add_existing_checked(self):
        """``add -C`` on an existing file records a workspace-relative URL."""
        ID = 'foo123file'
        page_id = 'foo123page'
        file_grp = 'TEST_GROUP'
        mimetype = 'image/tiff'
        with TemporaryDirectory() as tempdir:
            tempdir = str(Path(tempdir).resolve())
            content_file = join(tempdir, 'test.tif')
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            with open(content_file, 'w') as f:
                f.write('x')
            result = self.runner.invoke(workspace_cli, [
                '-d', tempdir,
                'add',
                '-C',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', ID,
                '--mimetype', mimetype,
                content_file])
            self.assertEqual(result.exit_code, 0)
            ws.reload_mets()
            f = ws.mets.find_all_files()[0]
            self.assertEqual(f.url, 'test.tif')

    def test_find_all_files(self):
        """``find -G ... -k fileGrp`` prints the group once per matching file."""
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            with pushd_popd(wsdir):
                result = self.runner.invoke(workspace_cli, ['find', '-G', 'OCR-D-IMG-BIN', '-k', 'fileGrp'])
                self.assertEqual(result.output, 'OCR-D-IMG-BIN\nOCR-D-IMG-BIN\n')
                self.assertEqual(result.exit_code, 0)

    def test_find_all_files_outputfield(self):
        """Multiple ``-k`` fields are printed tab-separated, one file per line."""
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            with pushd_popd(wsdir):
                result = self.runner.invoke(workspace_cli,
                        ['find', '-G', 'OCR-D-IMG-BIN', '-k',
                         'file_grp', '-k', 'file_id', '-k', 'page_id'])
                self.assertEqual(result.exit_code, 0)
                self.assertEqual(result.output, 'OCR-D-IMG-BIN\tFILE_0001_IMAGE_BIN\tPHYS_0001\n'
                                                'OCR-D-IMG-BIN\tFILE_0002_IMAGE_BIN\tPHYS_0002\n')

    def test_prune_files(self):
        """``prune-files`` reduces the asset workspace from 35 to 29 files."""
        with TemporaryDirectory() as tempdir:
            copytree(assets.path_to('SBB0000F29300010000/data'), join(tempdir, 'ws'))

            ws1 = self.resolver.workspace_from_url(join(tempdir, 'ws', 'mets.xml'))
            self.assertEqual(len(ws1.mets.find_all_files()), 35)

            result = self.runner.invoke(workspace_cli, ['-d', join(tempdir, 'ws'), 'prune-files'])
            self.assertEqual(result.exit_code, 0)

            ws2 = self.resolver.workspace_from_url(join(tempdir, 'ws', 'mets.xml'))
            self.assertEqual(len(ws2.mets.find_all_files()), 29)

    def test_clone_into_nonexisting_dir(self):
        """
        https://github.com/OCR-D/core/issues/330
        """
        with TemporaryDirectory() as tempdir:
            clone_to = join(tempdir, 'non-existing-dir')
            result = self.runner.invoke(workspace_cli, [
                'clone',
                '--download',
                assets.path_to('scribo-test/data/mets.xml'),
                clone_to
            ])
            self.assertEqual(result.exit_code, 0)

    def test_remove_file_group(self):
        """
        Test removal of filegrp
        """
        with TemporaryDirectory() as tempdir:
            tempdir = str(Path(tempdir).resolve())
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            file_group = 'OCR-D-GT-PAGE'
            file_path = Path(tempdir, 'ws', file_group, 'FILE_0002_FULLTEXT.xml')
            self.assertTrue(file_path.exists())

            workspace = self.resolver.workspace_from_url(join(wsdir, 'mets.xml'))
            self.assertEqual(workspace.directory, wsdir)

            # Without recursive/force, removing a non-empty group must fail
            # and leave everything untouched.
            with self.assertRaisesRegex(Exception, "not empty"):
                workspace.remove_file_group(file_group)

            self.assertTrue(file_path.exists())
            self.assertEqual(len(workspace.mets.file_groups), 17)
            self.assertEqual(len(workspace.mets.find_all_files()), 35)

            workspace.remove_file_group(file_group, recursive=True, force=True)

            self.assertEqual(len(workspace.mets.file_groups), 16)
            self.assertEqual(len(workspace.mets.find_all_files()), 33)
            self.assertFalse(file_path.exists())

            # TODO ensure empty dirs are removed
            # self.assertFalse(file_path.parent.exists())

    def test_clone_relative(self):
        """Cloning from a relative METS path works (regression test for #319)."""
        # Use a path relative to the current directory to make sure #319 is gone
        src_path = str(Path(assets.path_to('kant_aufklaerung_1784/data/mets.xml')).relative_to(Path.cwd()))
        with TemporaryDirectory() as tempdir:
            result = self.runner.invoke(workspace_cli, ['clone', '-a', src_path, tempdir])
            self.assertEqual(result.exit_code, 0)
            self.assertTrue(exists(join(tempdir, 'OCR-D-GT-PAGE/PAGE_0017_PAGE.xml')))

    def test_copy_vs_clone(self):
        """Compare shallow and full clones against a plain directory copy."""
        src_dir = assets.path_to('kant_aufklaerung_1784/data')
        with TemporaryDirectory() as tempdir:
            # cloned without download
            shallowcloneddir = join(tempdir, 'cloned-shallow')
            # cloned with download
            fullcloneddir = join(tempdir, 'cloned-all')
            # copied
            copieddir = join(tempdir, 'copied')

            Path(fullcloneddir).mkdir()
            Path(shallowcloneddir).mkdir()

            result = self.runner.invoke(workspace_cli, ['clone', join(src_dir, 'mets.xml'), shallowcloneddir])
            self.assertEqual(result.exit_code, 0)

            result = self.runner.invoke(workspace_cli, ['clone', '--download', join(src_dir, 'mets.xml'), fullcloneddir])
            self.assertEqual(result.exit_code, 0)

            with copy_of_directory(src_dir, copieddir):
                shallow_vs_copied = dircmp(shallowcloneddir, copieddir)
                assert set(shallow_vs_copied.right_only) == {'OCR-D-GT-ALTO', 'OCR-D-GT-PAGE', 'OCR-D-IMG'}

                full_vs_copied = dircmp(fullcloneddir, copieddir)
                # XXX mets.xml will not have the exact same content because
                # URLs that are actually files will be marked up as such with
                # @LOCTYPE/@OTHERLOCTYPE, hence diff_files is not compared:
                # self.assertEqual(full_vs_copied.diff_files, [])
                self.assertEqual(full_vs_copied.left_only, [])
                self.assertEqual(full_vs_copied.right_only, [])

    def test_find_all_files_multiple_physical_pages_for_fileids(self):
        """``find --page-id`` accepts a comma-separated list of page IDs."""
        with copy_of_directory(assets.path_to('SBB0000F29300010000/data')) as tempdir:
            result = self.runner.invoke(workspace_cli, ['-d', tempdir, 'find', '--page-id', 'PHYS_0005,PHYS_0005', '-k', 'local_filename'])
            self.assertEqual(result.stdout, 'OCR-D-IMG/FILE_0005_IMAGE.tif\n')
            self.assertEqual(result.exit_code, 0)
            result = self.runner.invoke(workspace_cli, ['-d', tempdir, 'find', '--page-id', 'PHYS_0005,PHYS_0001', '-k', 'local_filename'])
            self.assertEqual(len(result.stdout.split('\n')), 19)

    def test_mets_basename(self):
        """``-m foo.xml init`` creates ``foo.xml`` and no ``mets.xml``."""
        with TemporaryDirectory() as tempdir:
            with pushd_popd(tempdir):
                result = self.runner.invoke(workspace_cli, ['-m', 'foo.xml', 'init'])
                self.assertEqual(result.exit_code, 0)
                self.assertTrue(exists('foo.xml'))
                self.assertFalse(exists('mets.xml'))

    def test_mets_basename_and_mets(self):
        """Passing both ``-m`` and ``-M`` raises ``ValueError``."""
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, "Use either --mets or --mets-basename, not both"):
                self.invoke_cli(workspace_cli, ['-m', 'foo.xml', '-M', 'not-foo.xml', 'init'])

    def test_mets_basename_and_not_mets(self):
        """``-M`` without ``-m`` still works but emits a deprecation notice."""
        with pushd_popd(tempdir=True) as tempdir:
            _, out, err = self.invoke_cli(workspace_cli, ['-d', 'foo', '-M', 'not-foo.xml', 'init'])
            self.assertEqual(out, join(tempdir, 'foo') + '\n')
            self.assertIn('--mets-basename is deprecated', err)

    def test_mets_get_id_set_id(self):
        """``get-id`` prints the ID previously stored with ``set-id``."""
        with pushd_popd(tempdir=True):
            self.invoke_cli(workspace_cli, ['init'])
            disableLogging()
            mets_id = 'foo123'
            self.invoke_cli(workspace_cli, ['set-id', mets_id])
            disableLogging()
            _, out, _ = self.invoke_cli(workspace_cli, ['get-id'])
            self.assertEqual(out, mets_id + '\n')

    def test_mets_directory_incompatible(self):
        """A ``--mets`` path outside ``--directory`` raises ``ValueError``."""
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, "inconsistent with --directory"):
                self.invoke_cli(workspace_cli, ['-d', 'foo', '-m', '/somewhere/else', 'init'])

    def test_mets_directory_http(self):
        """An http(s) ``--mets`` without ``--directory`` raises ``ValueError``."""
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, r"--mets is an http\(s\) URL but no --directory was given"):
                self.invoke_cli(workspace_cli, ['-m', 'https://foo.bar/bla', 'init'])

    def test_bulk_add0(self):
        """``bulk-add`` with a path regex adds every globbed source file."""
        NO_FILES = 100
        with TemporaryDirectory() as srcdir:
            Path(srcdir, "OCR-D-IMG").mkdir()
            Path(srcdir, "OCR-D-PAGE").mkdir()
            for i in range(NO_FILES):
                Path(srcdir, "OCR-D-IMG", "page_%04d.tif" % i).write_text('')
            for i in range(NO_FILES):
                Path(srcdir, "OCR-D-PAGE", "page_%04d.xml" % i).write_text('')
            with pushd_popd(tempdir=True) as wsdir:
                ws = self.resolver.workspace_from_nothing(directory=wsdir)
                self.invoke_cli(workspace_cli, [
                    'bulk-add',
                    '--ignore',
                    '--regex', r'^.*/(?P<fileGrp>[^/]+)/page_(?P<pageid>.*)\.(?P<ext>[^\.]*)$',
                    '--local-filename', '{{ fileGrp }}/FILE_{{ pageid }}.{{ ext }}',
                    '--file-id', 'FILE_{{ fileGrp }}_{{ pageid }}',
                    '--page-id', 'PHYS_{{ pageid }}',
                    '--file-grp', '{{ fileGrp }}',
                    '%s/*/*' % srcdir
                ])
                ws.reload_mets()
                assert len(ws.mets.file_groups) == 2
                assert len(ws.mets.find_all_files()) == 2 * NO_FILES
                assert len(ws.mets.find_all_files(mimetype='image/tiff')) == NO_FILES
                assert len(ws.mets.find_all_files(ID='//FILE_OCR-D-IMG_000.*')) == 10
                assert len(ws.mets.find_all_files(ID='//FILE_.*_000.*')) == 20
                assert len(ws.mets.find_all_files(pageId='PHYS_0001')) == 2
                assert ws.mets.find_all_files(ID='FILE_OCR-D-PAGE_0001')[0].local_filename == 'OCR-D-PAGE/FILE_0001.xml'

    def test_bulk_add_missing_param(self):
        """``bulk-add`` without ``--page-id`` raises ``ValueError``."""
        with pushd_popd(tempdir=True) as wsdir:
            self.resolver.workspace_from_nothing(directory=wsdir)
            with pytest.raises(ValueError, match=r"OcrdFile attribute 'page_id' unset"):
                self.invoke_cli(workspace_cli, [
                    'bulk-add',
                    '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<fileid>.*) (?P<src>.*) (?P<url>.*) (?P<mimetype>.*)',
                    '-G', '{{ filegrp }}',
                    # '-g', '{{ pageid }}', # XXX skip --page-id
                    '-i', '{{ fileid }}',
                    '-m', '{{ mimetype }}',
                    '-u', "{{ url }}",
                    'a b c d e f', '1 2 3 4 5 6'])

    def test_bulk_add_gen_id(self):
        """``bulk-add`` without ``--file-id`` generates an ID for the file."""
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'c.ext').write_text('')
            self.invoke_cli(workspace_cli, [
                'bulk-add',
                '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<src>.*) (?P<local_filename>.*) (?P<mimetype>.*)',
                '-G', '{{ filegrp }}',
                '-g', '{{ pageid }}',
                '-S', '{{ src }}',
                # '-i', '{{ fileid }}',  # XXX skip --file-id
                '-m', '{{ mimetype }}',
                '-l', "{{ local_filename }}",
                '-u', "https://host/{{ filegrp }}/{{ local_filename }}",
                'a b c.ext d e'])
            ws.reload_mets()
            added = next(ws.mets.find_files())
            assert added.ID == 'b_c'
            assert added.local_filename == 'd'
            assert added.url == 'https://host/b/d'

    def test_bulk_add_derive_local_filename(self):
        """Without ``--local-filename`` the source path is used as local filename."""
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'srcdir').mkdir()
            Path(wsdir, 'srcdir', 'src.xml').write_text('')
            self.invoke_cli(workspace_cli, [
                'bulk-add',
                '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<src>.*)',
                '-G', '{{ filegrp }}',
                '-g', '{{ pageid }}',
                '-S', '{{ src }}',
                # '-l', "{{ local_filename }}", # XXX skip --local-filename
                'p0001 SEG srcdir/src.xml'])
            ws.reload_mets()
            assert next(ws.mets.find_files()).local_filename == 'srcdir/src.xml'

    def test_bulk_add_stdin(self):
        """``bulk-add -`` reads the lines to match from standard input."""
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'BIN').mkdir()
            Path(wsdir, 'BIN/FILE_0001_BIN.IMG-wolf.png').write_text('', encoding='UTF-8')
            Path(wsdir, 'BIN/FILE_0002_BIN.IMG-wolf.png').write_text('', encoding='UTF-8')
            Path(wsdir, 'BIN/FILE_0001_BIN.xml').write_text('', encoding='UTF-8')
            Path(wsdir, 'BIN/FILE_0002_BIN.xml').write_text('', encoding='UTF-8')
            with mock_stdin(
                    'PHYS_0001 BIN FILE_0001_BIN.IMG-wolf BIN/FILE_0001_BIN.IMG-wolf.png BIN/FILE_0001_BIN.IMG-wolf.png image/png\n'
                    'PHYS_0002 BIN FILE_0002_BIN.IMG-wolf BIN/FILE_0002_BIN.IMG-wolf.png BIN/FILE_0002_BIN.IMG-wolf.png image/png\n'
                    'PHYS_0001 BIN FILE_0001_BIN BIN/FILE_0001_BIN.xml BIN/FILE_0001_BIN.xml application/vnd.prima.page+xml\n'
                    'PHYS_0002 BIN FILE_0002_BIN BIN/FILE_0002_BIN.xml BIN/FILE_0002_BIN.xml application/vnd.prima.page+xml\n'):
                assert len(ws.mets.file_groups) == 0
                self.invoke_cli(workspace_cli, [
                    'bulk-add',
                    '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<fileid>.*) (?P<src>.*) (?P<dest>.*) (?P<mimetype>.*)',
                    '-G', '{{ filegrp }}',
                    '-g', '{{ pageid }}',
                    '-i', '{{ fileid }}',
                    '-m', '{{ mimetype }}',
                    '-l', "{{ dest }}",
                    '-u', "https://host/{{ fileid }}/{{ dest }}",
                    '-'])
            ws.reload_mets()
            assert len(ws.mets.file_groups) == 1
            assert len(list(ws.mets.find_files())) == 4
            f = next(ws.mets.find_files())
            assert f.mimetype == 'image/png'
            assert f.ID == 'FILE_0001_BIN.IMG-wolf'
            assert f.local_filename == 'BIN/FILE_0001_BIN.IMG-wolf.png'
            assert f.url == 'https://host/FILE_0001_BIN.IMG-wolf/BIN/FILE_0001_BIN.IMG-wolf.png'

    def test_list_page(self):
        """Check ``list-page`` output formats, ranges and chunking."""
        from json import dumps, loads

        def _call(args):
            _, out, _ = self.invoke_cli(workspace_cli, ['list-page', *args])
            return out.rstrip('\n')

        # The expected listing is PHYS_0001..PHYS_0029 without PHYS_0007
        # and PHYS_0021.
        all_pages = ['PHYS_%04d' % n for n in range(1, 30) if n not in (7, 21)]

        with pushd_popd(Path(__file__).parent.parent / 'data/list-page-workspace'):
            assert _call([]) == '\n'.join(all_pages)
            assert _call(['-f', 'comma-separated']) == ','.join(all_pages)
            # json.dumps' default separators yield '[[["PHYS_0001"], ["PHYS_0002"], ...]]'
            assert _call(['-f', 'json']) == dumps([[[page] for page in all_pages]])
            assert _call(['-f', 'comma-separated', '-R', '5..5']) == 'PHYS_0005'
            assert _call(['-f', 'comma-separated', '-R', '6..8']) == 'PHYS_0006,PHYS_0008,PHYS_0009'
            assert _call(['-f', 'comma-separated', '-r', '1..5']) == 'PHYS_0001,PHYS_0002,PHYS_0003,PHYS_0004,PHYS_0005'
            assert _call(['-f', 'comma-separated', '-r', '2..3']) == 'PHYS_0002,PHYS_0003'
            assert _call(['-f', 'comma-separated', '-r', 'page 2..page 3']) == 'PHYS_0002,PHYS_0003'
            assert _call(['-f', 'comma-separated', '-r', 'PHYS_0006..PHYS_0009']) == 'PHYS_0006,PHYS_0008,PHYS_0009'
            assert _call(['-f', 'comma-separated', '-r', 'PHYS_0001..PHYS_0010', '-D', '3']) == 'PHYS_0001,PHYS_0002,PHYS_0003\nPHYS_0004,PHYS_0005,PHYS_0006\nPHYS_0008,PHYS_0009,PHYS_0010'
            assert _call(['-f', 'comma-separated', '-r', 'PHYS_0001..PHYS_0010', '-D', '3', '-C', '2']) == 'PHYS_0008,PHYS_0009,PHYS_0010'
            assert loads(_call(['-f', 'json', '-r', 'PHYS_0001..PHYS_0010', '-D', '3', '-C', '2'])) == [[['PHYS_0008'], ['PHYS_0009'], ['PHYS_0010']]]
            assert loads(_call(['-f', 'json', '-r', 'PHYS_0001..PHYS_0010', '-k', 'ID', '-k', 'ORDERLABEL', '-D', '3', '-C', '2'])) == \
                [[['PHYS_0008', 'page 7'], ['PHYS_0009', 'page 8'], ['PHYS_0010', 'page 9']]]
||
576 | |||
# Allow running this test module directly as a script.
if __name__ == '__main__':
    main(__file__)
||
579 |