Completed
Push — master ( dbc38f...56accc )
by Klaus
01:34
created

test_custom_bufferreaderwrapper()   F

Complexity

Conditions 14

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 14
dl 0
loc 23
rs 2.9878
c 2
b 0
f 0

How to fix   Complexity   

Complexity

Complex functions like test_custom_bufferreaderwrapper() often do several different things at once. To break such a function down, identify a cohesive group of statements within it. A common approach to finding such a group is to look for variables or calls that share the same prefixes or suffixes.

Once you have determined which statements belong together, you can apply the Extract Method refactoring. If the function is a method of a class that has itself grown too large, Extract Class or Extract Subclass is also a candidate.

1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import (division, print_function, unicode_literals,
4
                        absolute_import)
5
6
import os
7
import datetime
8
import tempfile
9
import io
10
11
import pytest
12
13
tinydb = pytest.importorskip("tinydb")
14
hashfs = pytest.importorskip("hashfs")
15
16
from tinydb import TinyDB
17
from hashfs import HashFS
18
19
from sacred.dependencies import get_digest
20
from sacred.observers.tinydb_hashfs import (TinyDbObserver, TinyDbOption, 
21
                                            BufferedReaderWrapper)
22
from sacred import optional as opt
23
from sacred.experiment import Experiment
24
25
# Fixed timestamps shared by the tests below; T2 is later than T1.
T1 = datetime.datetime(year=1999, month=5, day=4,
                       hour=3, minute=2, second=1, microsecond=0)
T2 = datetime.datetime(year=1999, month=5, day=5,
                       hour=5, minute=5, second=5, microsecond=5)
27
28
29
@pytest.fixture()
def tinydb_obs(tmpdir):
    """Provide a TinyDbObserver created with ``tmpdir`` as its path."""
    observer = TinyDbObserver.create(path=tmpdir.strpath)
    return observer
32
33
34
@pytest.fixture()
def sample_run():
    """Return keyword arguments describing a sample run.

    The tests unpack this dict into ``started_event``; it holds a fixed
    ``_id``, experiment/host/config/meta dicts, the command name and the
    start time ``T1``.
    """
    return {
        '_id': 'FEDCBA9876543210',
        'ex_info': {
            'name': 'test_exp',
            'sources': [],
            'doc': '',
            'base_dir': '/tmp',
        },
        'command': 'run',
        'host_info': {
            'hostname': 'test_host',
            'cpu_count': 1,
            'python_version': '3.4',
        },
        'start_time': T1,
        'config': {'config': 'True', 'foo': 'bar', 'answer': 42},
        'meta_info': {'comment': 'test run'},
    }
50
51
52
def test_tinydb_observer_creates_missing_directories(tmpdir):
    """Creating an observer at a new sub-path sets ``root`` to that path."""
    target_dir = os.path.join(tmpdir.strpath, 'foo')
    observer = TinyDbObserver.create(path=target_dir)
    assert observer.root == target_dir
55
56
57
def test_tinydb_observer_started_event_creates_run(tinydb_obs, sample_run):
    """``started_event`` called without an id stores one complete run entry.

    The returned id must not be None, and the single stored document must
    equal the inputs combined with the initial values listed below.
    """
    sample_run['_id'] = None
    run_id = tinydb_obs.started_event(**sample_run)
    assert run_id is not None
    assert len(tinydb_obs.runs) == 1

    expected_entry = {
        '_id': run_id,
        'experiment': sample_run['ex_info'],
        'format': tinydb_obs.VERSION,
        'command': sample_run['command'],
        'host': sample_run['host_info'],
        'start_time': sample_run['start_time'],
        'heartbeat': None,
        'info': {},
        'captured_out': '',
        'artifacts': [],
        'config': sample_run['config'],
        'meta': sample_run['meta_info'],
        'status': 'RUNNING',
        'resources': []
    }
    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry == expected_entry
79
80
81
def test_tinydb_observer_started_event_uses_given_id(tinydb_obs, sample_run):
    """An ``_id`` supplied by the caller is both returned and stored."""
    given_id = sample_run['_id']

    returned_id = tinydb_obs.started_event(**sample_run)
    assert returned_id == given_id
    assert len(tinydb_obs.runs) == 1

    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['_id'] == given_id
