test_custom_bufferreaderwrapper()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
c 0
b 0
f 0
dl 0
loc 23
rs 3.6

How to fix   Complexity   

Complexity

Complex code units like test_custom_bufferreaderwrapper() (here a test function rather than a class) often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import (division, print_function, unicode_literals,
4
                        absolute_import)
5
6
import os
7
import datetime
8
import tempfile
9
import io
10
11
import pytest
12
13
tinydb = pytest.importorskip("tinydb")
14
hashfs = pytest.importorskip("hashfs")
15
16
from tinydb import TinyDB
17
from hashfs import HashFS
18
19
from sacred.dependencies import get_digest
20
from sacred.observers.tinydb_hashfs import (TinyDbObserver, TinyDbOption, 
21
                                            BufferedReaderWrapper)
22
from sacred import optional as opt
23
from sacred.experiment import Experiment
24
25
T1 = datetime.datetime(1999, 5, 4, 3, 2, 1, 0)
26
T2 = datetime.datetime(1999, 5, 5, 5, 5, 5, 5)
27
28
29
@pytest.fixture()
def tinydb_obs(tmpdir):
    """Provide a TinyDbObserver created with pytest's tmpdir as its path."""
    observer = TinyDbObserver.create(path=tmpdir.strpath)
    return observer
32
33
34
@pytest.fixture()
def sample_run():
    """Return the keyword arguments the tests pass to ``started_event``.

    The dictionary holds a fixed run id, experiment info, command, host
    info, start time (``T1``), config and meta info.
    """
    # base_dir is two directory levels above this test module.
    base_dir = os.path.join(os.path.dirname(__file__), '..', '..')
    return {
        '_id': 'FEDCBA9876543210',
        'ex_info': {
            'name': 'test_exp',
            'sources': [],
            'doc': '',
            'base_dir': base_dir,
        },
        'command': 'run',
        'host_info': {
            'hostname': 'test_host',
            'cpu_count': 1,
            'python_version': '3.4',
        },
        'start_time': T1,
        'config': {'config': 'True', 'foo': 'bar', 'answer': 42},
        'meta_info': {'comment': 'test run'},
    }
51
52
53
def test_tinydb_observer_creates_missing_directories(tmpdir):
    """An observer created below tmpdir reports that path as its root."""
    target_dir = os.path.join(tmpdir.strpath, 'foo')
    observer = TinyDbObserver.create(path=target_dir)
    assert observer.root == target_dir
56
57
58
def test_tinydb_observer_started_event_creates_run(tinydb_obs, sample_run):
    """started_event without an id stores exactly one complete run entry."""
    sample_run['_id'] = None
    run_id = tinydb_obs.started_event(**sample_run)
    assert run_id is not None
    assert len(tinydb_obs.runs) == 1

    stored_run = tinydb_obs.runs.get(eid=1)
    expected_run = {
        '_id': run_id,
        'experiment': sample_run['ex_info'],
        'format': tinydb_obs.VERSION,
        'command': sample_run['command'],
        'host': sample_run['host_info'],
        'start_time': sample_run['start_time'],
        'heartbeat': None,
        'info': {},
        'captured_out': '',
        'artifacts': [],
        'config': sample_run['config'],
        'meta': sample_run['meta_info'],
        'status': 'RUNNING',
        'resources': [],
    }
    assert stored_run == expected_run
80
81
82
def test_tinydb_observer_started_event_uses_given_id(tinydb_obs, sample_run):
    """started_event returns and stores the id supplied by the caller."""
    given_id = sample_run['_id']
    returned_id = tinydb_obs.started_event(**sample_run)
    assert returned_id == given_id
    assert len(tinydb_obs.runs) == 1

    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['_id'] == given_id
