Completed
Push — master ( dbc38f...56accc )
by Klaus
01:34
created

find_doc_for()   F

Complexity

Conditions 16

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 16
dl 0
loc 28
rs 2.7326
c 1
b 0
f 1

How to fix   Complexity   

Complexity

Complex classes like find_doc_for() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import division, print_function, unicode_literals
4
5
import ast
6
import inspect
7
import io
8
import re
9
import sys
10
from tokenize import generate_tokens, tokenize, TokenError, COMMENT
11
from copy import copy
12
13
from sacred import SETTINGS
14
from sacred.config.config_summary import ConfigSummary
15
from sacred.config.utils import dogmatize, normalize_or_die, recursive_fill_in
16
17
# NOTE(review): module-level marker; nothing in this file reads it, so it is
# presumably consumed elsewhere in the sacred package -- confirm before
# renaming or removing.
__sacred__ = True
18
19
20
class ConfigScope(object):
    """
    Callable wrapper around a function whose body defines config entries.

    The body of the wrapped function is compiled once at construction time.
    Calling the scope evaluates that body and returns a ConfigSummary built
    from the resulting local variables.
    """

    def __init__(self, func):
        """
        Inspect ``func`` and pre-compile its body.

        :param func: Function whose body contains the config assignments. It
                     may only take plain positional arguments: no ``*args``,
                     no ``**kwargs`` and no default values.
        :raises AssertionError: if ``func`` uses varargs, kwargs or defaults.
        """
        super(ConfigScope, self).__init__()
        # inspect.getargspec was removed in Python 3.11. Prefer
        # getfullargspec and only fall back to getargspec on interpreters
        # that do not provide it (Python 2).
        get_arg_spec = getattr(inspect, 'getfullargspec', None)
        if get_arg_spec is None:
            get_arg_spec = inspect.getargspec
        self.arg_spec = get_arg_spec(func)
        # The name of the **kwargs parameter is exposed as `varkw` by
        # getfullargspec and as `keywords` by the legacy getargspec.
        var_keywords = getattr(self.arg_spec, 'varkw',
                               getattr(self.arg_spec, 'keywords', None))
        assert self.arg_spec.varargs is None, \
            "varargs are not allowed for ConfigScope functions"
        assert var_keywords is None, \
            "kwargs are not allowed for ConfigScope functions"
        assert self.arg_spec.defaults is None, \
            "default values are not allowed for ConfigScope functions"

        self._func = func
        self._body_code = get_function_body_code(func)
        self._var_docs = get_config_comments(func)

    def __call__(self, fixed=None, preset=None, fallback=None):
        """
        Evaluate this ConfigScope.

        This will evaluate the function body and collect the resulting local
        variables as entries of the returned config summary.

        :param fixed: Dictionary of entries that should stay fixed during the
                      evaluation. All of them will be part of the final config.
        :type fixed: dict
        :param preset: Dictionary of preset values that will be available
                       during the evaluation (if they are declared in the
                       function argument list). All of them will be part of the
                       final config.
        :type preset: dict
        :param fallback: Dictionary of fallback values that will be available
                         during the evaluation (if they are declared in the
                         function argument list). They will NOT be part of the
                         final config.
        :type fallback: dict
        :return: summary of the evaluated configuration
        :rtype: ConfigSummary
        :raises KeyError: if an argument of the wrapped function is found
                          neither in ``preset`` nor in ``fallback``.
        """
        cfg_locals = dogmatize(fixed or {})
        fallback = fallback or {}
        preset = preset or {}
        fallback_view = {}

        available_entries = set(preset) | set(fallback)

        for arg in self.arg_spec.args:
            if arg not in available_entries:
                raise KeyError("'{}' not in preset for ConfigScope. "
                               "Available options are: {}"
                               .format(arg, available_entries))
            if arg in preset:
                cfg_locals[arg] = preset[arg]
            else:  # arg in fallback
                fallback_view[arg] = fallback[arg]

        cfg_locals.fallback = fallback_view
        # Run the pre-compiled function body with a copy of the function's
        # globals, so the evaluation cannot rebind names in the defining
        # module, and with cfg_locals as the local namespace.
        eval(self._body_code, copy(self._func.__globals__), cfg_locals)

        added = cfg_locals.revelation()
        config_summary = ConfigSummary(added, cfg_locals.modified,
                                       cfg_locals.typechanges,
                                       cfg_locals.fallback_writes,
                                       docs=self._var_docs)
        # fill in the unused presets
        recursive_fill_in(cfg_locals, preset)

        for key, value in cfg_locals.items():
            try:
                config_summary[key] = normalize_or_die(value)
            except ValueError:
                # deliberate best effort: values that normalize_or_die
                # rejects are simply not assigned here
                pass
        return config_summary
