1 | import unittest |
||
2 | import os, shutil |
||
3 | from os.path import abspath |
||
4 | |||
5 | |||
6 | class ProvenanceContextApiTests(unittest.TestCase): |
||
7 | |||
8 | def setUp(self): |
||
9 | self.dbpath = os.path.expanduser(os.path.join('~','provenance_test.json')) |
||
10 | if os.path.exists(self.dbpath): |
||
11 | os.remove(self.dbpath) |
||
12 | os.mkdir('temp') |
||
13 | from niprov import ProvenanceContext |
||
14 | self.provenance = ProvenanceContext() |
||
15 | self.provenance.config.database_type = 'file' |
||
16 | self.provenance.config.database_url = self.dbpath |
||
17 | |||
18 | def tearDown(self): |
||
19 | shutil.rmtree('temp') |
||
20 | |||
21 | def touch(self, path): |
||
22 | with open(path,'w') as tempfile: |
||
23 | tempfile.write('0') |
||
24 | |||
25 | def test_Relative_paths(self): |
||
26 | self.provenance.discover('testdata') |
||
27 | newfile = 'temp/smoothed.test' |
||
28 | self.touch(newfile) |
||
29 | self.provenance.log(newfile, 'test', 'testdata/eeg/stub.cnt') |
||
30 | img = self.provenance.get().byLocation(newfile) |
||
31 | |||
32 | View Code Duplication | def test_Log(self): |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
33 | self.provenance.discover('testdata') |
||
34 | newfile = 'temp/smoothed.test' |
||
35 | self.touch(newfile) |
||
36 | parent = os.path.abspath('testdata/eeg/stub.cnt') |
||
37 | self.provenance.log(newfile, 'test', parent) |
||
38 | testfpath = os.path.abspath(newfile) |
||
39 | img = self.provenance.get().byLocation(testfpath) |
||
40 | self.assertEqual(img.provenance['subject'], 'Jane Doe') |
||
41 | self.assertEqual(img.provenance['size'], os.path.getsize(newfile)) |
||
42 | |||
43 | View Code Duplication | def test_Record_with_user(self): |
|
0 ignored issues
–
show
|
|||
44 | self.provenance.discover('testdata') |
||
45 | newfile = 'temp/recorded.test' |
||
46 | self.touch(newfile) |
||
47 | parent = os.path.abspath('testdata/eeg/stub.cnt') |
||
48 | self.provenance.record('echo hallo', newfile, parent, user='007') |
||
49 | testfpath = os.path.abspath(newfile) |
||
50 | img = self.provenance.get().byLocation(testfpath) |
||
51 | self.assertEqual(img.provenance['user'], '007') |
||
52 | |||
53 | def test_Export_Import(self): |
||
54 | from niprov.exceptions import UnknownFileError |
||
55 | self.provenance.discover('testdata') |
||
56 | discoveredFile = os.path.abspath('testdata/eeg/stub.cnt') |
||
57 | self.assertIsNotNone(self.provenance.get().byLocation(discoveredFile)) |
||
58 | backupFilepath = self.provenance.backup() |
||
59 | os.remove(self.dbpath) # get rid of existing data. |
||
60 | self.assertIsNone(self.provenance.get().byLocation(discoveredFile)) |
||
61 | self.provenance.importp(backupFilepath) |
||
62 | self.assertIsNotNone(self.provenance.get().byLocation(discoveredFile)) |
||
63 | |||
64 | @unittest.skip("Doesn't work on Travis right now.") |
||
65 | def test_Attach_provenance_string_in_file_based_on_config(self): |
||
66 | import nibabel |
||
67 | self.provenance.config.attach = True |
||
68 | newfile = os.path.abspath('temp/fileX.nii.gz') |
||
69 | shutil.copy('testdata/nifti/fieldmap.nii.gz', newfile) |
||
70 | self.provenance.add(newfile) |
||
71 | img = nibabel.load(newfile) |
||
72 | self.assertEqual(img.get_header().extensions.count('comment'), 1) |
||
73 | self.assertEqual(img.get_header().extensions[0].get_code(), 6) |
||
74 | content = img.get_header().extensions[0].get_content() |
||
75 | self.assertIn('"path": "{0}"'.format(newfile), content) |
||
76 | |||
77 | def test_Approval(self): |
||
78 | self.provenance.discover('testdata') |
||
79 | self.provenance.markForApproval(['testdata/parrec/T1.PAR', |
||
80 | 'testdata/parrec/T2.PAR']) |
||
81 | imgs = self.provenance.markedForApproval() |
||
82 | self.provenance.approve('testdata/parrec/T1.PAR') |
||
83 | imgs = self.provenance.markedForApproval() |
||
84 | |||
85 | def test_Comparison(self): |
||
86 | # Given two PARREC images' provenance records |
||
87 | par1 = self.provenance.add(abspath('testdata/parrec/T1.PAR')) |
||
88 | par2 = self.provenance.add(abspath('testdata/parrec/T2.PAR')) |
||
89 | # Comparing them returns a Diff object with methods testing equality |
||
90 | self.assertFalse(self.provenance.compare(par1, par2).areEqual()) |
||
91 | # Compare() can also be called as a method on the objects themselves, |
||
92 | # and the Diff object has assert..() methods that raise AssertionErrors |
||
93 | msgRegExp = ".*echo-time.*2\.08.*80\.0.*" |
||
94 | with self.assertRaisesRegexp(AssertionError, msgRegExp): |
||
95 | par1.compare(par2).assertEqualProtocol() |
||
96 | |||
97 | def test_Search(self): |
||
98 | x1 = self.provenance.add('x1', transient=True, |
||
99 | provenance={'transformation':'needle and thread'}) |
||
100 | x2 = self.provenance.add('x2/needle.y', transient=True, |
||
101 | provenance={'transformation':'needle and thread'}) |
||
102 | x3 = self.provenance.add('x3', transient=True, |
||
103 | provenance={'transformation':'hammer and tongs'}) |
||
104 | results = self.provenance.search('needle') |
||
105 | self.assertEqual(len(results), 2) |
||
106 | self.assertEqual(x2.provenance['id'], results[0].provenance['id']) |
||
107 | self.assertEqual(x1.provenance['id'], results[1].provenance['id']) |
||
108 | |||
109 | def test_GetModalities(self): |
||
110 | self.provenance.discover('testdata') |
||
111 | modalities = self.provenance.get().allModalities() |
||
112 | self.assertIn('MRI', modalities) |
||
113 | self.assertIn('DWI', modalities) |
||
114 | self.assertIn('EEG', modalities) |
||
115 | |||
116 | def test_Version_history(self): |
||
117 | self.provenance.add('f', transient=True, provenance={'a':1}) |
||
118 | img = self.provenance.get().byLocation('f') |
||
119 | self.assertEqual(1, img.provenance['a']) |
||
120 | self.provenance.add('f', transient=True, provenance={'a':2}) |
||
121 | img = self.provenance.get().byLocation('f') |
||
122 | self.assertEqual(2, img.provenance['a']) |
||
123 | self.assertEqual(1, img.versions[-1]['a']) |
||
124 | self.provenance.add('f', transient=True, provenance={'a':3}) |
||
125 | img = self.provenance.get().byLocation('f') |
||
126 | self.assertEqual(3, img.provenance['a']) |
||
127 | self.assertEqual(2, img.versions[-1]['a']) |
||
128 | self.assertEqual(1, img.versions[-2]['a']) |
||
129 | |||
130 | View Code Duplication | def test_If_no_parent_provided_found_copy_considered_parent(self): |
|
0 ignored issues
–
show
|
|||
131 | self.provenance.add('testdata/eeg/stub.cnt') |
||
132 | self.touch('temp/orig.f') |
||
133 | self.provenance.log('temp/orig.f', 'op1', 'testdata/eeg/stub.cnt') |
||
134 | shutil.copy('temp/orig.f', 'temp/copy.f') |
||
135 | self.touch('temp/child.f') |
||
136 | child = self.provenance.log('temp/child.f', 'op2', 'temp/copy.f') |
||
137 | self.assertEqual(child.provenance['subject'], 'Jane Doe') |
||
138 | copy = self.provenance.get().byLocation('temp/copy.f') |
||
139 | self.assertIn('temp/orig.f', copy.provenance['parents'][0]) |
||
140 | |||
141 | def test_Differentiates_Fifs(self): |
||
142 | """ |
||
143 | The fields tested here are just a subset. |
||
144 | """ |
||
145 | PROJDESC = 'ECG-planar-999--0.080-0.080-PCA-01' |
||
146 | fiftypes = {'ave': {'fif-type':'ave', 'dimensions':[2,401,701]}, |
||
147 | 'cov': {'fif-type':'cov', 'dimensions':[366, 366]}, |
||
148 | 'epo': {'fif-type':'epo', 'highpass':0.10000000149011612}, |
||
149 | 'fwd': {'fif-type':'fwd'}, |
||
150 | 'trans': {'fif-type':'trans'}, |
||
151 | 'proj' : {'fif-type':'proj', |
||
152 | 'projection-description':PROJDESC}} |
||
153 | for ftype, fields in fiftypes.items(): |
||
154 | pth = os.path.abspath('testdata/fif/test-{}.fif'.format(ftype)) |
||
155 | img = self.provenance.add(pth) |
||
156 | for field, expectedValue in fields.items(): |
||
157 | self.assertIn(field, img.provenance) |
||
158 | self.assertEqual(img.provenance[field], expectedValue) |
||
159 | |||
160 | |||
161 | if __name__ == '__main__': |
||
162 | unittest.main() |
||
163 | |||
164 |