88
89
90
def test_tinydb_observer_started_event_saves_given_sources(tinydb_obs,
                                                           sample_run):
    """started_event stores each source as [filename, md5, file handle]."""
    filename = 'setup.py'
    md5 = get_digest(filename)
    sample_run['ex_info']['sources'] = [[filename, md5]]

    run_id = tinydb_obs.started_event(**sample_run)
    assert run_id is not None
    assert len(tinydb_obs.runs) == 1
    db_run = tinydb_obs.runs.get(eid=1)

    # Compare everything except the experiment section, which is
    # inspected separately below.
    without_experiment = {key: value for key, value in db_run.items()
                          if key != 'experiment'}
    expected = {
        '_id': run_id,
        'format': tinydb_obs.VERSION,
        'command': sample_run['command'],
        'host': sample_run['host_info'],
        'start_time': sample_run['start_time'],
        'heartbeat': None,
        'info': {},
        'captured_out': '',
        'artifacts': [],
        'config': sample_run['config'],
        'meta': sample_run['meta_info'],
        'status': 'RUNNING',
        'resources': [],
    }
    assert without_experiment == expected

    stored_sources = db_run['experiment']['sources']
    assert len(stored_sources) == 1
    first_source = stored_sources[0]
    assert len(first_source) == 3
    assert first_source[:2] == [filename, md5]
    assert isinstance(first_source[2], io.BufferedReader)

    # A second run with the same source file must list it again.
    tinydb_obs.db_run_id = None
    tinydb_obs.started_event(**sample_run)
    assert len(tinydb_obs.runs) == 2
    db_run2 = tinydb_obs.runs.get(eid=2)

    first_entry = db_run['experiment']['sources'][0][:2]
    second_entry = db_run2['experiment']['sources'][0][:2]
    assert first_entry == second_entry
134
135
136
def test_tinydb_observer_started_event_generates_different_run_ids(tinydb_obs,
                                                                   sample_run):
    """Two runs started without an id receive two distinct ids."""
    sample_run['_id'] = None
    first_id = tinydb_obs.started_event(**sample_run)
    assert first_id is not None

    # Clear db_run_id and the requested id before starting a second run.
    tinydb_obs.db_run_id = None
    sample_run['_id'] = None
    second_id = tinydb_obs.started_event(**sample_run)

    assert len(tinydb_obs.runs) == 2
    # Each run must have been given its own id.
    assert first_id != second_id
150
151
152
def test_tinydb_observer_queued_event_is_not_implemented(tinydb_obs,
                                                         sample_run):
    """queued_event raises NotImplementedError."""
    # Same arguments as a started run, but with queue_time in place of
    # start_time.
    queued_kwargs = {key: value for key, value in sample_run.items()
                     if key != 'start_time'}
    queued_kwargs['queue_time'] = T1

    with pytest.raises(NotImplementedError):
        tinydb_obs.queued_event(**queued_kwargs)
161
162
163
def test_tinydb_observer_equality(tmpdir, tinydb_obs):
    """Exercise ``==`` and ``!=`` against an equal observer and a string."""
    root = tmpdir.strpath
    metadata_db = TinyDB(os.path.join(root, 'metadata.json'))
    file_store = HashFS(os.path.join(root, 'hashfs'),
                        depth=3, width=2, algorithm='md5')
    manual_obs = TinyDbObserver(metadata_db, file_store)

    # Both operators are checked explicitly in each direction of truth.
    assert tinydb_obs == manual_obs
    assert not tinydb_obs != manual_obs

    assert not tinydb_obs == 'foo'
    assert tinydb_obs != 'foo'
175
176
177 View Code Duplication
def test_tinydb_observer_heartbeat_event_updates_run(tinydb_obs, sample_run):
    """heartbeat_event stores beat time, result, info and captured output."""
    tinydb_obs.started_event(**sample_run)

    beat = {
        'info': {'my_info': [1, 2, 3], 'nr': 7},
        'captured_out': 'some output',
        'beat_time': T2,
        'result': 42,
    }
    tinydb_obs.heartbeat_event(**beat)

    assert len(tinydb_obs.runs) == 1
    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['heartbeat'] == T2
    assert stored_run['result'] == 42
    assert stored_run['info'] == beat['info']
    assert stored_run['captured_out'] == beat['captured_out']
