Passed
Pull Request — master (#1184)
by Konstantin
03:15
created

tests.cli.test_workspace   F

Complexity

Total Complexity 75

Size/Duplication

Total Lines 579
Duplicated Lines 12.61 %

Importance

Changes 0
Metric Value
wmc 75
eloc 453
dl 73
loc 579
rs 2.4
c 0
b 0
f 0

28 Methods

Rating   Name   Duplication   Size   Complexity  
A TestCli.test_mets_get_id_set_id() 0 9 2
A TestCli.test_add_existing_checked() 0 25 3
A TestCli.test_add() 0 52 4
A TestCli.test_add_519() 0 26 3
A TestCli.test_mets_directory_incompatible() 0 4 3
A TestCli.test_mets_basename() 0 7 3
A TestCli.test_mets_basename_and_not_mets() 0 5 2
A TestCli.test_add_nonexisting_checked() 0 19 2
A TestCli.test_clone_relative() 0 7 2
A TestCli.test_mets_basename_and_mets() 0 4 3
A TestCli.test_list_page() 0 20 2
A TestCli.test_find_all_files_multiple_physical_pages_for_fileids() 0 8 2
A TestCli.test_clone_into_nonexisting_dir() 0 13 2
A TestCli.test_add_remove_force() 37 37 3
A TestCli.test_find_all_files() 0 8 3
A TestCli.test_bulk_add_missing_param() 0 16 3
A TestCli.test_remove_file_group() 0 27 3
A TestCli.test_bulk_add_stdin() 0 33 3
A TestCli.test_bulk_add_derive_local_filename() 0 17 2
B TestCli.test_bulk_add0() 0 32 5
A TestCli.setUp() 0 6 1
A TestCli.test_copy_vs_clone() 0 36 3
A TestCli.test_add_remove() 36 36 3
A TestCli.test_bulk_add_gen_id() 0 20 2
A TestCli.test_prune_files() 0 12 2
A TestCli.test_mets_directory_http() 0 4 3
A TestCli.test_find_all_files_outputfield() 0 10 3
A TestCli.test_add_url() 0 21 2

1 Function

Rating   Name   Duplication   Size   Complexity  
A mock_stdin() 0 6 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like tests.cli.test_workspace often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from os.path import join, exists
2
from pathlib import Path
3
from filecmp import dircmp
4
from shutil import copytree
5
from tempfile import TemporaryDirectory
6
from io import StringIO
7
from contextlib import contextmanager
8
import sys
9
10
from click.testing import CliRunner
11
import pytest
12
13
# pylint: disable=import-error, no-name-in-module
14
from tests.base import CapturingTestCase as TestCase, assets, copy_of_directory, main
15
16
from ocrd_utils import initLogging, pushd_popd, setOverrideLogLevel, disableLogging
17
from ocrd.cli.workspace import workspace_cli
18
from ocrd import Resolver
19
20
@contextmanager
def mock_stdin(inp):
    """
    Temporarily replace ``sys.stdin`` with an in-memory text stream.

    Args:
        inp (str): text that reads from ``sys.stdin`` return inside the ``with`` body.

    The previous ``sys.stdin`` object is put back when the ``with`` block is
    left, including when the body raises.
    """
    old_stdin = sys.stdin
    sys.stdin = StringIO(inp)
    try:
        yield
    finally:
        # Restore unconditionally so a failing test cannot leak the fake stdin
        sys.stdin = old_stdin
26
27
class TestCli(TestCase):
28
29
    def setUp(self):
        """
        Run the base class set-up, call `disableLogging()` and create the
        per-test `Resolver` and `CliRunner` used by the tests.
        """
        super().setUp()
        disableLogging()
        self.runner = CliRunner(mix_stderr=False)
        self.resolver = Resolver()
        # None lifts unittest's limit on the length of assertion diffs
        self.maxDiff = None
35
36
    def test_add(self):
        """
        Ensure that `ocrd workspace add` does the right thing

        A file is added once through the workspace API and once through the
        CLI; the CLI call must exit with 0.
        """
        ID = 'foo123file'
        page_id = 'foo123page'
        file_grp = 'TEST_GROUP'
        content = 'x'
        mimetype = 'image/tiff'
        local_filename = join(file_grp, 'foo.xml')

        # Reference run through the API
        with TemporaryDirectory() as tempdir:
            ws_api = self.resolver.workspace_from_nothing(directory=tempdir)
            ws_api.add_file(
                file_grp,
                file_id=ID,
                content=content,
                page_id=page_id,
                mimetype=mimetype,
                local_filename=local_filename
            )
            ws_api.save_mets()

        # Same operation through the CLI
        with TemporaryDirectory() as tempdir:
            self.resolver.workspace_from_nothing(directory=tempdir)
            content_file = join(tempdir, 'testfile')
            with open(content_file, 'w') as f:
                f.write(content)
            # Invoke only after the `with` block has closed the file, so the
            # content is flushed to disk before the CLI reads it
            result = self.runner.invoke(workspace_cli, [
                '-d', tempdir,
                'add',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', ID,
                '--mimetype', mimetype,
                content_file
            ])
            # TODO too complex to compare :( the METS written by API vs. CLI
            self.assertEqual(result.exit_code, 0)
88
89
90 View Code Duplication
    def test_add_remove(self):
        """
        `init`, `add` and `remove --keep-file` must all exit with 0, and the
        added file must still exist afterwards.
        """
        file_id, page_id, file_grp = 'foo123file', 'foo123page', 'TEST_GROUP'
        mimetype = 'image/tiff'
        with TemporaryDirectory() as tempdir:
            content_file = join(tempdir, 'testfile')
            with open(content_file, 'w') as f:
                f.write('x')

            init_result = self.runner.invoke(workspace_cli, ['init', tempdir])
            self.assertEqual(init_result.exit_code, 0)

            add_args = [
                '-d', tempdir,
                'add',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', file_id,
                '--mimetype', mimetype,
                content_file,
            ]
            add_result = self.runner.invoke(workspace_cli, add_args)
            self.assertEqual(add_result.exit_code, 0)

            remove_args = ['-d', tempdir, 'remove', '--keep-file', file_id]
            remove_result = self.runner.invoke(workspace_cli, remove_args)
            self.assertEqual(remove_result.exit_code, 0)

            # File should still exist
            self.assertTrue(exists(content_file))
126
127 View Code Duplication
    def test_add_remove_force(self):
        """
        `init`, `add` and `remove --force` must all exit with 0, and the
        added file must be gone afterwards.
        """
        file_id, page_id, file_grp = 'foo123file', 'foo123page', 'TEST_GROUP'
        mimetype = 'image/tiff'
        with TemporaryDirectory() as tempdir:
            # work on the resolved path of the temporary directory
            workdir = str(Path(tempdir).resolve())
            content_file = join(workdir, 'testfile')
            with open(content_file, 'w') as f:
                f.write('x')

            init_result = self.runner.invoke(workspace_cli, ['init', workdir])
            self.assertEqual(init_result.exit_code, 0)

            add_args = [
                '-d', workdir,
                'add',
                '--file-grp', file_grp,
                '--page-id', page_id,
                '--file-id', file_id,
                '--mimetype', mimetype,
                content_file,
            ]
            add_result = self.runner.invoke(workspace_cli, add_args)
            self.assertEqual(add_result.exit_code, 0)

            remove_args = ['-d', workdir, 'remove', '--force', file_id]
            remove_result = self.runner.invoke(workspace_cli, remove_args)
            self.assertEqual(remove_result.exit_code, 0)

            # File should have been deleted
            self.assertFalse(exists(content_file))
164
165
    def test_add_url(self):
        """
        `add` with an http URL as the file argument must exit with 0, and the
        first file found in the reloaded METS must have that URL.
        """
        url = 'http://remote/file.tif'
        with TemporaryDirectory() as tempdir:
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            cli_args = [
                '-d', tempdir,
                'add',
                '--file-grp', 'TEST_GROUP',
                '--page-id', 'foo123page',
                '--file-id', 'foo123file',
                '--mimetype', 'image/tiff',
                url,
            ]
            result = self.runner.invoke(workspace_cli, cli_args)
            self.assertEqual(result.exit_code, 0)
            ws.reload_mets()
            first_file = ws.mets.find_all_files()[0]
            self.assertEqual(first_file.url, url)
186
187
    def test_add_nonexisting_checked(self):
        """
        `add -C` with a path that does not exist must exit with 1 and report
        the missing file in the captured error output.
        """
        with pushd_popd(tempdir=True) as tempdir:
            ws = self.resolver.workspace_from_nothing(directory=tempdir)
            ws.save_mets()
            cli_args = [
                '-d', tempdir,
                'add',
                '-C',
                '--file-grp', 'TEST_GROUP',
                '--page-id', 'foo123page',
                '--file-id', 'foo123file',
                '--mimetype', 'image/tiff',
                'does-not-exist.xml',
            ]
            exit_code, _, err = self.invoke_cli(workspace_cli, cli_args)
            self.assertEqual(exit_code, 1)
            self.assertIn("File 'does-not-exist.xml' does not exist, halt execution!", err)
206
207
    def test_add_519(self):
        """
        https://github.com/OCR-D/core/issues/519

        `add -C` of a file that lives outside the workspace must exit with 0
        and leave a copy with the same content under the file group directory.
        """
        srcfile_content = 'foo'
        with TemporaryDirectory() as tempdir:
            wsdir = Path(tempdir) / "workspace"
            srcdir = Path(tempdir) / "source"
            wsdir.mkdir()
            srcdir.mkdir()
            srcfile = srcdir / "srcfile.jpg"
            srcfile.write_text(srcfile_content)
            with pushd_popd(str(wsdir)):
                self.invoke_cli(workspace_cli, ['init'])
                add_args = [
                    'add',
                    '-m', 'image/jpg',
                    '-G', 'MAX',
                    '-i', 'IMG_MAX_1818975',
                    '-C',
                    str(srcfile),
                ]
                exit_code, _, _ = self.invoke_cli(workspace_cli, add_args)
                self.assertEqual(exit_code, 0)
                added = wsdir / 'MAX' / 'srcfile.jpg'
                self.assertTrue(added.exists())
                self.assertEqual(added.read_text(), srcfile_content)
233
234
    def test_add_existing_checked(self):
        """
        `add -C` with an existing file inside the workspace directory must
        exit with 0; the first file found in the reloaded METS must have the
        URL 'test.tif'.
        """
        with TemporaryDirectory() as tempdir:
            workdir = str(Path(tempdir).resolve())
            content_file = join(workdir, 'test.tif')
            ws = self.resolver.workspace_from_nothing(directory=workdir)
            ws.save_mets()
            with open(content_file, 'w') as f:
                f.write('x')
            cli_args = [
                '-d', workdir,
                'add',
                '-C',
                '--file-grp', 'TEST_GROUP',
                '--page-id', 'foo123page',
                '--file-id', 'foo123file',
                '--mimetype', 'image/tiff',
                content_file,
            ]
            result = self.runner.invoke(workspace_cli, cli_args)
            self.assertEqual(result.exit_code, 0)
            ws.reload_mets()
            first_file = ws.mets.find_all_files()[0]
            self.assertEqual(first_file.url, 'test.tif')
259
260
    def test_find_all_files(self):
        """
        `find -G OCR-D-IMG-BIN -k fileGrp` on a copy of the SBB0000F29300010000
        asset must print the group name twice and exit with 0.
        """
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            with pushd_popd(wsdir):
                find_args = ['find', '-G', 'OCR-D-IMG-BIN', '-k', 'fileGrp']
                result = self.runner.invoke(workspace_cli, find_args)
                self.assertEqual(result.output, 'OCR-D-IMG-BIN\nOCR-D-IMG-BIN\n')
                self.assertEqual(result.exit_code, 0)
268
269
    def test_find_all_files_outputfield(self):
        """
        `find` with several `-k` fields must exit with 0 and print one
        tab-separated line per file.
        """
        expected_output = (
            'OCR-D-IMG-BIN\tFILE_0001_IMAGE_BIN\tPHYS_0001\n'
            'OCR-D-IMG-BIN\tFILE_0002_IMAGE_BIN\tPHYS_0002\n'
        )
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            with pushd_popd(wsdir):
                find_args = ['find', '-G', 'OCR-D-IMG-BIN']
                for field in ('file_grp', 'file_id', 'page_id'):
                    find_args += ['-k', field]
                result = self.runner.invoke(workspace_cli, find_args)
                self.assertEqual(result.exit_code, 0)
                self.assertEqual(result.output, expected_output)
280
281
    def test_prune_files(self):
        """
        `prune-files` must exit with 0 and reduce the number of files in the
        METS of the copied asset from 35 to 29.
        """
        with TemporaryDirectory() as tempdir:
            wsdir = join(tempdir, 'ws')
            mets_path = join(wsdir, 'mets.xml')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)

            ws_before = self.resolver.workspace_from_url(mets_path)
            self.assertEqual(len(ws_before.mets.find_all_files()), 35)

            result = self.runner.invoke(workspace_cli, ['-d', wsdir, 'prune-files'])
            self.assertEqual(result.exit_code, 0)

            ws_after = self.resolver.workspace_from_url(mets_path)
            self.assertEqual(len(ws_after.mets.find_all_files()), 29)
