Completed
Push — master ( 1a32f0...6baa7b )
by Klaus
01:15
created

tee_output()   A

Complexity

Conditions 3

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 3
c 2
b 0
f 1
dl 0
loc 64
rs 9.3956

2 Methods

Rating   Name   Duplication   Size   Complexity  
A get_by_dotted_path() 0 17 4
A set_by_dotted_path() 0 22 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import division, print_function, unicode_literals
4
5
import collections
6
import logging
7
import os.path
8
import re
9
import sys
10
import traceback as tb
11
from functools import partial
12
13
import wrapt
14
15
16
__sacred__ = True  # marks files that should be filtered from stack traces
17
18
NO_LOGGER = logging.getLogger('ignore')
19
NO_LOGGER.disabled = 1
20
21
PATHCHANGE = object()
22
23
PYTHON_IDENTIFIER = re.compile("^[a-zA-Z_][_a-zA-Z0-9]*$")
24
25
26
class CircularDependencyError(Exception):
27
    """The ingredients of the current experiment form a circular dependency."""
28
29
30
class ObserverError(Exception):
31
    """Error that an observer raises but that should not make the run fail."""
32
33
34
class SacredInterrupt(Exception):
35
    """Base-Class for all custom interrupts.
36
37
    For more information see :ref:`custom_interrupts`.
38
    """
39
40
    STATUS = "INTERRUPTED"
41
42
43
class TimeoutInterrupt(SacredInterrupt):
44
    """Signal a that the experiment timed out.
45
46
    This exception can be used in client code to indicate that the run
47
    exceeded its time limit and has been interrupted because of that.
48
    The status of the interrupted run will then be set to ``TIMEOUT``.
49
50
    For more information see :ref:`custom_interrupts`.
51
    """
52
53
    STATUS = "TIMEOUT"
54
55
56
def create_basic_stream_logger():
57
    logger = logging.getLogger('')
58
    logger.setLevel(logging.INFO)
59
    logger.handlers = []
60
    ch = logging.StreamHandler()
61
    formatter = logging.Formatter('%(levelname)s - %(name)s - %(message)s')
62
    ch.setFormatter(formatter)
63
    logger.addHandler(ch)
64
    return logger
65
66
67
def recursive_update(d, u):
68
    """
69
    Given two dictionaries d and u, update dict d recursively.
70
71
    E.g.:
72
    d = {'a': {'b' : 1}}
73
    u = {'c': 2, 'a': {'d': 3}}
74
    => {'a': {'b': 1, 'd': 3}, 'c': 2}
75
    """
76
    for k, v in u.items():
77
        if isinstance(v, collections.Mapping):
78
            r = recursive_update(d.get(k, {}), v)
79
            d[k] = r
80
        else:
81
            d[k] = u[k]
82
    return d
83
84
85
def iterate_flattened_separately(dictionary, manually_sorted_keys=None):
86
    """
87
    Recursively iterate over the items of a dictionary in a special order.
88
89
    First iterate over manually sorted keys and then over all items that are
90
    non-dictionary values (sorted by keys), then over the rest
91
    (sorted by keys), providing full dotted paths for every leaf.
92
    """
93
    if manually_sorted_keys is None:
94
        manually_sorted_keys = []
95
    for key in manually_sorted_keys:
96
        if key in dictionary:
97
            yield key, dictionary[key]
98
99
    single_line_keys = [key for key in dictionary.keys() if
100
                        key not in manually_sorted_keys and
101
                        (not dictionary[key] or
102
                         not isinstance(dictionary[key], dict))]
103
    for key in sorted(single_line_keys):
104
        yield key, dictionary[key]
105
106
    multi_line_keys = [key for key in dictionary.keys() if
107
                       key not in manually_sorted_keys and
108
                       (dictionary[key] and
109
                        isinstance(dictionary[key], dict))]
110
    for key in sorted(multi_line_keys):
111
        yield key, PATHCHANGE
112
        for k, val in iterate_flattened_separately(dictionary[key],
113
                                                   manually_sorted_keys):
114
            yield join_paths(key, k), val
115
116
117
def iterate_flattened(d):
118
    """
119
    Recursively iterate over the items of a dictionary.
120
121
    Provides a full dotted paths for every leaf.
122
    """
123
    for key in sorted(d.keys()):
124
        value = d[key]
125
        if isinstance(value, dict):
126
            for k, v in iterate_flattened(d[key]):
127
                yield join_paths(key, k), v
128
        else:
129
            yield key, value
130
131
132
def set_by_dotted_path(d, path, value):
133
    """
134
    Set an entry in a nested dict using a dotted path.
135
136
    Will create dictionaries as needed.
137
138
    Examples:
139
    >>> d = {'foo': {'bar': 7}}
140
    >>> set_by_dotted_path(d, 'foo.bar', 10)
141
    >>> d
142
    {'foo': {'bar': 10}}
143
    >>> set_by_dotted_path(d, 'foo.d.baz', 3)
144
    >>> d
145
    {'foo': {'bar': 10, 'd': {'baz': 3}}}
146
    """
147
    split_path = path.split('.')
148
    current_option = d
149
    for p in split_path[:-1]:
150
        if p not in current_option:
151
            current_option[p] = dict()
152
        current_option = current_option[p]
153
    current_option[split_path[-1]] = value
154
155
156
def get_by_dotted_path(d, path, default=None):
157
    """
158
    Get an entry from nested dictionaries using a dotted path.
159
160
    Example:
161
    >>> get_by_dotted_path({'foo': {'a': 12}}, 'foo.a')
162
    12
163
    """