87
88
89
def test_tinydb_observer_started_event_saves_given_sources(tinydb_obs,
                                                           sample_run):
    """Sources given in ``ex_info`` are stored with a reader as third item.

    Also checks that a second run started with the same source still lists
    that source in its experiment section.
    """
    source_file = 'setup.py'
    digest = get_digest(source_file)

    sample_run['ex_info']['sources'] = [[source_file, digest]]
    run_id = tinydb_obs.started_event(**sample_run)

    assert run_id is not None
    assert len(tinydb_obs.runs) == 1
    first_run = tinydb_obs.runs.get(eid=1)

    # Compare everything except the experiment section, which is inspected
    # separately further down.
    remainder = first_run.copy()
    experiment = remainder.pop('experiment')
    assert remainder == {
        '_id': run_id,
        'format': tinydb_obs.VERSION,
        'command': sample_run['command'],
        'host': sample_run['host_info'],
        'start_time': sample_run['start_time'],
        'heartbeat': None,
        'info': {},
        'captured_out': '',
        'artifacts': [],
        'config': sample_run['config'],
        'meta': sample_run['meta_info'],
        'status': 'RUNNING',
        'resources': []
    }

    assert len(experiment['sources']) == 1
    assert len(experiment['sources'][0]) == 3
    assert experiment['sources'][0][:2] == [source_file, digest]
    assert isinstance(experiment['sources'][0][2], io.BufferedReader)

    # A second run using the same source file must still list it.
    tinydb_obs.db_run_id = None
    tinydb_obs.started_event(**sample_run)
    assert len(tinydb_obs.runs) == 2
    second_run = tinydb_obs.runs.get(eid=2)

    assert (experiment['sources'][0][:2] ==
            second_run['experiment']['sources'][0][:2])
133
134
135
def test_tinydb_observer_started_event_generates_different_run_ids(tinydb_obs,
                                                                   sample_run):
    """Two runs started without an explicit id receive distinct ids."""
    sample_run['_id'] = None
    first_id = tinydb_obs.started_event(**sample_run)
    assert first_id is not None

    # Reset the observer's run id and start a second run, again without
    # providing an id.
    tinydb_obs.db_run_id = None
    sample_run['_id'] = None
    second_id = tinydb_obs.started_event(**sample_run)

    assert len(tinydb_obs.runs) == 2
    # Each run must have been given its own id.
    assert first_id != second_id
149
150
151
def test_tinydb_observer_queued_event_is_not_implimented(tinydb_obs,
                                                         sample_run):
    """``queued_event`` raises NotImplementedError."""
    # Turn the sample run into queued-run arguments: no host or start time,
    # but a queue time instead.
    queued_kwargs = sample_run.copy()
    queued_kwargs.pop('host_info')
    queued_kwargs.pop('start_time')
    queued_kwargs['queue_time'] = T1

    with pytest.raises(NotImplementedError):
        tinydb_obs.queued_event(**queued_kwargs)
161
162
163
def test_tinydb_observer_equality(tmpdir, tinydb_obs):
    """Check ``==`` and ``!=`` against a hand-built observer and a string.

    The second observer is constructed directly from a TinyDB/HashFS pair
    located under the same ``tmpdir`` as the fixture.
    """
    root = tmpdir.strpath
    db = TinyDB(os.path.join(root, 'metadata.json'))
    fs = HashFS(os.path.join(root, 'hashfs'), depth=3, width=2,
                algorithm='md5')
    twin = TinyDbObserver(db, fs)

    # Both operators are exercised explicitly, for the equal case ...
    assert tinydb_obs == twin
    assert not tinydb_obs != twin

    # ... and for comparison with an unrelated object.
    assert not tinydb_obs == 'foo'
    assert tinydb_obs != 'foo'
175
176
177 View Code Duplication
def test_tinydb_observer_heartbeat_event_updates_run(tinydb_obs, sample_run):
    """A heartbeat stores beat time, info and captured output on the run."""
    tinydb_obs.started_event(**sample_run)

    beat_info = {'my_info': [1, 2, 3], 'nr': 7}
    output = 'some output'
    tinydb_obs.heartbeat_event(info=beat_info, captured_out=output,
                               beat_time=T2)

    assert len(tinydb_obs.runs) == 1
    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['heartbeat'] == T2
    assert stored_entry['info'] == beat_info
    assert stored_entry['captured_out'] == output
