Completed
Pull Request — master (#2063)
by Abdeali
02:50 queued 52s
created

icollect_bears()   D

Complexity

Conditions 8

Size

Total Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
c 1
b 0
f 0
dl 0
loc 43
rs 4
1
import functools
2
import os
3
import pkg_resources
4
5
from coalib.bears.BEAR_KIND import BEAR_KIND
6
from coalib.collecting.Importers import iimport_objects
7
from coalib.misc.Decorators import yield_once
8
from coalib.output.printers.LOG_LEVEL import LOG_LEVEL
9
from coalib.parsing.Globbing import fnmatch, iglob, glob_escape
10
11
12
def _get_kind(bear_class):
13
    try:
14
        return bear_class.kind()
15
    except NotImplementedError:
16
        return None
17
18
19
def _import_bears(file_path, kinds):
20
    # recursive imports:
21
    for bear_list in iimport_objects(file_path,
22
                                     names='__additional_bears__',
23
                                     types=list):
24
        for bear_class in bear_list:
25
            if _get_kind(bear_class) in kinds:
26
                yield bear_class
27
    # normal import
28
    for bear_class in iimport_objects(file_path,
29
                                      attributes='kind',
30
                                      local=True):
31
        if _get_kind(bear_class) in kinds:
32
            yield bear_class
33
34
35
@yield_once
36
def icollect(file_paths, ignored_globs=None):
37
    """
38
    Evaluate globs in file paths and return all matching files.
39
40
    :param file_paths:    file path or list of such that can include globs
41
    :param ignored_globs: list of globs to ignore when matching files
42
    :return:              iterator that yields tuple of path of a matching
43
                          file, the glob where it was found
44
    """
45
    if isinstance(file_paths, str):
46
        file_paths = [file_paths]
47
48
    for file_path in file_paths:
49
        for match in iglob(file_path):
50
            if not ignored_globs or not fnmatch(match, ignored_globs):
51
                yield match, file_path
52
53
54
def collect_files(file_paths, log_printer, ignored_file_paths=None,
55
                  limit_file_paths=None):
56
    """
57
    Evaluate globs in file paths and return all matching files
58
59
    :param file_paths:         file path or list of such that can include globs
60
    :param ignored_file_paths: list of globs that match to-be-ignored files
61
    :param limit_file_paths:   list of globs that the files are limited to
62
    :return:                   list of paths of all matching files
63
    """
64
    limit_fnmatch = (functools.partial(fnmatch, patterns=limit_file_paths)
65
                     if limit_file_paths else lambda fname: True)
66
67
    valid_files = list(filter(lambda fname: os.path.isfile(fname[0]),
68
                              icollect(file_paths, ignored_file_paths)))
69
70
    # Find globs that gave no files and warn the user
71
    if valid_files:
72
        collected_files, file_globs_with_files = zip(*valid_files)
73
    else:
74
        collected_files, file_globs_with_files = [], []
75
76
    _warn_if_unused_glob(log_printer, file_paths, file_globs_with_files,
77
                         "No files matching '{}' were found.")
78
    limited_files = list(filter(limit_fnmatch, collected_files))
79
    return limited_files
80
81
82
def collect_dirs(dir_paths, ignored_dir_paths=None):
83
    """
84
    Evaluate globs in directory paths and return all matching directories
85
86
    :param dir_paths:         file path or list of such that can include globs
87
    :param ignored_dir_paths: list of globs that match to-be-ignored dirs
88
    :return:                  list of paths of all matching directories
89
    """
90
    valid_dirs = list(filter(lambda fname: os.path.isdir(fname[0]),
91
                             icollect(dir_paths, ignored_dir_paths)))
92
    if valid_dirs:
93
        collected_dirs, _ = zip(*valid_dirs)
94
        return list(collected_dirs)
95
    else:
96
        return []
97
98
99
@yield_once
100
def icollect_bears(bear_dirs, bear_globs, kinds, log_printer):
101
    """
102
    Collect all bears from bear directories that have a matching kind.
103
104
    :param bear_dirs:   directory name or list of such that can contain bears
105
    :param bear_globs:  globs of bears to collect
106
    :param kinds:       list of bear kinds to be collected
107
    :param log_printer: log_printer to handle logging
108
    :return:            iterator that yields a tuple with bear class and
109
                        which bear_glob was used to find that bear class.
110
    """
111
    for bear_dir, dir_glob in filter(lambda x: os.path.isdir(x[0]),
112
                                     icollect(bear_dirs)):
113
        # Since we get a real directory here and since we
114
        # pass this later to iglob, we need to escape this.
115
        bear_dir = glob_escape(bear_dir)
116
        for bear_glob in bear_globs:
117
            for matching_file in iglob(
118
                    os.path.join(bear_dir, bear_glob + '.py')):
119
120
                try:
121
                    for bear in _import_bears(matching_file, kinds):
122
                        yield bear, bear_glob
123
                except pkg_resources.VersionConflict as exception:
124
                    log_printer.log_exception(
125
                        ("Unable to collect bears from {file} because there "
126
                         "is a conflict with the version of a dependency "
127
                         "you have installed. This may be resolved by "
128
                         "creating a separate virtual environment for coala "
129
                         "or running `pip install {pkg}`. Be aware that the "
130
                         "latter solution might break other python packages "
131
                         "that depend on the currently installed "
132
                         "version.").format(file=matching_file,
133
                                            pkg=str(exception.req)),
134
                        exception, log_level=LOG_LEVEL.WARNING)
135
                except BaseException as exception:
136
                    log_printer.log_exception(
137
                        "Unable to collect bears from {file}. Probably the "
138
                        "file is malformed or the module code raises an "
139
                        "exception.".format(file=matching_file),
140
                        exception,
141
                        log_level=LOG_LEVEL.WARNING)
142
143
144
def collect_bears(bear_dirs, bear_globs, kinds, log_printer,
145
                  warn_if_unused_glob=True):
146
    """
147
    Collect all bears from bear directories that have a matching kind
148
    matching the given globs.
149
150
    :param bear_dirs:           Directory name or list of such that can contain
151
                                bears.
152
    :param bear_globs:          Globs of bears to collect.
153
    :param kinds:               List of bear kinds to be collected.
154
    :param log_printer:         LogPrinter to handle logging.
155
    :param warn_if_unused_glob: True if warning message should be shown if a
156
                                glob didn't give any bears.
157
    :return:                    Tuple of list of matching bear classes based on
158
                                kind. The lists are in the same order as kinds.
159
    """
160
    bears_found = tuple([] for i in range(len(kinds)))
161
    bear_globs_with_bears = set()
162
    for bear, glob in icollect_bears(bear_dirs, bear_globs, kinds, log_printer):
163
        index = kinds.index(_get_kind(bear))
164
        bears_found[index].append(bear)
165
        bear_globs_with_bears.add(glob)
166
167
    if warn_if_unused_glob:
168
        _warn_if_unused_glob(log_printer, bear_globs, bear_globs_with_bears,
169
                             "No bears were found matching '{}'.")
170
    return bears_found
171
172
173
def collect_all_bears_from_sections(sections, log_printer):
174
    """
175
    Collect all kinds of bears from bear directories given in the sections.
176
177
    :param bear_dirs:   directory name or list of such that can contain bears
178
    :param log_printer: log_printer to handle logging
179
    :return:            tuple of dictionaries of local and global bears
180
                        The dictionary key is section class and
181
                        dictionary value is a list of Bear classes
182
    """
183
    local_bears = {}
184
    global_bears = {}
185
    for section in sections:
186
        bear_dirs = sections[section].bear_dirs()
187
        local_bears[section], global_bears[section] = collect_bears(
188
            bear_dirs,
189
            ["**"],
190
            [BEAR_KIND.LOCAL, BEAR_KIND.GLOBAL],
191
            log_printer,
192
            warn_if_unused_glob=False)
193
    return local_bears, global_bears
194
195
196
def _warn_if_unused_glob(log_printer, globs, used_globs, message):
197
    """
198
    Warn if a glob has not been used.
199
200
    :param log_printer: The log_printer to handle logging.
201
    :param globs:       List of globs that were expected to be used.
202
    :param used_globs:  List of globs that were actually used.
203
    :param message:     Warning message to display if a glob is unused.
204
                        The glob which was unused will be added using
205
                        .format()
206
    """
207
    unused_globs = set(globs) - set(used_globs)
208
    for glob in unused_globs:
209
        log_printer.warn(message.format(glob))
210
211
212
def collect_registered_bears_dirs(entrypoint):
213
    """
214
    Searches setuptools for the entrypoint and returns the bear
215
    directories given by the module.
216
217
    :param entrypoint: The endpoint to find packages with.
218
    :return:           List of bear directories.
219
    """
220
    collected_dirs = []
221
    for ep in pkg_resources.iter_entry_points(entrypoint):
222
        registered_package = None
223
        try:
224
            registered_package = ep.load()
225
        except pkg_resources.DistributionNotFound:
226
            continue
227
        collected_dirs.append(os.path.abspath(
228
            os.path.dirname(registered_package.__file__)))
229
    return collected_dirs
230