164
    if not path:
165
        return d
166
    split_path = path.split('.')
167
    current_option = d
168
    for p in split_path:
169
        if p not in current_option:
170
            return default
171
        current_option = current_option[p]
172
    return current_option
173
174
175
def iter_path_splits(path):
176
    """
177
    Iterate over possible splits of a dotted path.
178
179
    The first part can be empty the second should not be.
180
181
    Example:
182
    >>> list(iter_path_splits('foo.bar.baz'))
183
    [('',        'foo.bar.baz'),
184
     ('foo',     'bar.baz'),
185
     ('foo.bar', 'baz')]
186
    """
187
    split_path = path.split('.')
188
    for i in range(len(split_path)):
189
        p1 = join_paths(*split_path[:i])
190
        p2 = join_paths(*split_path[i:])
191
        yield p1, p2
192
193
194
def iter_prefixes(path):
195
    """
196
    Iterate through all (non-empty) prefixes of a dotted path.
197
198
    Example:
199
    >>> list(iter_prefixes('foo.bar.baz'))
200
    ['foo', 'foo.bar', 'foo.bar.baz']
201
    """
202
    split_path = path.split('.')
203
    for i in range(1, len(split_path) + 1):
204
        yield join_paths(*split_path[:i])
205
206
207
def join_paths(*parts):
208
    """Join different parts together to a valid dotted path."""
209
    return '.'.join(str(p).strip('.') for p in parts if p)
210
211
212
def is_prefix(pre_path, path):
213
    """Return True if pre_path is a path-prefix of path."""
214
    pre_path = pre_path.strip('.')
215
    path = path.strip('.')
216
    return not pre_path or path.startswith(pre_path + '.')
217
218
219
def convert_to_nested_dict(dotted_dict):
220
    """Convert a dict with dotted path keys to corresponding nested dict."""
221
    nested_dict = {}
222
    for k, v in iterate_flattened(dotted_dict):
223
        set_by_dotted_path(nested_dict, k, v)
224
    return nested_dict
225
226
227
def print_filtered_stacktrace():
228
    exc_type, exc_value, exc_traceback = sys.exc_info()
229
    # determine if last exception is from sacred
230
    current_tb = exc_traceback
231
    while current_tb.tb_next is not None:
232
        current_tb = current_tb.tb_next
233
    if '__sacred__' in current_tb.tb_frame.f_globals:
234
        print("Exception originated from within Sacred.\n"
235
              "Traceback (most recent calls):", file=sys.stderr)
236
        tb.print_tb(exc_traceback)
237
        tb.print_exception(exc_type, exc_value, None)
238
    else:
239
        print("Traceback (most recent calls WITHOUT Sacred internals):",
240
              file=sys.stderr)
241
        current_tb = exc_traceback
242
        while current_tb is not None:
243
            if '__sacred__' not in current_tb.tb_frame.f_globals:
244
                tb.print_tb(current_tb, 1)
245
            current_tb = current_tb.tb_next
246
        print("\n".join(tb.format_exception_only(exc_type, exc_value)).strip(),
247
              file=sys.stderr)
248
249
250
def is_subdir(path, directory):
251
    path = os.path.abspath(os.path.realpath(path)) + os.sep
252
    directory = os.path.abspath(os.path.realpath(directory)) + os.sep
253
254
    return path.startswith(directory)
255
256
257
# noinspection PyUnusedLocal
258
@wrapt.decorator
259
def optional_kwargs_decorator(wrapped, instance=None, args=None, kwargs=None):
260
    # here wrapped is itself a decorator
261
    if args:  # means it was used as a normal decorator (so just call it)
262
        return wrapped(*args, **kwargs)
263
    else:  # used with kwargs, so we need to return a decorator
264
        return partial(wrapped, **kwargs)
265
266
267
def get_inheritors(cls):
268
    """Get a set of all classes that inherit from the given class."""
269
    subclasses = set()
270
    work = [cls]
271
    while work:
272
        parent = work.pop()
273
        for child in parent.__subclasses__():
274
            if child not in subclasses:
275
                subclasses.add(child)
276
                work.append(child)
277
    return subclasses
278
279
280
# Credit to Zarathustra and epost from stackoverflow
281
# Taken from http://stackoverflow.com/a/1176023/1388435
282
def convert_camel_case_to_snake_case(name):
283
    """Convert CamelCase to snake_case."""
284
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
285
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
286
287
288
def apply_backspaces_and_linefeeds(text):
289
    """
290
    Interpret backspaces and linefeeds in text like a terminal would.
291
292
    Interpret text like a terminal by removing backspace and linefeed
293
    characters and applying them line by line.
294
    """
295
    lines = []
296
    for line in text.split('\n'):
297
        chars, cursor = [], 0
298
        for ch in line:
299
            if ch == '\b':
300
                cursor = max(0, cursor - 1)
301
            elif ch == '\r':
302
                cursor = 0
303
            else:
304
                # normal character
305
                if cursor == len(chars):
306
                    chars.append(ch)
307
                else:
308
                    chars[cursor] = ch
309
                cursor += 1
310
        lines.append(''.join(chars))
311
    return '\n'.join(lines)
312
313
314
# Code adapted from here:
315
# https://blog.codinghorror.com/sorting-for-humans-natural-sort-order/
316
def natural_sort(l):
317
    def alphanum_key(key):
318
        return [int(c) if c.isdigit() else c.lower()
319
                for c in re.split('([0-9]+)', key)]
320
321
    return sorted(l, key=alphanum_key)
322