Completed
Push — master ( 49ed4c...bd651e )
by Klaus
58s
created

FileStorageObserver.started_event()   F

Complexity

Conditions 9

Size

Total Lines 42

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 9
c 1
b 1
f 0
dl 0
loc 42
rs 3
1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import division, print_function, unicode_literals
4
import json
5
import os
6
import os.path
7
import tempfile
8
9
from datetime import datetime
10
from shutil import copyfile
11
12
from sacred.commandline_options import CommandLineOption
13
from sacred.dependencies import get_digest
14
from sacred.observers.base import RunObserver
15
from sacred.utils import FileNotFoundError  # For compatibility with py2
16
from sacred import optional as opt
17
from sacred.serializer import flatten
18
19
20
# Default value of the ``priority`` argument of FileStorageObserver.create()
# and FileStorageObserver.__init__().
DEFAULT_FILE_STORAGE_PRIORITY = 20
21
22
23
def json_serial(obj):
    """Serialize objects the default json encoder cannot handle.

    Only ``datetime`` instances are supported; they are rendered through
    ``isoformat()``. Every other object raises ``TypeError``.
    """
    # Guard clause first: reject anything that is not a datetime.
    if not isinstance(obj, datetime):
        raise TypeError("Type not serializable")
    return obj.isoformat()