293
294
    def test_clone_into_nonexisting_dir(self):
        """
        https://github.com/OCR-D/core/issues/330

        `clone --download` into a target directory that was never created
        must exit with 0.
        """
        src_mets = assets.path_to('scribo-test/data/mets.xml')
        with TemporaryDirectory() as tempdir:
            clone_to = join(tempdir, 'non-existing-dir')
            clone_args = ['clone', '--download', src_mets, clone_to]
            result = self.runner.invoke(workspace_cli, clone_args)
            self.assertEqual(result.exit_code, 0)
307
308
    def test_remove_file_group(self):
        """
        Test removal of filegrp

        A plain `remove_file_group` must raise with "not empty" and leave
        everything in place; with `recursive=True, force=True` the group, its
        files and the checked file on disk must be gone.
        """
        file_group = 'OCR-D-GT-PAGE'
        with TemporaryDirectory() as tempdir:
            wsdir = join(str(Path(tempdir).resolve()), 'ws')
            copytree(assets.path_to('SBB0000F29300010000/data'), wsdir)
            file_path = Path(wsdir, file_group, 'FILE_0002_FULLTEXT.xml')
            self.assertTrue(file_path.exists())

            workspace = self.resolver.workspace_from_url(join(wsdir, 'mets.xml'))
            self.assertEqual(workspace.directory, wsdir)

            with self.assertRaisesRegex(Exception, "not empty"):
                workspace.remove_file_group(file_group)

            # nothing may have changed after the failed removal
            self.assertTrue(file_path.exists())
            self.assertEqual(len(workspace.mets.file_groups), 17)
            self.assertEqual(len(workspace.mets.find_all_files()), 35)

            workspace.remove_file_group(file_group, recursive=True, force=True)

            self.assertEqual(len(workspace.mets.file_groups), 16)
            self.assertEqual(len(workspace.mets.find_all_files()), 33)
            self.assertFalse(file_path.exists())

            # TODO ensure empty dirs are removed
            # self.assertFalse(file_path.parent.exists())
