find_doc_for()   F
last analyzed

Complexity

Conditions 16

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 16
c 0
b 0
f 0
dl 0
loc 28
rs 2.4

How to fix   Complexity   

Complexity

Complex classes like find_doc_for() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import division, print_function, unicode_literals
4
5
import ast
6
import inspect
7
import io
8
import re
9
import sys
10
from tokenize import generate_tokens, tokenize, TokenError, COMMENT
11
from copy import copy
12
13
from sacred import SETTINGS
14
from sacred.config.config_summary import ConfigSummary
15
from sacred.config.utils import dogmatize, normalize_or_die, recursive_fill_in
16
from sacred.config.signature import get_argspec
17
18
19
class ConfigScope(object):
    """Callable wrapper that evaluates the body of a function as a config.

    The body of the wrapped function is compiled once on construction and
    executed on every call; the comments documenting its variables are
    collected at construction time as well.
    """

    def __init__(self, func):
        """
        Inspect ``func`` and prepare its body for evaluation.

        :param func: Function whose body defines the configuration. It must
                     not declare ``*args``, ``**kwargs`` or default values.
        :raises AssertionError: if the signature uses any of the above.
        """
        super(ConfigScope, self).__init__()
        argspec = get_argspec(func)
        self.args, vararg_name, kw_wildcard, _, kwargs = argspec
        assert vararg_name is None, \
            "*args not allowed for ConfigScope functions"
        assert kw_wildcard is None, \
            "**kwargs not allowed for ConfigScope functions"
        assert not kwargs, \
            "default values are not allowed for ConfigScope functions"

        self._func = func
        self._body_code = get_function_body_code(func)
        self._var_docs = get_config_comments(func)

    def __call__(self, fixed=None, preset=None, fallback=None):
        """
        Evaluate this ConfigScope.

        The precompiled function body is executed with a copy of the
        function's globals and a dogmatized version of ``fixed`` as locals.

        :param fixed: Dictionary of entries that should stay fixed during the
                      evaluation.
        :type fixed: dict
        :param preset: Dictionary of preset values. Those named in the
                       function's argument list are made available to the
                       body; the rest are filled in after the evaluation.
        :type preset: dict
        :param fallback: Dictionary of fallback values. Those named in the
                         function's argument list (and not found in
                         ``preset``) are exposed through the fallback view
                         of the locals.
        :type fallback: dict
        :return: summary of the evaluated configuration
        :rtype: ConfigSummary
        :raises KeyError: if an argument of the function is found neither in
                          ``preset`` nor in ``fallback``.
        """
        if not preset:
            preset = {}
        if not fallback:
            fallback = {}
        cfg_locals = dogmatize(fixed if fixed else {})

        available_entries = set(preset.keys()).union(fallback.keys())
        fallback_view = {}
        for name in self.args:
            if name not in available_entries:
                raise KeyError("'{}' not in preset for ConfigScope. "
                               "Available options are: {}"
                               .format(name, available_entries))
            if name in preset:
                cfg_locals[name] = preset[name]
            else:
                # not a preset, so it has to come from the fallback
                fallback_view[name] = fallback[name]

        cfg_locals.fallback = fallback_view
        scope_globals = copy(self._func.__globals__)
        eval(self._body_code, scope_globals, cfg_locals)

        config_summary = ConfigSummary(
            cfg_locals.revelation(),
            cfg_locals.modified,
            cfg_locals.typechanges,
            cfg_locals.fallback_writes,
            docs=self._var_docs)

        # presets that the body did not touch still belong to the result
        recursive_fill_in(cfg_locals, preset)

        for key, value in cfg_locals.items():
            try:
                config_summary[key] = normalize_or_die(value)
            except ValueError:
                # entries that cannot be normalized are left out
                continue
        return config_summary