191
192
193 View Code Duplication
def test_tinydb_observer_completed_event_updates_run(tinydb_obs, sample_run):
    """completed_event stores stop time, result and COMPLETED status."""
    tinydb_obs.started_event(**sample_run)
    tinydb_obs.completed_event(stop_time=T2, result=42)

    assert len(tinydb_obs.runs) == 1
    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['stop_time'] == T2
    assert stored_run['result'] == 42
    assert stored_run['status'] == 'COMPLETED'
203
204
205
def test_tinydb_observer_interrupted_event_updates_run(tinydb_obs,
                                                       sample_run):
    """interrupted_event stores the stop time and the given status."""
    tinydb_obs.started_event(**sample_run)
    tinydb_obs.interrupted_event(interrupt_time=T2, status='INTERRUPTED')

    assert len(tinydb_obs.runs) == 1
    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['stop_time'] == T2
    assert stored_run['status'] == 'INTERRUPTED'
215
216
217 View Code Duplication
def test_tinydb_observer_failed_event_updates_run(tinydb_obs, sample_run):
    """failed_event stores stop time, FAILED status and the fail trace."""
    tinydb_obs.started_event(**sample_run)

    trace_text = "lots of errors and\nso\non..."
    tinydb_obs.failed_event(fail_time=T2, fail_trace=trace_text)

    assert len(tinydb_obs.runs) == 1
    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['stop_time'] == T2
    assert stored_run['status'] == 'FAILED'
    assert stored_run['fail_trace'] == trace_text
229
230
231 View Code Duplication
def test_tinydb_observer_artifact_event(tinydb_obs, sample_run):
    """artifact_event puts the file in the store and records it on the run."""
    tinydb_obs.started_event(**sample_run)

    artifact_path = "setup.py"
    artifact_name = 'mysetup'
    tinydb_obs.artifact_event(artifact_name, artifact_path)

    assert tinydb_obs.fs.exists(artifact_path)

    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['artifacts'][0][0] == artifact_name

    # The readable object at index 3 of the artifact entry must yield the
    # same bytes as the file on disk.
    with open(artifact_path, 'rb') as source_file:
        expected_bytes = source_file.read()
    assert stored_run['artifacts'][0][3].read() == expected_bytes
247
248
249 View Code Duplication
def test_tinydb_observer_resource_event(tinydb_obs, sample_run):
    """resource_event puts the file in the store and records it on the run."""
    tinydb_obs.started_event(**sample_run)

    resource_path = "setup.py"
    digest = get_digest(resource_path)
    tinydb_obs.resource_event(resource_path)

    assert tinydb_obs.fs.exists(resource_path)

    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['resources'][0][:2] == [resource_path, digest]

    # The readable object at index 2 of the resource entry must yield the
    # same bytes as the file on disk.
    with open(resource_path, 'rb') as source_file:
        expected_bytes = source_file.read()
    assert stored_run['resources'][0][2].read() == expected_bytes
265
266
267
def test_tinydb_observer_resource_event_when_resource_present(tinydb_obs,
                                                              sample_run):
    """resource_event still records a file already put into the store."""
    tinydb_obs.started_event(**sample_run)

    resource_path = "setup.py"
    digest = get_digest(resource_path)

    # Put the file into the store directly, before the event is fired.
    tinydb_obs.fs.put(resource_path)
    tinydb_obs.resource_event(resource_path)

    stored_run = tinydb_obs.runs.get(eid=1)
    assert stored_run['resources'][0][:2] == [resource_path, digest]