92
93
94
def get_function_body(func):
    """
    Split the source of ``func`` into its body and a line offset.

    The ``def`` header is located with a regular expression that only accepts
    a plain, comma separated list of argument names.

    :param func: function whose source is available to ``inspect``
    :return: tuple ``(body, line_offset)`` where ``body`` is the (still
             indented) source text following the ``def`` header and
             ``line_offset`` is the number of source-file lines that precede
             the body
    :raises AssertionError: if no matching ``def`` header is found
    """
    source_lines, first_lineno = inspect.getsourcelines(func)
    source = ''.join(source_lines)

    identifier = "(?:[a-zA-Z_][a-zA-Z0-9_]*)"
    argument_list = r"{0}(?:\s*,\s*{0})*".format(identifier)
    header_pattern = (
        r"^[ \t]*def[ \t]*{}[ \t]*\(\s*({})?\s*\)[ \t]*:[ \t]*\n".format(
            func.__name__, argument_list))

    # only the first header match is relevant
    header = re.compile(header_pattern, flags=re.MULTILINE).search(source)
    assert header

    body_start = header.end()
    header_line_count = source.count('\n', 0, body_start)
    return source[body_start:], first_lineno + header_line_count - 1
107
108
109
def is_empty_or_comment(line):
    """Return True if ``line`` is blank or holds nothing but a comment."""
    stripped = line.strip()
    if stripped == '':
        return True
    return stripped.startswith('#')
112
113
114
def iscomment(line):
    """Return True if the first non-whitespace character of ``line`` is '#'."""
    without_indent = line.lstrip()
    return without_indent.startswith('#')
116
117
118
def dedent_line(line, indent):
    """
    Remove the leading part of ``line`` that it shares with ``indent``.

    :param line: a single source line
    :param indent: the indentation string to strip
    :return: ``line`` without the common prefix; if no differing character is
             found within the compared range, ``len(indent)`` characters are
             cut off instead
    """
    compared = min(len(line), len(indent))
    cut = 0
    while cut < compared and line[cut] == indent[cut]:
        cut += 1
    if cut == compared:
        # no mismatch within the overlapping range
        cut = len(indent)
    return line[cut:]
126
127
128
def dedent_function_body(body):
    """
    Remove the common indentation from a function body.

    The indentation is taken from the first line that is neither empty nor a
    pure comment; every line is then dedented by (at most) that prefix.

    :param body: source text of a function body
    :return: the dedented source text
    """
    lines = body.split('\n')
    # find indentation by first line that carries actual code
    indent = ''
    for line in lines:
        if not is_empty_or_comment(line):
            # raw string: '\s' in a plain literal is an invalid escape
            # sequence and triggers a warning on recent Python versions
            indent = re.match(r'^\s*', line).group()
            break

    return '\n'.join(dedent_line(line, indent) for line in lines)
141
142
143
def get_function_body_code(func):
    """
    Compile the body of ``func`` into a code object.

    The body is dedented, parsed to an AST, its line numbers are shifted so
    that they refer to the original file, and the AST is then compiled in
    "exec" mode.

    :param func: function whose body should be compiled
    :return: the compiled code object of the function body
    :raises SyntaxError: if the body does not compile; ``return`` and
                         ``yield`` statements get a dedicated message
    """
    filename = inspect.getfile(func)
    func_body, line_offset = get_function_body(func)
    body_source = dedent_function_body(func_body)
    try:
        body_code = compile(body_source, filename, "exec", ast.PyCF_ONLY_AST)
        body_code = ast.increment_lineno(body_code, n=line_offset)
        body_code = compile(body_code, filename, "exec")
    except SyntaxError as e:
        forbidden_statements = {
            "'return' outside function": 'return',
            "'yield' outside function": 'yield',
        }
        keyword = forbidden_statements.get(e.msg)
        if keyword is None:
            raise
        # Read the location from the exception attributes instead of
        # unpacking e.args[1]: that details tuple grew from four to six
        # items in Python 3.10, which made a 4-way unpacking fail.
        # e.text may be None when the source line is not available.
        statement = (e.text or '').strip()
        raise SyntaxError('No {} statements allowed in ConfigScopes\n'
                          '(\'{}\' in File "{}", line {})'.format(
                              keyword, statement, e.filename, e.lineno))
    return body_code