91
92
93
def get_function_body(func):
    """
    Separate the source of a function's body from its ``def`` line.

    :param func: Function whose source can be retrieved with ``inspect``.
    :return: Tuple ``(body, line_offset)``: the source text following the
             first matching ``def`` line, and the start line reported by
             ``inspect`` plus the number of newlines up to the end of that
             ``def`` line, minus one.
    :raises AssertionError: if no ``def`` line with a plain argument list
                            (names only) is found for ``func.__name__``.
    """
    source_lines, first_lineno = inspect.getsourcelines(func)
    source = ''.join(source_lines)

    identifier = "(?:[a-zA-Z_][a-zA-Z0-9_]*)"
    arg_list = r"{0}(?:\s*,\s*{0})*".format(identifier)
    signature = re.compile(
        r"^[ \t]*def[ \t]*{}[ \t]*\(\s*({})?\s*\)[ \t]*:[ \t]*\n".format(
            func.__name__, arg_list), flags=re.MULTILINE)

    # only the first definition found in the source is relevant
    header = signature.search(source)
    assert header

    body_start = header.end()
    line_offset = first_lineno + source.count('\n', 0, body_start) - 1
    return source[body_start:], line_offset
106
107
108
def is_empty_or_comment(line):
    """Tell whether ``line`` is blank or consists of a comment only."""
    stripped = line.strip()
    if stripped == '':
        return True
    return stripped.startswith('#')
111
112
113
def iscomment(line):
    """Tell whether the first non-whitespace character of ``line`` is '#'."""
    first_char = line.lstrip()[:1]
    return first_char == '#'
115
116
117
def dedent_line(line, indent):
    """
    Strip the leading part of ``line`` that agrees with ``indent``.

    Characters are compared pairwise; the line is cut at the first position
    where it deviates from ``indent``. If no deviation is found, the cut is
    made at ``len(indent)``.

    :param line: a single line of source code
    :param indent: the indentation string to remove
    :return: ``line`` without the matching leading characters
    """
    mismatches = (pos for pos, (line_sym, indent_sym)
                  in enumerate(zip(line, indent))
                  if line_sym != indent_sym)
    cut = next(mismatches, len(indent))
    return line[cut:]
125
126
127
def dedent_function_body(body):
    """
    Remove the indentation of a function body from all of its lines.

    The indentation is read from the first line that is neither blank nor a
    pure comment. If there is no such line, nothing is removed.

    :param body: source text of a function body
    :return: the body with that indentation stripped from every line
    """
    lines = body.split('\n')

    indent = ''
    for line in lines:
        if not is_empty_or_comment(line):
            # raw string: '\s' in a plain literal is an invalid escape
            # sequence and triggers a warning on recent Python versions
            indent = re.match(r'^\s*', line).group()
            break

    return '\n'.join(dedent_line(line, indent) for line in lines)
140
141
142
def get_function_body_code(func):
    """
    Compile the body of ``func`` into a code object.

    The body is parsed to an AST first so that its line numbers can be
    shifted by the offset of the body inside the file of ``func``.

    :param func: function whose body should be compiled
    :return: code object compiled in "exec" mode
    :raises SyntaxError: if the body does not compile. For ``return`` and
                         ``yield`` statements a dedicated message naming the
                         offending statement, file and line is raised.
    """
    filename = inspect.getfile(func)
    func_body, line_offset = get_function_body(func)
    body_source = dedent_function_body(func_body)
    try:
        body_ast = compile(body_source, filename, "exec", ast.PyCF_ONLY_AST)
        body_ast = ast.increment_lineno(body_ast, n=line_offset)
        body_code = compile(body_ast, filename, "exec")
    except SyntaxError as e:
        messages = {
            "'return' outside function":
                'No return statements allowed in ConfigScopes\n'
                '(\'{}\' in File "{}", line {})',
            "'yield' outside function":
                'No yield statements allowed in ConfigScopes\n'
                '(\'{}\' in File "{}", line {})',
        }
        template = messages.get(e.msg)
        if template is None:
            raise
        # Use the exception attributes instead of unpacking e.args[1]: the
        # details tuple grew from four to six items in Python 3.10, which
        # made a four-name unpacking fail with a ValueError.
        # The source text may be unavailable (None), so guard the strip().
        statement = (e.text or '').strip()
        raise SyntaxError(template.format(statement, e.filename, e.lineno))
    return body_code
