Completed
Push — master (4e58e9...669a3f)
by Klaus
01:09
created

Run.__call__()   D

Complexity

Conditions 10

Size

Total Lines 61

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0
Features 1
Metric                        Value
cc   (cyclomatic complexity)  10
c    (changes)                1
b    (bugs)                   0
f    (features)               1
dl   (duplicated lines)       0
loc  (lines of code)          61
rs                            4.7368

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.
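For illustration, here is a minimal before/after sketch of Extract Method; the function and field names are hypothetical and not taken from the code under review.

from dataclasses import dataclass, field
from typing import List

@dataclass
class Order:
    items: List[str] = field(default_factory=list)
    total: float = 0.0

# Before: one long function whose commented section hints at a hidden method.
def process_order_long(order):
    # validate the order
    if not order.items:
        raise ValueError("empty order")
    if order.total < 0:
        raise ValueError("negative total")
    # ... the rest of the long body would follow here ...

# After: the commented block is extracted, and the comment becomes the name.
def validate_order(order):
    if not order.items:
        raise ValueError("empty order")
    if order.total < 0:
        raise ValueError("negative total")

def process_order(order):
    validate_order(order)
    # ... the rest of the body stays short and readable ...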

Complexity

Complex code like Run.__call__() often does a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
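As a rough sketch of Extract Class along these lines (all names below are hypothetical, not the actual refactoring of Run):

# Before: several members of one class share the "timer_" prefix,
# which hints at a cohesive component hiding inside it.
class Job(object):
    def __init__(self):
        self.name = "example job"
        self.timer_interval = 10.0
        self.timer_handle = None

    def timer_start(self):
        pass  # start the timer

    def timer_stop(self):
        pass  # stop the timer

# After: the prefixed members move into their own class (Extract Class),
# and Job keeps a single reference to the new component.
class Timer(object):
    def __init__(self, interval):
        self.interval = interval
        self.handle = None

    def start(self):
        pass  # start the timer

    def stop(self):
        pass  # stop the timer

class Job(object):
    def __init__(self):
        self.name = "example job"
        self.timer = Timer(interval=10.0)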

#!/usr/bin/env python
# coding=utf-8
from __future__ import division, print_function, unicode_literals

import datetime
import os.path
import sys
import threading
import traceback as tb

from tempfile import NamedTemporaryFile

from sacred.randomness import set_global_seed
from sacred.utils import (tee_output, ObserverError, SacredInterrupt,
                          join_paths, flush)


__sacred__ = True  # marks files that should be filtered from stack traces


class Run(object):
    """Represent and manage a single run of an experiment."""

    def __init__(self, config, config_modifications, main_function, observers,
                 root_logger, run_logger, experiment_info, host_info,
                 pre_run_hooks, post_run_hooks, captured_out_filter=None):

        self._id = None
        """The ID of this run as assigned by the first observer"""

        self.captured_out = None
        """Captured stdout and stderr"""

        self.config = config
        """The final configuration used for this run"""

        self.config_modifications = config_modifications
        """A ConfigSummary object with information about config changes"""

        self.experiment_info = experiment_info
        """A dictionary with information about the experiment"""

        self.host_info = host_info
        """A dictionary with information about the host"""

        self.info = {}
        """Custom info dict that will be sent to the observers"""

        self.root_logger = root_logger
        """The root logger that was used to create all the others"""

        self.run_logger = run_logger
        """The logger that is used for this run"""

        self.main_function = main_function
        """The main function that is executed with this run"""

        self.observers = observers
        """A list of all observers that observe this run"""

        self.pre_run_hooks = pre_run_hooks
        """List of pre-run hooks (captured functions called before this run)"""

        self.post_run_hooks = post_run_hooks
        """List of post-run hooks (captured functions called after this run)"""

        self.result = None
        """The return value of the main function"""

        self.status = None
        """The current status of the run, from QUEUED to COMPLETED"""

        self.start_time = None
        """The datetime when this run was started"""

        self.stop_time = None
        """The datetime when this run stopped"""

        self.debug = False
        """Determines whether this run is executed in debug mode"""

        self.pdb = False
        """If true the pdb debugger is automatically started after a failure"""

        self.meta_info = {}
        """A custom comment for this run"""

        self.beat_interval = 10.0  # sec
        """The time between two heartbeat events measured in seconds"""

        self.unobserved = False
        """Indicates whether this run should be unobserved"""

        self.force = False
        """Disable warnings about suspicious changes"""

        self.queue_only = False
        """If true then this run will only fire the queued_event and quit"""

        self.captured_out_filter = captured_out_filter
        """Filter function to be applied to captured output"""

        self.fail_trace = None
        """A stacktrace, in case the run failed"""

        self._heartbeat = None
        self._failed_observers = []
        self._output_file = None

    def open_resource(self, filename, mode='r'):
        """Open a file and also save it as a resource.

        Opens a file, reports it to the observers as a resource, and returns
        the opened file.

        In Sacred terminology a resource is a file that the experiment needed
        to access during a run. In case of a MongoObserver that means making
        sure the file is stored in the database (but avoiding duplicates) along
        its path and md5 sum.

        See also :py:meth:`sacred.Experiment.open_resource`.

        Parameters
        ----------
        filename : str
            name of the file that should be opened
        mode : str
            mode in which the file will be opened

        Returns
        -------
        file
            the opened file-object
        """
        filename = os.path.abspath(filename)
        self._emit_resource_added(filename)  # TODO: maybe non-blocking?
        return open(filename, mode)

    def add_artifact(self, filename, name=None):
        """Add a file as an artifact.

        In Sacred terminology an artifact is a file produced by the experiment
        run. In case of a MongoObserver that means storing the file in the
        database.

        See also :py:meth:`sacred.Experiment.add_artifact`.

        Parameters
        ----------
        filename : str
            name of the file to be stored as artifact
        name : str, optional
            optionally set the name of the artifact.
            Defaults to the relative file-path.
        """
        filename = os.path.abspath(filename)
        name = os.path.relpath(filename) if name is None else name
        self._emit_artifact_added(name, filename)

    def __call__(self, *args):
        r"""Start this run.

        Parameters
        ----------
        \*args
            parameters passed to the main function

        Returns
        -------
            the return value of the main function
        """
        if self.start_time is not None:
            raise RuntimeError('A run can only be started once. '
                               '(Last start was {})'.format(self.start_time))

        if self.unobserved:
            self.observers = []
        else:
            self.observers = sorted(self.observers, key=lambda x: -x.priority)

        self.warn_if_unobserved()
        set_global_seed(self.config['seed'])

        if self.queue_only:
            self._emit_queued()
            return
        try:
            try:
                with NamedTemporaryFile() as f, tee_output(f) as final_out:
                    self._output_file = f
                    self._emit_started()
                    self._start_heartbeat()
                    self._execute_pre_run_hooks()
                    self.result = self.main_function(*args)
                    self._execute_post_run_hooks()
                    if self.result is not None:
                        self.run_logger.info('Result: {}'.format(self.result))
                    elapsed_time = self._stop_time()
                    self.run_logger.info('Completed after %s', elapsed_time)
            finally:
                self.captured_out = final_out[0]
                if self.captured_out_filter is not None:
                    self.captured_out = self.captured_out_filter(
                        self.captured_out)
            self._stop_heartbeat()
            self._emit_completed(self.result)
        except (SacredInterrupt, KeyboardInterrupt) as e:
            self._stop_heartbeat()
            status = getattr(e, 'STATUS', 'INTERRUPTED')
            self._emit_interrupted(status)
            raise
        except:
            exc_type, exc_value, trace = sys.exc_info()
            self._stop_heartbeat()
            self._emit_failed(exc_type, exc_value, trace.tb_next)
            raise
        finally:
            self._warn_about_failed_observers()

        return self.result

    def _get_captured_output(self):
        if self._output_file.closed:
            return  # nothing we can do
        flush()
        self._output_file.flush()
        self._output_file.seek(0)
        text = self._output_file.read().decode()
        if self.captured_out_filter is not None:
            text = self.captured_out_filter(text)
        self.captured_out = text

    def _start_heartbeat(self):
        self._emit_heartbeat()
        if self.beat_interval > 0:
            self._heartbeat = threading.Timer(self.beat_interval,
                                              self._start_heartbeat)
            self._heartbeat.start()

    def _stop_heartbeat(self):
        if self._heartbeat is not None:
            self._heartbeat.cancel()
        self._heartbeat = None
        self._emit_heartbeat()  # one final beat to flush pending changes

    def _emit_queued(self):
        self.status = 'QUEUED'
        queue_time = datetime.datetime.utcnow()
        self.meta_info['queue_time'] = queue_time
        command = join_paths(self.main_function.prefix,
                             self.main_function.signature.name)
        self.run_logger.info("Queuing-up command '%s'", command)
        for observer in self.observers:
            if hasattr(observer, 'queued_event'):
                _id = observer.queued_event(
                    ex_info=self.experiment_info,
                    command=command,
                    queue_time=queue_time,
                    config=self.config,
                    meta_info=self.meta_info,
                    _id=self._id
                )
                if self._id is None:
                    self._id = _id
                # do not catch any exceptions on startup:
                # the experiment SHOULD fail if any of the observers fails

        if self._id is None:
            self.run_logger.info('Queued')
        else:
            self.run_logger.info('Queued-up run with ID "{}"'.format(self._id))

    def _emit_started(self):
        self.status = 'RUNNING'
        self.start_time = datetime.datetime.utcnow()
        command = join_paths(self.main_function.prefix,
                             self.main_function.signature.name)
        self.run_logger.info("Running command '%s'", command)
        for observer in self.observers:
            if hasattr(observer, 'started_event'):
                _id = observer.started_event(
                    ex_info=self.experiment_info,
                    command=command,
                    host_info=self.host_info,
                    start_time=self.start_time,
                    config=self.config,
                    meta_info=self.meta_info,
                    _id=self._id
                )
                if self._id is None:
                    self._id = _id
                # do not catch any exceptions on startup:
                # the experiment SHOULD fail if any of the observers fails
        if self._id is None:
            self.run_logger.info('Started')
        else:
            self.run_logger.info('Started run with ID "{}"'.format(self._id))

    def _emit_heartbeat(self):
        beat_time = datetime.datetime.utcnow()
        self._get_captured_output()
        for observer in self.observers:
            self._safe_call(observer, 'heartbeat_event',
                            info=self.info,
                            captured_out=self.captured_out,
                            beat_time=beat_time)

    def _stop_time(self):
        self.stop_time = datetime.datetime.utcnow()
        elapsed_time = datetime.timedelta(
            seconds=round((self.stop_time - self.start_time).total_seconds()))
        return elapsed_time

    def _emit_completed(self, result):
        self.status = 'COMPLETED'
        for observer in self.observers:
            self._final_call(observer, 'completed_event',
                             stop_time=self.stop_time,
                             result=result)

    def _emit_interrupted(self, status):
        self.status = status
        elapsed_time = self._stop_time()
        self.run_logger.warning("Aborted after %s!", elapsed_time)
        for observer in self.observers:
            self._final_call(observer, 'interrupted_event',
                             interrupt_time=self.stop_time,
                             status=status)

    def _emit_failed(self, exc_type, exc_value, trace):
        self.status = 'FAILED'
        elapsed_time = self._stop_time()
        self.run_logger.error("Failed after %s!", elapsed_time)
        self.fail_trace = tb.format_exception(exc_type, exc_value, trace)
        for observer in self.observers:
            self._final_call(observer, 'failed_event',
                             fail_time=self.stop_time,
                             fail_trace=self.fail_trace)

    def _emit_resource_added(self, filename):
        for observer in self.observers:
            self._safe_call(observer, 'resource_event', filename=filename)

    def _emit_artifact_added(self, name, filename):
        for observer in self.observers:
            self._safe_call(observer, 'artifact_event',
                            name=name,
                            filename=filename)

    def _safe_call(self, obs, method, **kwargs):
        if obs not in self._failed_observers and hasattr(obs, method):
            try:
                getattr(obs, method)(**kwargs)
            except ObserverError as e:
                self._failed_observers.append(obs)
                self.run_logger.warning("An error ocurred in the '{}' "
357
                                        "observer: {}".format(obs, e))
            except:
                self._failed_observers.append(obs)
                raise

    def _final_call(self, observer, method, **kwargs):
        if hasattr(observer, method):
            try:
                getattr(observer, method)(**kwargs)
            except Exception:
                # Feels dirty to catch all exceptions, but it is just for
                # finishing up, so we don't want one observer to kill the
                # others
                self.run_logger.error(tb.format_exc())

    def _warn_about_failed_observers(self):
        for observer in self._failed_observers:
            self.run_logger.warning("The observer '{}' failed at some point "
                                    "during the run.".format(observer))

    def _execute_pre_run_hooks(self):
        for pr in self.pre_run_hooks:
            pr()

    def _execute_post_run_hooks(self):
        for pr in self.post_run_hooks:
            pr()

    def warn_if_unobserved(self):
        if not self.observers and not self.debug and not self.unobserved:
            self.run_logger.warning("No observers have been added to this run")
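For context, Run.__call__() is normally not invoked directly; it is driven through Sacred's public Experiment API. A minimal usage sketch follows, assuming the standard Sacred interface; the experiment name and config values are illustrative.

from sacred import Experiment

ex = Experiment('hello_config')   # hypothetical experiment name

@ex.config
def cfg():
    recipient = 'world'           # ends up in the run's final config

@ex.automain
def main(recipient):
    # Running this script makes the Experiment build a Run object and call it
    # (Run.__call__), which seeds the RNGs, fires observer events such as
    # started_event and heartbeat_event, and captures stdout/stderr.
    return 'Hello {}!'.format(recipient)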