189
190
191 View Code Duplication
def test_tinydb_observer_completed_event_updates_run(tinydb_obs, sample_run):
    """Completion stores stop time, result and the COMPLETED status."""
    tinydb_obs.started_event(**sample_run)

    tinydb_obs.completed_event(stop_time=T2, result=42)

    assert len(tinydb_obs.runs) == 1
    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['stop_time'] == T2
    assert stored_entry['result'] == 42
    assert stored_entry['status'] == 'COMPLETED'
201
202
203
def test_tinydb_observer_interrupted_event_updates_run(tinydb_obs,
                                                       sample_run):
    """Interruption stores the stop time and the status that was passed."""
    tinydb_obs.started_event(**sample_run)

    tinydb_obs.interrupted_event(interrupt_time=T2, status='INTERRUPTED')

    assert len(tinydb_obs.runs) == 1
    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['stop_time'] == T2
    assert stored_entry['status'] == 'INTERRUPTED'
213
214
215 View Code Duplication
def test_tinydb_observer_failed_event_updates_run(tinydb_obs, sample_run):
    """Failure stores stop time, the FAILED status and the traceback text."""
    tinydb_obs.started_event(**sample_run)

    trace_text = "lots of errors and\nso\non..."
    tinydb_obs.failed_event(fail_time=T2, fail_trace=trace_text)

    assert len(tinydb_obs.runs) == 1
    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['stop_time'] == T2
    assert stored_entry['status'] == 'FAILED'
    assert stored_entry['fail_trace'] == trace_text
227
228
229 View Code Duplication
def test_tinydb_observer_artifact_event(tinydb_obs, sample_run):
    """An artifact is added to the file store and recorded on the run."""
    tinydb_obs.started_event(**sample_run)

    artifact_path = "setup.py"
    artifact_name = 'mysetup'

    tinydb_obs.artifact_event(artifact_name, artifact_path)

    assert tinydb_obs.fs.exists(artifact_path)

    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['artifacts'][0][0] == artifact_name

    # The fourth item of the artifact record is read back and must match
    # the bytes of the file on disk.
    with open(artifact_path, 'rb') as source:
        expected_bytes = source.read()
    assert stored_entry['artifacts'][0][3].read() == expected_bytes
245
246
247 View Code Duplication
def test_tinydb_observer_resource_event(tinydb_obs, sample_run):
    """A resource is added to the file store and recorded with its digest."""
    tinydb_obs.started_event(**sample_run)

    resource_path = "setup.py"
    digest = get_digest(resource_path)

    tinydb_obs.resource_event(resource_path)

    assert tinydb_obs.fs.exists(resource_path)

    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['resources'][0][:2] == [resource_path, digest]

    # The third item of the resource record is read back and must match
    # the bytes of the file on disk.
    with open(resource_path, 'rb') as source:
        expected_bytes = source.read()
    assert stored_entry['resources'][0][2].read() == expected_bytes
263
264
265
def test_tinydb_observer_resource_event_when_resource_present(tinydb_obs,
                                                              sample_run):
    """A resource already in the file store is still recorded on the run."""
    tinydb_obs.started_event(**sample_run)

    resource_path = "setup.py"
    digest = get_digest(resource_path)

    # Put the file into the store directly, before the event is raised.
    tinydb_obs.fs.put(resource_path)

    tinydb_obs.resource_event(resource_path)

    stored_entry = tinydb_obs.runs.get(eid=1)
    assert stored_entry['resources'][0][:2] == [resource_path, digest]