281
282
283
def test_custom_bufferreaderwrapper(tmpdir):
    """Check name/mode/closed behaviour of BufferedReaderWrapper copies.

    The wrapper, its shallow copy and its deep copy must expose the same
    ``name`` and ``mode`` as the file they derive from, and must stay open
    independently of the original file object and of each other.

    Every handle created here is closed in a ``finally`` block; previously
    only the shallow copy was closed, so the wrapper and the deep copy
    leaked open file handles.
    """
    import copy

    path = os.path.join(tmpdir.strpath, 'test.txt')
    with open(path, 'w') as f:
        f.write('some example text')

    handles = []  # every wrapper opened below, for cleanup
    try:
        with open(path, 'rb') as f:
            custom_fh = BufferedReaderWrapper(f)
            handles.append(custom_fh)
            assert f.name == custom_fh.name
            assert f.mode == custom_fh.mode
            custom_fh_copy = copy.copy(custom_fh)
            handles.append(custom_fh_copy)
            assert custom_fh.name == custom_fh_copy.name
            assert custom_fh.mode == custom_fh_copy.mode

        # Leaving the with-block closes only the original file object.
        assert f.closed
        assert not custom_fh.closed
        assert not custom_fh_copy.closed

        custom_fh_deepcopy = copy.deepcopy(custom_fh_copy)
        handles.append(custom_fh_deepcopy)
        assert custom_fh_copy.name == custom_fh_deepcopy.name
        assert custom_fh_copy.mode == custom_fh_deepcopy.mode

        # Closing the shallow copy must not close the deep copy.
        custom_fh_copy.close()
        assert custom_fh_copy.closed
        assert not custom_fh_deepcopy.closed
    finally:
        # close() on an already-closed io object has no effect, so the
        # explicitly closed copy can safely be closed again.
        for handle in handles:
            handle.close()
306
307
308
@pytest.mark.skipif(not opt.has_numpy, reason='needs numpy')
def test_serialisation_of_numpy_ndarray(tmpdir):
    """numpy arrays stored through NdArraySerializer read back equal."""
    from sacred.observers.tinydb_hashfs import NdArraySerializer
    from tinydb_serialization import SerializationMiddleware
    import numpy as np

    # Register the serializer that handles the non list/dict array objects.
    middleware = SerializationMiddleware()
    middleware.register_serializer(NdArraySerializer(), 'TinyArray')

    db_path = os.path.join(tmpdir.strpath, 'metadata.json')
    db = TinyDB(db_path, storage=middleware)

    identity = np.eye(3)
    ones = np.ones(5)
    db.insert({
        'foo': 'bar',
        'some_array': identity,
        'nested': {'ones': ones},
    })

    stored_doc = db.all()[0]
    assert stored_doc['foo'] == 'bar'
    assert (stored_doc['some_array'] == identity).all()
    assert (stored_doc['nested']['ones'] == ones).all()
338
339
340
@pytest.mark.skipif(not opt.has_pandas, reason='needs pandas')
def test_serialisation_of_pandas_dataframe(tmpdir):
    """DataFrame and Series objects stored in TinyDB read back equal."""
    from sacred.observers.tinydb_hashfs import (DataFrameSerializer,
                                                SeriesSerializer)
    from tinydb_serialization import SerializationMiddleware

    import numpy as np
    import pandas as pd

    # Register the serializers that handle the non list/dict pandas objects.
    middleware = SerializationMiddleware()
    middleware.register_serializer(DataFrameSerializer(), 'TinyDataFrame')
    middleware.register_serializer(SeriesSerializer(), 'TinySeries')

    db_path = os.path.join(tmpdir.strpath, 'metadata.json')
    db = TinyDB(db_path, storage=middleware)

    frame = pd.DataFrame(np.eye(3), columns=list('ABC'))
    ones = pd.Series(np.ones(5))
    db.insert({
        'foo': 'bar',
        'some_dataframe': frame,
        'nested': {'ones': ones},
    })

    stored_doc = db.all()[0]
    assert stored_doc['foo'] == 'bar'
    # The first .all() reduces each column, the second reduces the result.
    assert (stored_doc['some_dataframe'] == frame).all().all()
    assert (stored_doc['nested']['ones'] == ones).all()
376
377
378
def test_parse_tinydb_arg():
    """parse_tinydb_arg returns the argument 'foo' unchanged."""
    parsed = TinyDbOption.parse_tinydb_arg('foo')
    assert parsed == 'foo'
380
381
382
def test_parse_tinydboption_apply(tmpdir):
    """TinyDbOption.apply registers a TinyDbObserver on the experiment."""
    exp = Experiment()

    # The original wrapped this in a single-argument os.path.join, which
    # returns its argument unchanged.
    args = tmpdir.strpath

    TinyDbOption.apply(args, exp)
    # isinstance is the idiomatic type check (replaces ``type(x) == T``).
    assert isinstance(exp.observers[0], TinyDbObserver)
389