29
30
31
class FileStorageObserver(RunObserver):
    """Run observer that records every run as files below a base directory.

    Each run gets its own sub-directory of ``basedir`` holding ``run.json``,
    ``config.json`` and ``cout.txt`` (plus ``info.json`` once non-empty info
    has been reported and a rendered report if a template is configured).
    Source files and resources are copied once into shared directories and
    named ``<name>_<digest><ext>``.
    """

    VERSION = 'FileStorageObserver-0.7.0'

    @classmethod
    def create(cls, basedir, resource_dir=None, source_dir=None,
               template=None, priority=DEFAULT_FILE_STORAGE_PRIORITY):
        """Build an observer, creating ``basedir`` and resolving defaults.

        :param basedir: directory receiving one sub-directory per run; it is
                        created if it does not exist yet.
        :param resource_dir: storage directory for resources, defaults to
                             ``<basedir>/_resources``.
        :param source_dir: storage directory for sources, defaults to
                           ``<basedir>/_sources``.
        :param template: path of a report template. When omitted,
                         ``<basedir>/template.html`` is used if it exists,
                         otherwise no report is rendered.
        :param priority: stored on the observer as ``priority``.
        :raises FileNotFoundError: if an explicitly given template does not
                                   exist.
        """
        if not os.path.exists(basedir):
            os.makedirs(basedir)
        resource_dir = resource_dir or os.path.join(basedir, '_resources')
        source_dir = source_dir or os.path.join(basedir, '_sources')
        if template is not None:
            # An explicitly requested template must exist.
            if not os.path.exists(template):
                raise FileNotFoundError("Couldn't find template file '{}'"
                                        .format(template))
        else:
            # Fall back to an optional template inside the base directory.
            template = os.path.join(basedir, 'template.html')
            if not os.path.exists(template):
                template = None
        return cls(basedir, resource_dir, source_dir, template, priority)

    def __init__(self, basedir, resource_dir, source_dir, template,
                 priority=DEFAULT_FILE_STORAGE_PRIORITY):
        """Store the configuration; no file-system access happens here."""
        self.basedir = basedir
        self.resource_dir = resource_dir
        self.source_dir = source_dir
        self.template = template
        self.priority = priority
        # Per-run state, filled in by queued_event() / started_event().
        self.dir = None
        self.run_entry = None
        self.config = None
        self.info = None
        self.cout = ""

    def queued_event(self, ex_info, command, host_info, queue_time, config,
                     meta_info, _id):
        """Create the run directory and record the run as ``QUEUED``.

        With ``_id`` of ``None`` a fresh ``run_*`` directory is created via
        ``tempfile.mkdtemp``; otherwise ``<basedir>/<_id>`` is created.
        Writes ``run.json`` and ``config.json`` and copies every file listed
        in ``ex_info['sources']`` into the run directory.

        :return: the run directory relative to ``basedir`` if ``_id`` was
                 ``None``, else ``_id`` unchanged.
        """
        if _id is None:
            self.dir = tempfile.mkdtemp(prefix='run_', dir=self.basedir)
        else:
            self.dir = os.path.join(self.basedir, str(_id))
            os.mkdir(self.dir)

        self.run_entry = {
            'experiment': dict(ex_info),
            'command': command,
            'host': dict(host_info),
            'meta': meta_info,
            'status': 'QUEUED',
        }
        self.config = config
        self.info = {}

        self.save_json(self.run_entry, 'run.json')
        self.save_json(self.config, 'config.json')

        for s, _ in ex_info['sources']:
            self.save_file(s)

        return os.path.relpath(self.dir, self.basedir) if _id is None else _id

    def save_sources(self, ex_info):
        """Store all source files of the experiment in ``source_dir``.

        :param ex_info: mapping with ``'base_dir'`` and ``'sources'``, the
                        latter an iterable of 2-item entries whose first
                        item is a path relative to ``base_dir``.
        :return: list of ``[source, stored_path]`` pairs where
                 ``stored_path`` is relative to ``basedir``.
        """
        base_dir = ex_info['base_dir']
        source_info = []
        for s, _ in ex_info['sources']:
            abspath = os.path.join(base_dir, s)
            store_path, md5sum = self.find_or_save(abspath, self.source_dir)
            source_info.append([s, os.path.relpath(store_path, self.basedir)])
        return source_info

    def _make_run_dir(self, _id):
        """Create the directory of the current run and return its id.

        If ``_id`` is given, ``<basedir>/<_id>`` is created and ``_id`` is
        returned. Otherwise the next free number (highest existing numeric
        directory name + 1) is used; if another process grabs that number
        first the allocation is retried.
        """
        # Local import so the class does not rely on a new file-level import.
        import errno

        if _id is not None:
            self.dir = os.path.join(self.basedir, str(_id))
            os.mkdir(self.dir)
            return _id

        for attempt in range(200):
            dir_nrs = [int(d) for d in os.listdir(self.basedir)
                       if os.path.isdir(os.path.join(self.basedir, d)) and
                       d.isdigit()]
            _id = max(dir_nrs + [0]) + 1
            self.dir = os.path.join(self.basedir, str(_id))
            try:
                os.mkdir(self.dir)
            except OSError as e:
                # OSError + errno works on Python 2 and 3 alike
                # (FileExistsError is Python 3 only).
                # EEXIST means we lost a race for this number: retry. After
                # some tries, expect that something else went wrong.
                if e.errno != errno.EEXIST or attempt > 100:
                    raise
            else:
                # Success: stop here instead of creating further directories.
                return _id

    def started_event(self, ex_info, command, host_info, start_time, config,
                      meta_info, _id):
        """Create the run directory and record the run as ``RUNNING``.

        Stores the sources, replaces ``ex_info['sources']`` with the stored
        locations and writes ``run.json``, ``config.json`` and an empty
        ``cout.txt``.

        :return: the id of the run: ``_id`` if it was given, otherwise the
                 newly allocated directory number.
        """
        _id = self._make_run_dir(_id)

        ex_info['sources'] = self.save_sources(ex_info)

        self.run_entry = {
            'experiment': dict(ex_info),
            'command': command,
            'host': dict(host_info),
            'start_time': start_time.isoformat(),
            'meta': meta_info,
            'status': 'RUNNING',
            'resources': [],
            'artifacts': [],
            'heartbeat': None
        }
        self.config = config
        self.info = {}
        self.cout = ""

        self.save_json(self.run_entry, 'run.json')
        self.save_json(self.config, 'config.json')
        self.save_cout()

        # _id is never None at this point, so it is always the return value.
        return _id

    def find_or_save(self, filename, store_dir):
        """Copy ``filename`` into ``store_dir`` unless it is already there.

        The stored file is named ``<name>_<digest><ext>``; ``store_dir`` is
        created on demand.

        :return: tuple ``(store_path, digest)``.
        """
        if not os.path.exists(store_dir):
            os.makedirs(store_dir)
        source_name, ext = os.path.splitext(os.path.basename(filename))
        md5sum = get_digest(filename)
        store_name = source_name + '_' + md5sum + ext
        store_path = os.path.join(store_dir, store_name)
        if not os.path.exists(store_path):
            copyfile(filename, store_path)
        return store_path, md5sum

    def save_json(self, obj, filename):
        """Write the flattened ``obj`` as JSON to ``filename`` in the run dir."""
        with open(os.path.join(self.dir, filename), 'w') as f:
            json.dump(flatten(obj), f, sort_keys=True, indent=2)

    def save_file(self, filename, target_name=None):
        """Copy ``filename`` into the run directory.

        :param target_name: name of the copy; defaults to the base name of
                            ``filename``.
        """
        target_name = target_name or os.path.basename(filename)
        copyfile(filename, os.path.join(self.dir, target_name))

    def save_cout(self):
        """Write the captured output, UTF-8 encoded, to ``cout.txt``."""
        with open(os.path.join(self.dir, 'cout.txt'), 'wb') as f:
            f.write(self.cout.encode('utf-8'))

    def render_template(self):
        """Render the report template into the run directory.

        Does nothing unless mako is available and a template is configured.
        The report is written as ``report<ext>`` where ``<ext>`` is the
        extension of the template file.
        """
        if opt.has_mako and self.template:
            from mako.template import Template
            template = Template(filename=self.template)
            report = template.render(run=self.run_entry,
                                     config=self.config,
                                     info=self.info,
                                     cout=self.cout,
                                     savedir=self.dir)
            _, ext = os.path.splitext(self.template)
            with open(os.path.join(self.dir, 'report' + ext), 'w') as f:
                f.write(report)

    def heartbeat_event(self, info, captured_out, beat_time, result):
        """Persist the latest info, captured output and intermediate result.

        Rewrites ``cout.txt`` and ``run.json``; ``info.json`` is written only
        when ``info`` is non-empty.
        """
        self.info = info
        self.run_entry['heartbeat'] = beat_time.isoformat()
        self.run_entry['result'] = result
        self.cout = captured_out
        self.save_cout()
        self.save_json(self.run_entry, 'run.json')
        if self.info:
            self.save_json(self.info, 'info.json')

    def completed_event(self, stop_time, result):
        """Mark the run as ``COMPLETED``, store the result, render report."""
        self.run_entry['stop_time'] = stop_time.isoformat()
        self.run_entry['result'] = result
        self.run_entry['status'] = 'COMPLETED'

        self.save_json(self.run_entry, 'run.json')
        self.render_template()

    def interrupted_event(self, interrupt_time, status):
        """Record the given ``status`` and stop time, then render report."""
        self.run_entry['stop_time'] = interrupt_time.isoformat()
        self.run_entry['status'] = status
        self.save_json(self.run_entry, 'run.json')
        self.render_template()

    def failed_event(self, fail_time, fail_trace):
        """Mark the run as ``FAILED``, store the trace, render report."""
        self.run_entry['stop_time'] = fail_time.isoformat()
        self.run_entry['status'] = 'FAILED'
        self.run_entry['fail_trace'] = fail_trace
        self.save_json(self.run_entry, 'run.json')
        self.render_template()

    def resource_event(self, filename):
        """Store a resource in ``resource_dir`` and list it in ``run.json``."""
        store_path, md5sum = self.find_or_save(filename, self.resource_dir)
        self.run_entry['resources'].append([filename, store_path])
        self.save_json(self.run_entry, 'run.json')

    def artifact_event(self, name, filename):
        """Copy an artifact into the run dir as ``name`` and list it."""
        self.save_file(filename, name)
        self.run_entry['artifacts'].append(name)
        self.save_json(self.run_entry, 'run.json')

    def __eq__(self, other):
        """Observers are equal when they write to the same ``basedir``."""
        if isinstance(other, FileStorageObserver):
            return self.basedir == other.basedir
        return False

    def __ne__(self, other):
        """Inverse of ``__eq__`` (needed explicitly on Python 2)."""
        return not self.__eq__(other)
227
228
229
class FileStorageOption(CommandLineOption):
    """Add a file-storage observer to the experiment."""

    # Command-line metadata read from the class attributes below.
    short_flag = 'F'
    arg = 'BASEDIR'
    arg_description = "Base-directory to write the runs to"

    @classmethod
    def apply(cls, args, run):
        """Attach a FileStorageObserver rooted at ``args`` to ``run``."""
        # ``args`` is passed to create() as the base directory.
        observers = run.observers
        observers.append(FileStorageObserver.create(args))
239