338
339
340
    def test_clone_relative(self):
        """
        `clone -a` with a METS path relative to the current working directory
        must exit with 0 and produce OCR-D-GT-PAGE/PAGE_0017_PAGE.xml.
        """
        # Create a relative path to make sure #319 is gone
        abs_mets = Path(assets.path_to('kant_aufklaerung_1784/data/mets.xml'))
        src_path = str(abs_mets.relative_to(Path.cwd()))
        with TemporaryDirectory() as tempdir:
            result = self.runner.invoke(workspace_cli, ['clone', '-a', src_path, tempdir])
            self.assertEqual(result.exit_code, 0)
            cloned_page = join(tempdir, 'OCR-D-GT-PAGE/PAGE_0017_PAGE.xml')
            self.assertTrue(exists(cloned_page))
347
348
    def test_copy_vs_clone(self):
        """
        Compare a directory copy of the kant_aufklaerung_1784 asset with the
        results of `clone` with and without `--download`.
        """
        src_dir = assets.path_to('kant_aufklaerung_1784/data')
        src_mets = join(src_dir, 'mets.xml')
        with TemporaryDirectory() as tempdir:
            shallowcloneddir = join(tempdir, 'cloned-shallow')  # cloned without download
            fullcloneddir = join(tempdir, 'cloned-all')  # cloned with download
            copieddir = join(tempdir, 'copied')  # copied

            for clone_target in (fullcloneddir, shallowcloneddir):
                Path(clone_target).mkdir()

            shallow_result = self.runner.invoke(workspace_cli, ['clone', src_mets, shallowcloneddir])
            self.assertEqual(shallow_result.exit_code, 0)

            full_result = self.runner.invoke(workspace_cli, ['clone', '--download', src_mets, fullcloneddir])
            self.assertEqual(full_result.exit_code, 0)

            with copy_of_directory(src_dir, copieddir):
                shallow_vs_copied = dircmp(shallowcloneddir, copieddir)
                assert set(shallow_vs_copied.right_only) == {'OCR-D-GT-ALTO', 'OCR-D-GT-PAGE', 'OCR-D-IMG'}

                full_vs_copied = dircmp(fullcloneddir, copieddir)
                # XXX mets.xml will not have the exact same content because
                # URLs that are actually files will be marked up as such with
                # @LOCTYPE/@OTHERLOCTYPE, so diff_files is not compared here
                self.assertEqual(full_vs_copied.left_only, [])
                self.assertEqual(full_vs_copied.right_only, [])