165
166
167
def is_ignored(line):
    """
    Tell whether ``line`` matches one of the ignored-comment patterns.

    :param line: comment text to check
    :return: True if any pattern in ``SETTINGS.CONFIG.IGNORED_COMMENTS``
             matches at the start of ``line``, False otherwise
    """
    return any(re.match(ignored, line) is not None
               for ignored in SETTINGS.CONFIG.IGNORED_COMMENTS)
172
173
174
def _inline_comments(line):
    """Return the raw comment tokens (including '#') found on ``line``."""
    comments = []
    line_io = io.BytesIO(line.encode())
    try:
        if sys.version_info[0] >= 3:
            tokens = tokenize(line_io.readline)
        else:  # Python 2
            tokens = generate_tokens(line_io.readline)
        # Collect incrementally: a line that opens a multi-line statement
        # makes the tokenizer raise TokenError at the end of the input, and
        # comments seen before that point must not be thrown away.
        for token in tokens:
            if token[0] == COMMENT:
                comments.append(token[1])
    except TokenError:
        pass
    return comments


def _preceding_comment(body_lines, lineno):
    """Return the comment found above index ``lineno``, skipping blank lines."""
    for index in range(lineno - 1, -1, -1):
        line = body_lines[index]
        if iscomment(line):
            comment = line.strip('# ')
            if not is_ignored(comment):
                return comment
        if line.strip() != '':
            # code or an ignored comment ends the search
            return None
    return None


def find_doc_for(ast_entry, body_lines):
    """
    Find the comment that documents the given AST entry.

    A non-ignored comment on the same line as the entry takes precedence.
    Otherwise the lines above the entry are searched, skipping blank lines,
    for a non-ignored comment line; any other non-blank line stops the
    search.

    :param ast_entry: AST node with a 1-based ``lineno`` attribute
    :param body_lines: list of source lines the node was parsed from
    :return: the comment text, or None if no suitable comment was found
    """
    lineno = ast_entry.lineno - 1
    for raw_comment in _inline_comments(body_lines[lineno]):
        comment = raw_comment[1:].strip()  # drop the leading '#'
        if not is_ignored(comment):
            return comment
    return _preceding_comment(body_lines, lineno)
202
203
204
def add_doc(target, variables, body_lines):
    """
    Record the documenting comment for an assignment target.

    :param target: AST node of the assignment target; only ``ast.Name`` and
                   ``ast.Tuple`` nodes are handled, anything else is ignored
    :param variables: dict mapping variable names to their doc; updated in
                      place, existing entries are never overwritten
    :param body_lines: source lines used to look up the comments
    """
    if isinstance(target, ast.Tuple):
        # tuple targets come from statements like:
        # a, b = 1, 2
        for element in target.elts:
            add_doc(element, variables, body_lines)
        return

    if not isinstance(target, ast.Name):
        return

    variable_name = target.id
    if variable_name in variables:
        return

    doc = find_doc_for(target, body_lines)
    if doc is not None:
        variables[variable_name] = doc
218
219
220
def get_config_comments(func):
    """
    Collect the documenting comments for variables assigned in ``func``.

    Assignments are looked for among the top-level statements of the function
    body and among their direct child nodes.

    :param func: function whose body should be scanned
    :return: dict mapping variable names to their comment; always contains an
             entry for 'seed'
    """
    source_file = inspect.getfile(func)
    raw_body, _ = get_function_body(func)
    body_source = dedent_function_body(raw_body)
    body_lines = body_source.split('\n')
    tree = compile(body_source, source_file, "exec", ast.PyCF_ONLY_AST)

    variables = {'seed': 'the random seed for this experiment'}

    for statement in tree.body:
        # look at the statement itself first, then at its direct children
        candidates = [statement]
        candidates.extend(ast.iter_child_nodes(statement))
        for node in candidates:
            if not isinstance(node, ast.Assign):
                continue
            # usually a single target, but can be more for statements like:
            # a = b = 5
            for target in node.targets:
                add_doc(target, variables, body_lines)

    return variables
240