279
280
281
def test_custom_bufferreaderwrapper(tmpdir):
    """Check copy behaviour of BufferedReaderWrapper.

    A wrapper, its shallow copy and a deep copy must expose the same
    ``name`` and ``mode`` as the wrapped file. The wrapper and its copies
    must stay open after the wrapped file is closed, and closing one copy
    must not close another.

    Every wrapper created here is closed in the ``finally`` clause, so no
    file handle is left open when the test ends, including when an
    assertion fails part-way through.
    """
    import copy

    path = os.path.join(tmpdir.strpath, 'test.txt')
    with open(path, 'w') as f:
        f.write('some example text')

    opened_handles = []
    try:
        with open(path, 'rb') as f:
            custom_fh = BufferedReaderWrapper(f)
            opened_handles.append(custom_fh)
            assert f.name == custom_fh.name
            assert f.mode == custom_fh.mode
            custom_fh_copy = copy.copy(custom_fh)
            opened_handles.append(custom_fh_copy)
            assert custom_fh.name == custom_fh_copy.name
            assert custom_fh.mode == custom_fh_copy.mode

        # Leaving the ``with`` block closes only the wrapped file.
        assert f.closed
        assert not custom_fh.closed
        assert not custom_fh_copy.closed

        custom_fh_deepcopy = copy.deepcopy(custom_fh_copy)
        opened_handles.append(custom_fh_deepcopy)
        assert custom_fh_copy.name == custom_fh_deepcopy.name
        assert custom_fh_copy.mode == custom_fh_deepcopy.mode
        custom_fh_copy.close()
        assert custom_fh_copy.closed
        assert not custom_fh_deepcopy.closed
    finally:
        # Closing a handle that is already closed has no effect, so it is
        # safe to close every tracked handle unconditionally.
        for handle in opened_handles:
            handle.close()
304
305
306
@pytest.mark.skipif(not opt.has_numpy, reason='needs numpy')
def test_serialisation_of_numpy_ndarray(tmpdir):
    """Numpy arrays survive an insert/read round trip through TinyDB."""
    from sacred.observers.tinydb_hashfs import NdArraySerializer
    from tinydb_serialization import SerializationMiddleware
    import numpy as np

    # Register the ndarray serializer so values other than lists/dicts can
    # be stored.
    middleware = SerializationMiddleware()
    middleware.register_serializer(NdArraySerializer(), 'TinyArray')

    db_path = os.path.join(tmpdir.strpath, 'metadata.json')
    db = TinyDB(db_path, storage=middleware)

    identity = np.eye(3)
    ones = np.ones(5)

    db.insert({
        'foo': 'bar',
        'some_array': identity,
        'nested': {
            'ones': ones
        }
    })
    fetched = db.all()[0]

    assert fetched['foo'] == 'bar'
    assert (fetched['some_array'] == identity).all()
    assert (fetched['nested']['ones'] == ones).all()
336
337
338
@pytest.mark.skipif(not opt.has_pandas, reason='needs pandas')
def test_serialisation_of_pandas_dataframe(tmpdir):
    """DataFrames and Series survive a round trip through TinyDB."""
    from sacred.observers.tinydb_hashfs import (DataFrameSerializer,
                                                SeriesSerializer)
    from tinydb_serialization import SerializationMiddleware

    import numpy as np
    import pandas as pd

    # Register the pandas serializers so values other than lists/dicts can
    # be stored.
    middleware = SerializationMiddleware()
    middleware.register_serializer(DataFrameSerializer(), 'TinyDataFrame')
    middleware.register_serializer(SeriesSerializer(), 'TinySeries')

    db_path = os.path.join(tmpdir.strpath, 'metadata.json')
    db = TinyDB(db_path, storage=middleware)

    frame = pd.DataFrame(np.eye(3), columns=list('ABC'))
    ones = pd.Series(np.ones(5))

    db.insert({
        'foo': 'bar',
        'some_dataframe': frame,
        'nested': {
            'ones': ones
        }
    })
    fetched = db.all()[0]

    assert fetched['foo'] == 'bar'
    assert (fetched['some_dataframe'] == frame).all().all()
    assert (fetched['nested']['ones'] == ones).all()
374
375
376
def test_parse_tinydb_arg():
    """``parse_tinydb_arg('foo')`` yields ``'foo'``."""
    parsed = TinyDbOption.parse_tinydb_arg('foo')
    assert parsed == 'foo'
378
379
380
def test_parse_tinydboption_apply(tmpdir):
    """``TinyDbOption.apply`` adds a TinyDbObserver to the experiment.

    The option is applied with the temp directory path as its argument and
    the first observer registered on the experiment is checked.
    """
    exp = Experiment()

    # ``os.path.join`` with a single argument returns it unchanged, so the
    # path is passed directly.
    TinyDbOption.apply(tmpdir.strpath, exp)

    assert isinstance(exp.observers[0], TinyDbObserver)
387