384
385
    def test_find_all_files_multiple_physical_pages_for_fileids(self):
        """
        Run `find --page-id` with a comma-separated list of page IDs, once
        with the same page repeated and once with two different pages.
        """
        def _find(page_ids):
            # Run `find` for the given --page-id value, printing local_filename
            return self.runner.invoke(
                workspace_cli,
                ['-d', tempdir, 'find', '--page-id', page_ids, '-k', 'local_filename'])

        with copy_of_directory(assets.path_to('SBB0000F29300010000/data')) as tempdir:
            same_page = _find('PHYS_0005,PHYS_0005')
            print(same_page.stdout)
            self.assertEqual(same_page.stdout, 'OCR-D-IMG/FILE_0005_IMAGE.tif\n')
            self.assertEqual(same_page.exit_code, 0)
            two_pages = _find('PHYS_0005,PHYS_0001')
            self.assertEqual(len(two_pages.stdout.split('\n')), 19)
393
394
    def test_mets_basename(self):
        """
        `-m foo.xml init` must exit with 0 and create foo.xml but no mets.xml
        in the current directory.
        """
        with TemporaryDirectory() as tempdir, pushd_popd(tempdir):
            result = self.runner.invoke(workspace_cli, ['-m', 'foo.xml', 'init'])
            self.assertEqual(result.exit_code, 0)
            self.assertTrue(exists('foo.xml'))
            self.assertFalse(exists('mets.xml'))
