Completed
Push — master ( 2f3aad...e06a88 )
by Klaus
01:09
created

Run.__call__()   F

Complexity

Conditions 12

Size

Total Lines 67

Duplication

Lines 4
Ratio 5.97 %

Importance

Changes 0
Metric Value
cc 12
c 0
b 0
f 0
dl 4
loc 67
rs 2.5741

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method (moving a cohesive part of the body into a new, well-named method) and Replace Temp with Query.

Complexity

Complex classes like Run.__call__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import division, print_function, unicode_literals
4
5
import datetime
6
import os.path
7
import sys
8
import threading
9
import traceback as tb
10
11
from sacred.randomness import set_global_seed
12
from sacred.utils import ObserverError, SacredInterrupt, join_paths
13
from sacred.stdout_capturing import get_stdcapturer, flush
14
15
16
__sacred__ = True  # marks files that should be filtered from stack traces
17
18
19
class Run(object):
    """Represent and manage a single run of an experiment.

    A ``Run`` bundles the configuration, loggers, observers and hooks for
    one execution of the experiment's main function, and emits lifecycle
    events (queued, started, heartbeat, completed / interrupted / failed)
    to every attached observer.
    """

    def __init__(self, config, config_modifications, main_function, observers,
                 root_logger, run_logger, experiment_info, host_info,
                 pre_run_hooks, post_run_hooks, captured_out_filter=None):

        self._id = None
        """The ID of this run as assigned by the first observer"""

        self.captured_out = None
        """Captured stdout and stderr"""

        self.config = config
        """The final configuration used for this run"""

        self.config_modifications = config_modifications
        """A ConfigSummary object with information about config changes"""

        self.experiment_info = experiment_info
        """A dictionary with information about the experiment"""

        self.host_info = host_info
        """A dictionary with information about the host"""

        self.info = {}
        """Custom info dict that will be sent to the observers"""

        self.root_logger = root_logger
        """The root logger that was used to create all the others"""

        self.run_logger = run_logger
        """The logger that is used for this run"""

        self.main_function = main_function
        """The main function that is executed with this run"""

        self.observers = observers
        """A list of all observers that observe this run"""

        self.pre_run_hooks = pre_run_hooks
        """List of pre-run hooks (captured functions called before this run)"""

        self.post_run_hooks = post_run_hooks
        """List of post-run hooks (captured functions called after this run)"""

        self.result = None
        """The return value of the main function"""

        self.status = None
        """The current status of the run, from QUEUED to COMPLETED"""

        self.start_time = None
        """The datetime when this run was started"""

        self.stop_time = None
        """The datetime when this run stopped"""

        self.debug = False
        """Determines whether this run is executed in debug mode"""

        self.pdb = False
        """If true the pdb debugger is automatically started after a failure"""

        self.meta_info = {}
        """Dict of meta information (e.g. queue time) sent to the observers"""

        self.beat_interval = 10.0  # sec
        """The time between two heartbeat events measured in seconds"""

        self.unobserved = False
        """Indicates whether this run should be unobserved"""

        self.force = False
        """Disable warnings about suspicious changes"""

        self.queue_only = False
        """If true then this run will only fire the queued_event and quit"""

        self.captured_out_filter = captured_out_filter
        """Filter function to be applied to captured output"""

        self.fail_trace = None
        """A stacktrace, in case the run failed"""

        self.capture_mode = None
        """Determines the way the stdout/stderr are captured"""

        self._heartbeat = None  # threading.Timer driving the periodic beats
        self._failed_observers = []  # observers skipped after ObserverError
        self._output_file = None  # file object receiving captured output

    def open_resource(self, filename, mode='r'):
        """Open a file and also save it as a resource.

        Opens a file, reports it to the observers as a resource, and returns
        the opened file.

        In Sacred terminology a resource is a file that the experiment needed
        to access during a run. In case of a MongoObserver that means making
        sure the file is stored in the database (but avoiding duplicates) along
        its path and md5 sum.

        See also :py:meth:`sacred.Experiment.open_resource`.

        Parameters
        ----------
        filename : str
            name of the file that should be opened
        mode : str
            mode that file will be open

        Returns
        -------
        file
            the opened file-object
        """
        filename = os.path.abspath(filename)
        self._emit_resource_added(filename)  # TODO: maybe non-blocking?
        return open(filename, mode)

    def add_resource(self, filename):
        """Add a file as a resource.

        In Sacred terminology a resource is a file that the experiment needed
        to access during a run. In case of a MongoObserver that means making
        sure the file is stored in the database (but avoiding duplicates) along
        its path and md5 sum.

        See also :py:meth:`sacred.Experiment.add_resource`.

        Parameters
        ----------
        filename : str
            name of the file to be stored as a resource
        """
        filename = os.path.abspath(filename)
        self._emit_resource_added(filename)

    def add_artifact(self, filename, name=None):
        """Add a file as an artifact.

        In Sacred terminology an artifact is a file produced by the experiment
        run. In case of a MongoObserver that means storing the file in the
        database.

        See also :py:meth:`sacred.Experiment.add_artifact`.

        Parameters
        ----------
        filename : str
            name of the file to be stored as artifact
        name : str, optional
            optionally set the name of the artifact.
            Defaults to the relative file-path.
        """
        filename = os.path.abspath(filename)
        name = os.path.relpath(filename) if name is None else name
        self._emit_artifact_added(name, filename)

    def __call__(self, *args):
        r"""Start this run.

        Parameters
        ----------
        \*args
            parameters passed to the main function

        Returns
        -------
            the return value of the main function

        Raises
        ------
        RuntimeError
            if this run was already started once
        """
        if self.start_time is not None:
            raise RuntimeError('A run can only be started once. '
                               '(Last start was {})'.format(self.start_time))

        if self.unobserved:
            self.observers = []
        else:
            # highest-priority observers are notified first
            self.observers = sorted(self.observers, key=lambda x: -x.priority)

        self.warn_if_unobserved()
        set_global_seed(self.config['seed'])

        # Without observers nobody consumes the captured output, so default
        # to not capturing at all unless a mode was explicitly requested.
        if self.capture_mode is None and not self.observers:
            capture_mode = "no"
        else:
            capture_mode = self.capture_mode
        capture_stdout = get_stdcapturer(capture_mode)

        if self.queue_only:
            self._emit_queued()
            return

        # BUGFIX: final_out stays None if entering the capture context
        # raises; the finally-block below must not mask that error with an
        # unrelated NameError.
        final_out = None
        try:
            try:
                with capture_stdout() as (f, final_out):
                    self._output_file = f
                    self._emit_started()
                    self._start_heartbeat()
                    self._execute_pre_run_hooks()
                    self.result = self.main_function(*args)
                    self._execute_post_run_hooks()
                    if self.result is not None:
                        self.run_logger.info('Result: {}'.format(self.result))
                    elapsed_time = self._stop_time()
                    self.run_logger.info('Completed after %s', elapsed_time)
            finally:
                if final_out is not None:
                    self.captured_out = final_out[0]
                    if self.captured_out_filter is not None:
                        self.captured_out = self.captured_out_filter(
                            self.captured_out)
            self._stop_heartbeat()
            self._emit_completed(self.result)
        except (SacredInterrupt, KeyboardInterrupt) as e:
            self._stop_heartbeat()
            status = getattr(e, 'STATUS', 'INTERRUPTED')
            self._emit_interrupted(status)
            raise
        except BaseException:  # was a bare except; still catches everything
            exc_type, exc_value, trace = sys.exc_info()
            self._stop_heartbeat()
            # tb_next drops this frame so the trace starts in user code
            self._emit_failed(exc_type, exc_value, trace.tb_next)
            raise
        finally:
            self._warn_about_failed_observers()

        return self.result

    def _get_captured_output(self):
        """Read the capture file and store its (filtered) text on the run."""
        if self._output_file.closed:
            return  # nothing we can do
        flush()
        self._output_file.flush()
        self._output_file.seek(0)
        text = self._output_file.read()
        if isinstance(text, bytes):
            text = text.decode()
        if self.captured_out_filter is not None:
            text = self.captured_out_filter(text)
        self.captured_out = text

    def _start_heartbeat(self):
        """Emit a heartbeat and re-arm a timer for the next one."""
        self._emit_heartbeat()
        if self.beat_interval > 0:
            self._heartbeat = threading.Timer(self.beat_interval,
                                              self._start_heartbeat)
            self._heartbeat.start()

    def _stop_heartbeat(self):
        """Cancel the heartbeat timer and emit one final beat."""
        if self._heartbeat is not None:
            self._heartbeat.cancel()
        self._heartbeat = None
        self._emit_heartbeat()  # one final beat to flush pending changes

    def _emit_queued(self):
        """Notify observers that this run was queued (but not started)."""
        self.status = 'QUEUED'
        queue_time = datetime.datetime.utcnow()
        self.meta_info['queue_time'] = queue_time
        command = join_paths(self.main_function.prefix,
                             self.main_function.signature.name)
        self.run_logger.info("Queuing-up command '%s'", command)
        for observer in self.observers:
            if hasattr(observer, 'queued_event'):
                _id = observer.queued_event(
                    ex_info=self.experiment_info,
                    command=command,
                    queue_time=queue_time,
                    config=self.config,
                    meta_info=self.meta_info,
                    _id=self._id
                )
                # the first observer to return an id assigns it to the run
                if self._id is None:
                    self._id = _id
                # do not catch any exceptions on startup:
                # the experiment SHOULD fail if any of the observers fails

        if self._id is None:
            self.run_logger.info('Queued')
        else:
            self.run_logger.info('Queued-up run with ID "{}"'.format(self._id))

    def _emit_started(self):
        """Notify observers that this run has started executing."""
        self.status = 'RUNNING'
        self.start_time = datetime.datetime.utcnow()
        command = join_paths(self.main_function.prefix,
                             self.main_function.signature.name)
        self.run_logger.info("Running command '%s'", command)
        for observer in self.observers:
            if hasattr(observer, 'started_event'):
                _id = observer.started_event(
                    ex_info=self.experiment_info,
                    command=command,
                    host_info=self.host_info,
                    start_time=self.start_time,
                    config=self.config,
                    meta_info=self.meta_info,
                    _id=self._id
                )
                # the first observer to return an id assigns it to the run
                if self._id is None:
                    self._id = _id
                # do not catch any exceptions on startup:
                # the experiment SHOULD fail if any of the observers fails
        if self._id is None:
            self.run_logger.info('Started')
        else:
            self.run_logger.info('Started run with ID "{}"'.format(self._id))

    def _emit_heartbeat(self):
        """Send the current info dict and captured output to all observers."""
        beat_time = datetime.datetime.utcnow()
        self._get_captured_output()
        for observer in self.observers:
            self._safe_call(observer, 'heartbeat_event',
                            info=self.info,
                            captured_out=self.captured_out,
                            beat_time=beat_time)

    def _stop_time(self):
        """Record the stop time and return the elapsed wall-clock time."""
        self.stop_time = datetime.datetime.utcnow()
        elapsed_time = datetime.timedelta(
            seconds=round((self.stop_time - self.start_time).total_seconds()))
        return elapsed_time

    def _emit_completed(self, result):
        """Notify observers that this run completed successfully."""
        self.status = 'COMPLETED'
        for observer in self.observers:
            self._final_call(observer, 'completed_event',
                             stop_time=self.stop_time,
                             result=result)

    def _emit_interrupted(self, status):
        """Notify observers that this run was interrupted."""
        self.status = status
        elapsed_time = self._stop_time()
        self.run_logger.warning("Aborted after %s!", elapsed_time)
        for observer in self.observers:
            self._final_call(observer, 'interrupted_event',
                             interrupt_time=self.stop_time,
                             status=status)

    def _emit_failed(self, exc_type, exc_value, trace):
        """Notify observers that this run failed with the given exception."""
        self.status = 'FAILED'
        elapsed_time = self._stop_time()
        self.run_logger.error("Failed after %s!", elapsed_time)
        self.fail_trace = tb.format_exception(exc_type, exc_value, trace)
        for observer in self.observers:
            self._final_call(observer, 'failed_event',
                             fail_time=self.stop_time,
                             fail_trace=self.fail_trace)

    def _emit_resource_added(self, filename):
        """Report a newly used resource file to all observers."""
        for observer in self.observers:
            self._safe_call(observer, 'resource_event', filename=filename)

    def _emit_artifact_added(self, name, filename):
        """Report a newly produced artifact file to all observers."""
        for observer in self.observers:
            self._safe_call(observer, 'artifact_event',
                            name=name,
                            filename=filename)

    def _safe_call(self, obs, method, **kwargs):
        """Call an observer method; on ObserverError sideline the observer.

        Observers that previously failed are skipped entirely; any other
        exception type is re-raised after sidelining the observer.
        """
        if obs not in self._failed_observers and hasattr(obs, method):
            try:
                getattr(obs, method)(**kwargs)
            except ObserverError as e:
                self._failed_observers.append(obs)
                # BUGFIX: message previously misspelled "ocurred"
                self.run_logger.warning("An error occurred in the '{}' "
                                        "observer: {}".format(obs, e))
            except BaseException:  # was a bare except; still catch everything
                self._failed_observers.append(obs)
                raise

    def _final_call(self, observer, method, **kwargs):
        """Call an observer method during teardown, logging any exception."""
        if hasattr(observer, method):
            try:
                getattr(observer, method)(**kwargs)
            except Exception:
                # Feels dirty to catch all exceptions, but it is just for
                # finishing up, so we don't want one observer to kill the
                # others
                self.run_logger.error(tb.format_exc())

    def _warn_about_failed_observers(self):
        """Log a warning for every observer that was sidelined by an error."""
        for observer in self._failed_observers:
            self.run_logger.warning("The observer '{}' failed at some point "
                                    "during the run.".format(observer))

    def _execute_pre_run_hooks(self):
        """Run every registered pre-run hook, in order."""
        for pr in self.pre_run_hooks:
            pr()

    def _execute_post_run_hooks(self):
        """Run every registered post-run hook, in order."""
        for pr in self.post_run_hooks:
            pr()

    def warn_if_unobserved(self):
        """Warn when the run has no observers (unless debug/unobserved)."""
        if not self.observers and not self.debug and not self.unobserved:
            self.run_logger.warning("No observers have been added to this run")