164
165
166
def is_ignored(line):
    """
    Tell whether ``line`` matches one of the ignored-comment patterns.

    :param line: text to check
    :return: True if any pattern in ``SETTINGS.CONFIG.IGNORED_COMMENTS``
             matches at the beginning of ``line``, False otherwise.
    """
    return any(re.match(pattern, line) is not None
               for pattern in SETTINGS.CONFIG.IGNORED_COMMENTS)
171
172
173
def find_doc_for(ast_entry, body_lines):
    """
    Find the comment that documents an assignment target.

    A comment on the line of ``ast_entry`` is tried first. Otherwise the
    lines above are scanned upwards, skipping blank lines, for a comment
    line.

    :param ast_entry: AST node with a 1-based ``lineno`` attribute
    :param body_lines: lines of the source that ``ast_entry`` was parsed from
    :return: the comment text, or None if no usable comment was found
    """
    index = ast_entry.lineno - 1
    readline = io.BytesIO(body_lines[index].encode()).readline
    try:
        if sys.version_info[0] >= 3:
            inline = [tok.string for tok in tokenize(readline)
                      if tok.type == COMMENT]
        else:  # Python 2 tokens are plain 5-tuples
            inline = [text for (kind, text, _, _, _)
                      in generate_tokens(readline) if kind == COMMENT]
        for raw_comment in inline:
            # drop the leading '#' and the surrounding whitespace
            candidate = raw_comment[1:].strip()
            if not is_ignored(candidate):
                return candidate
    except TokenError:
        # the line cannot be tokenized on its own; fall back to the
        # lines above it
        pass

    for previous in range(index - 1, -1, -1):
        text = body_lines[previous]
        if iscomment(text):
            comment = text.strip('# ')
            if not is_ignored(comment):
                return comment
        if text.strip() != '':
            # code or an ignored comment ends the search
            return None
    return None
201
202
203
def add_doc(target, variables, body_lines):
    """
    Record the documentation of an assignment target in ``variables``.

    :param target: AST node of the assignment target. Names are looked up
                   directly and tuples (as in ``a, b = 1, 2``) are processed
                   element by element; other node types are skipped.
    :param variables: dict mapping variable names to their documentation;
                      updated in place, existing entries are kept.
    :param body_lines: source lines used to search for the comments
    """
    if isinstance(target, ast.Tuple):
        for element in target.elts:
            add_doc(element, variables, body_lines)
        return

    if not isinstance(target, ast.Name):
        return

    name = target.id
    if name in variables:
        # an already documented name is not overwritten
        return

    doc = find_doc_for(target, body_lines)
    if doc is not None:
        variables[name] = doc
217
218
219
def get_config_comments(func):
    """
    Collect the comments that document the variables assigned in ``func``.

    Only the top-level statements of the function body and their direct
    child nodes are inspected for assignments.

    :param func: function whose body should be searched
    :return: dict mapping variable names to their documentation; it always
             starts out with an entry for ``seed``.
    """
    filename = inspect.getfile(func)
    func_body, _ = get_function_body(func)
    body_source = dedent_function_body(func_body)
    module = compile(body_source, filename, "exec", ast.PyCF_ONLY_AST)
    body_lines = body_source.split('\n')

    variables = {'seed': 'the random seed for this experiment'}

    for statement in module.body:
        candidates = [statement]
        candidates.extend(ast.iter_child_nodes(statement))
        for node in candidates:
            if not isinstance(node, ast.Assign):
                continue
            # usually a single target, but chained assignments such as
            # ``a = b = 5`` have several
            for target in node.targets:
                add_doc(target, variables, body_lines)

    return variables
239