401
402
    def test_mets_basename_and_mets(self):
        """
        Passing both `-m` and `-M` must raise a ValueError with the
        "not both" message.
        """
        cli_args = ['-m', 'foo.xml', '-M', 'not-foo.xml', 'init']
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, "Use either --mets or --mets-basename, not both"):
                self.invoke_cli(workspace_cli, cli_args)
406
407
    def test_mets_basename_and_not_mets(self):
        """
        `-d foo -M not-foo.xml init` must print the directory path and emit
        the deprecation notice for --mets-basename in the error output.
        """
        cli_args = ['-d', 'foo', '-M', 'not-foo.xml', 'init']
        with pushd_popd(tempdir=True) as tempdir:
            _, out, err = self.invoke_cli(workspace_cli, cli_args)
            expected_out = join(tempdir, 'foo') + '\n'
            self.assertEqual(out, expected_out)
            self.assertIn('--mets-basename is deprecated', err)
412
413
    def test_mets_get_id_set_id(self):
        """
        After `set-id foo123`, `get-id` must print that ID followed by a newline.
        """
        mets_id = 'foo123'
        with pushd_popd(tempdir=True):
            self.invoke_cli(workspace_cli, ['init'])
            disableLogging()
            self.invoke_cli(workspace_cli, ['set-id', mets_id])
            disableLogging()
            _, out, _ = self.invoke_cli(workspace_cli, ['get-id'])
            self.assertEqual(out, mets_id + '\n')
422
423
    def test_mets_directory_incompatible(self):
        """
        `-d foo` combined with `-m /somewhere/else` must raise a ValueError
        matching "inconsistent with --directory".
        """
        cli_args = ['-d', 'foo', '-m', '/somewhere/else', 'init']
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, "inconsistent with --directory"):
                self.invoke_cli(workspace_cli, cli_args)
427
428
    def test_mets_directory_http(self):
        """
        An https URL for `-m` without `-d` must raise a ValueError naming the
        missing --directory.
        """
        cli_args = ['-m', 'https://foo.bar/bla', 'init']
        with pushd_popd(tempdir=True):
            with self.assertRaisesRegex(ValueError, r"--mets is an http\(s\) URL but no --directory was given"):
                self.invoke_cli(workspace_cli, cli_args)
432
433
    def test_bulk_add0(self):
        """
        `bulk-add` with a regex over a glob of 100 images and 100 PAGE files:
        file groups, IDs, page IDs and local filenames are derived from the
        regex groups and checked in the reloaded METS.
        """
        NO_FILES = 100
        with TemporaryDirectory() as srcdir:
            Path(srcdir, "OCR-D-IMG").mkdir()
            Path(srcdir, "OCR-D-PAGE").mkdir()
            # all images first, then all PAGE files
            for grp, ext in (("OCR-D-IMG", "tif"), ("OCR-D-PAGE", "xml")):
                for i in range(NO_FILES):
                    Path(srcdir, grp, f"page_{i:04d}.{ext}").write_text('')
            with pushd_popd(tempdir=True) as wsdir:
                ws = self.resolver.workspace_from_nothing(directory=wsdir)
                bulk_args = [
                    'bulk-add',
                    '--ignore',
                    '--regex', r'^.*/(?P<fileGrp>[^/]+)/page_(?P<pageid>.*)\.(?P<ext>[^\.]*)$',
                    '--local-filename', '{{ fileGrp }}/FILE_{{ pageid }}.{{ ext }}',
                    '--file-id', 'FILE_{{ fileGrp }}_{{ pageid }}',
                    '--page-id', 'PHYS_{{ pageid }}',
                    '--file-grp', '{{ fileGrp }}',
                    f'{srcdir}/*/*',
                ]
                self.invoke_cli(workspace_cli, bulk_args)
                ws.reload_mets()
                mets = ws.mets
                assert len(mets.file_groups) == 2
                assert len(mets.find_all_files()) == 2 * NO_FILES
                assert len(mets.find_all_files(mimetype='image/tiff')) == NO_FILES
                assert len(mets.find_all_files(ID='//FILE_OCR-D-IMG_000.*')) == 10
                assert len(mets.find_all_files(ID='//FILE_.*_000.*')) == 20
                assert len(mets.find_all_files(pageId='PHYS_0001')) == 2
                page_file = mets.find_all_files(ID='FILE_OCR-D-PAGE_0001')[0]
                assert page_file.local_filename == 'OCR-D-PAGE/FILE_0001.xml'
465
466
    def test_bulk_add_missing_param(self):
        """
        `bulk-add` without a page ID template must raise a ValueError saying
        that the 'page_id' attribute is unset.
        """
        bulk_args = [
            'bulk-add',
            '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<fileid>.*) (?P<src>.*) (?P<url>.*) (?P<mimetype>.*)',
            '-G', '{{ filegrp }}',
            # XXX no `-g '{{ pageid }}'` here: --page-id is skipped on purpose
            '-i', '{{ fileid }}',
            '-m', '{{ mimetype }}',
            '-u', "{{ url }}",
            'a b c d e f', '1 2 3 4 5 6',
        ]
        with pushd_popd(tempdir=True) as wsdir:
            self.resolver.workspace_from_nothing(directory=wsdir)
            with pytest.raises(ValueError, match=r"OcrdFile attribute 'page_id' unset"):
                _, out, err = self.invoke_cli(workspace_cli, bulk_args)
                # only reached when no exception was raised: show the output, then fail
                print('out', out)
                print('err', err)
                assert 0
482
483
    def test_bulk_add_gen_id(self):
        """
        `bulk-add` without a file ID template: the first file found in the
        reloaded METS must have the ID 'b_c', the given local filename and
        the URL built from the template.
        """
        def _first_file():
            # First file yielded by a fresh find_files() call
            return next(ws.mets.find_files())

        bulk_args = [
            'bulk-add',
            '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<src>.*) (?P<local_filename>.*) (?P<mimetype>.*)',
            '-G', '{{ filegrp }}',
            '-g', '{{ pageid }}',
            '-S', '{{ src }}',
            # XXX no `-i '{{ fileid }}'` here: --file-id is skipped on purpose
            '-m', '{{ mimetype }}',
            '-l', "{{ local_filename }}",
            '-u', "https://host/{{ filegrp }}/{{ local_filename }}",
            'a b c.ext d e',
        ]
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'c.ext').write_text('')
            _, out, _ = self.invoke_cli(workspace_cli, bulk_args)
            ws.reload_mets()
            print(out)
            assert _first_file().ID == 'b_c'
            assert _first_file().local_filename == 'd'
            assert _first_file().url == 'https://host/b/d'
503
504
    def test_bulk_add_derive_local_filename(self):
        """
        `bulk-add` without a local filename template: the first file found in
        the reloaded METS must have the source path as its local filename.
        """
        bulk_args = [
            'bulk-add',
            '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<src>.*)',
            '-G', '{{ filegrp }}',
            '-g', '{{ pageid }}',
            '-S', '{{ src }}',
            # XXX no `-l` here: --local-filename is skipped on purpose
            'p0001 SEG srcdir/src.xml',
        ]
        with pushd_popd(tempdir=True) as wsdir:
            ws = self.resolver.workspace_from_nothing(directory=wsdir)
            srcdir = Path(wsdir, 'srcdir')
            srcdir.mkdir()
            (srcdir / 'src.xml').write_text('')
            self.invoke_cli(workspace_cli, bulk_args)
            ws.reload_mets()
            first_file = next(ws.mets.find_files())
            assert first_file.local_filename == 'srcdir/src.xml'
521
522
    def test_bulk_add_stdin(self):
        """
        `bulk-add -` with four lines supplied through a mocked stdin: the
        reloaded METS must have one file group and four files, and the first
        file must match the first input line.
        """
        stdin_lines = (
            'PHYS_0001 BIN FILE_0001_BIN.IMG-wolf BIN/FILE_0001_BIN.IMG-wolf.png BIN/FILE_0001_BIN.IMG-wolf.png image/png\n'
            'PHYS_0002 BIN FILE_0002_BIN.IMG-wolf BIN/FILE_0002_BIN.IMG-wolf.png BIN/FILE_0002_BIN.IMG-wolf.png image/png\n'
            'PHYS_0001 BIN FILE_0001_BIN BIN/FILE_0001_BIN.xml BIN/FILE_0001_BIN.xml application/vnd.prima.page+xml\n'
            'PHYS_0002 BIN FILE_0002_BIN BIN/FILE_0002_BIN.xml BIN/FILE_0002_BIN.xml application/vnd.prima.page+xml\n'
        )
        bulk_args = [
            'bulk-add',
            '-r', r'(?P<pageid>.*) (?P<filegrp>.*) (?P<fileid>.*) (?P<src>.*) (?P<dest>.*) (?P<mimetype>.*)',
            '-G', '{{ filegrp }}',
            '-g', '{{ pageid }}',
            '-i', '{{ fileid }}',
            '-m', '{{ mimetype }}',
            '-l', "{{ dest }}",
            '-u', "https://host/{{ fileid }}/{{ dest }}",
            '-',
        ]
        with pushd_popd(tempdir=True) as wsdir:
            ws = Resolver().workspace_from_nothing(directory=wsdir)
            Path(wsdir, 'BIN').mkdir()
            # empty placeholder files for the sources named on stdin
            for relpath in (
                    'BIN/FILE_0001_BIN.IMG-wolf.png',
                    'BIN/FILE_0002_BIN.IMG-wolf.png',
                    'BIN/FILE_0001_BIN.xml',
                    'BIN/FILE_0002_BIN.xml'):
                Path(wsdir, relpath).write_text('', encoding='UTF-8')
            with mock_stdin(stdin_lines):
                assert len(ws.mets.file_groups) == 0
                self.invoke_cli(workspace_cli, bulk_args)
                ws.reload_mets()
                assert len(ws.mets.file_groups) == 1
                assert len(list(ws.mets.find_files())) == 4
                first_file = next(ws.mets.find_files())
                assert first_file.mimetype == 'image/png'
                assert first_file.ID == 'FILE_0001_BIN.IMG-wolf'
                assert first_file.local_filename == 'BIN/FILE_0001_BIN.IMG-wolf.png'
                assert first_file.url == 'https://host/FILE_0001_BIN.IMG-wolf/BIN/FILE_0001_BIN.IMG-wolf.png'
555
556
    def test_list_page(self):
        """
        Check the output of `list-page` on the list-page-workspace data
        directory for the formats, ranges and chunk options used below.
        """
        from json import loads

        def _call(args):
            # Run `list-page` with extra arguments and drop trailing newlines
            _, out, _ = self.invoke_cli(workspace_cli, ['list-page'] + list(args))
            return out.rstrip('\n')

        # Expected page IDs: PHYS_0001..PHYS_0029 without PHYS_0007 and PHYS_0021
        all_pages = ['PHYS_%04d' % n for n in range(1, 30) if n not in (7, 21)]
        csv = ['-f', 'comma-separated']
        first_ten = ['-r', 'PHYS_0001..PHYS_0010']

        with pushd_popd(Path(__file__).parent.parent / 'data/list-page-workspace'):
            assert _call([]) == '\n'.join(all_pages)
            assert _call(csv) == ','.join(all_pages)
            assert _call(['-f', 'json']) == '[[' + ', '.join('["%s"]' % page for page in all_pages) + ']]'
            assert _call(csv + ['-R', '5..5']) == 'PHYS_0005'
            assert _call(csv + ['-R', '6..8']) == 'PHYS_0006,PHYS_0008,PHYS_0009'
            assert _call(csv + ['-r', '1..5']) == 'PHYS_0001,PHYS_0002,PHYS_0003,PHYS_0004,PHYS_0005'
            assert _call(csv + ['-r', '2..3']) == 'PHYS_0002,PHYS_0003'
            assert _call(csv + ['-r', 'page 2..page 3']) == 'PHYS_0002,PHYS_0003'
            assert _call(csv + ['-r', 'PHYS_0006..PHYS_0009']) == 'PHYS_0006,PHYS_0008,PHYS_0009'
            assert _call(csv + first_ten + ['-D', '3']) == \
                'PHYS_0001,PHYS_0002,PHYS_0003\nPHYS_0004,PHYS_0005,PHYS_0006\nPHYS_0008,PHYS_0009,PHYS_0010'
            assert _call(csv + first_ten + ['-D', '3', '-C', '2']) == 'PHYS_0008,PHYS_0009,PHYS_0010'
            assert loads(_call(['-f', 'json'] + first_ten + ['-D', '3', '-C', '2'])) == \
                [[['PHYS_0008'], ['PHYS_0009'], ['PHYS_0010']]]
            assert loads(_call(['-f', 'json'] + first_ten + ['-k', 'ID', '-k', 'ORDERLABEL', '-D', '3', '-C', '2'])) == \
                [[['PHYS_0008', 'page 7'], ['PHYS_0009', 'page 8'], ['PHYS_0010', 'page 9']]]
576
577
if __name__ == '__main__':
    # Hand this file to the `main` helper imported from tests.base
    main(__file__)
579