Completed
Pull Request — master (#2432)
by Zatreanu
01:47
created

icollect_bears()   D

Complexity

Conditions 8

Size

Total Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
c 0
b 0
f 0
dl 0
loc 43
rs 4
1
import functools
2
import os
3
import pkg_resources
4
import itertools
5
6
from pyprint.NullPrinter import NullPrinter
7
8
from coalib.bears.BEAR_KIND import BEAR_KIND
9
from coalib.collecting.Importers import iimport_objects
10
from coala-utils.decorators import yield_once
11
from coalib.output.printers.LOG_LEVEL import LOG_LEVEL
12
from coalib.parsing.Globbing import fnmatch, iglob, glob_escape
13
from coalib.output.printers.LogPrinter import LogPrinter
14
15
16
def _get_kind(bear_class):
17
    try:
18
        return bear_class.kind()
19
    except NotImplementedError:
20
        return None
21
22
23
def _import_bears(file_path, kinds):
24
    # recursive imports:
25
    for bear_list in iimport_objects(file_path,
26
                                     names='__additional_bears__',
27
                                     types=list):
28
        for bear_class in bear_list:
29
            if _get_kind(bear_class) in kinds:
30
                yield bear_class
31
    # normal import
32
    for bear_class in iimport_objects(file_path,
33
                                      attributes='kind',
34
                                      local=True):
35
        if _get_kind(bear_class) in kinds:
36
            yield bear_class
37
38
39
@yield_once
40
def icollect(file_paths, ignored_globs=None):
41
    """
42
    Evaluate globs in file paths and return all matching files.
43
44
    :param file_paths:    file path or list of such that can include globs
45
    :param ignored_globs: list of globs to ignore when matching files
46
    :return:              iterator that yields tuple of path of a matching
47
                          file, the glob where it was found
48
    """
49
    if isinstance(file_paths, str):
50
        file_paths = [file_paths]
51
52
    for file_path in file_paths:
53
        for match in iglob(file_path):
54
            if not ignored_globs or not fnmatch(match, ignored_globs):
55
                yield match, file_path
56
57
58
def collect_files(file_paths, log_printer, ignored_file_paths=None,
59
                  limit_file_paths=None):
60
    """
61
    Evaluate globs in file paths and return all matching files
62
63
    :param file_paths:         file path or list of such that can include globs
64
    :param ignored_file_paths: list of globs that match to-be-ignored files
65
    :param limit_file_paths:   list of globs that the files are limited to
66
    :return:                   list of paths of all matching files
67
    """
68
    limit_fnmatch = (functools.partial(fnmatch, globs=limit_file_paths)
69
                     if limit_file_paths else lambda fname: True)
70
71
    valid_files = list(filter(lambda fname: os.path.isfile(fname[0]),
72
                              icollect(file_paths, ignored_file_paths)))
73
74
    # Find globs that gave no files and warn the user
75
    if valid_files:
76
        collected_files, file_globs_with_files = zip(*valid_files)
77
    else:
78
        collected_files, file_globs_with_files = [], []
79
80
    _warn_if_unused_glob(log_printer, file_paths, file_globs_with_files,
81
                         "No files matching '{}' were found.")
82
    limited_files = list(filter(limit_fnmatch, collected_files))
83
    return limited_files
84
85
86
def collect_dirs(dir_paths, ignored_dir_paths=None):
87
    """
88
    Evaluate globs in directory paths and return all matching directories
89
90
    :param dir_paths:         file path or list of such that can include globs
91
    :param ignored_dir_paths: list of globs that match to-be-ignored dirs
92
    :return:                  list of paths of all matching directories
93
    """
94
    valid_dirs = list(filter(lambda fname: os.path.isdir(fname[0]),
95
                             icollect(dir_paths, ignored_dir_paths)))
96
    if valid_dirs:
97
        collected_dirs, _ = zip(*valid_dirs)
98
        return list(collected_dirs)
99
    else:
100
        return []
101
102
103
@yield_once
104
def icollect_bears(bear_dirs, bear_globs, kinds, log_printer):
105
    """
106
    Collect all bears from bear directories that have a matching kind.
107
108
    :param bear_dirs:   directory name or list of such that can contain bears
109
    :param bear_globs:  globs of bears to collect
110
    :param kinds:       list of bear kinds to be collected
111
    :param log_printer: log_printer to handle logging
112
    :return:            iterator that yields a tuple with bear class and
113
                        which bear_glob was used to find that bear class.
114
    """
115
    for bear_dir, dir_glob in filter(lambda x: os.path.isdir(x[0]),
116
                                     icollect(bear_dirs)):
117
        # Since we get a real directory here and since we
118
        # pass this later to iglob, we need to escape this.
119
        bear_dir = glob_escape(bear_dir)
120
        for bear_glob in bear_globs:
121
            for matching_file in iglob(
122
                    os.path.join(bear_dir, bear_glob + '.py')):
123
124
                try:
125
                    for bear in _import_bears(matching_file, kinds):
126
                        yield bear, bear_glob
127
                except pkg_resources.VersionConflict as exception:
128
                    log_printer.log_exception(
129
                        ("Unable to collect bears from {file} because there "
130
                         "is a conflict with the version of a dependency "
131
                         "you have installed. This may be resolved by "
132
                         "creating a separate virtual environment for coala "
133
                         "or running `pip install {pkg}`. Be aware that the "
134
                         "latter solution might break other python packages "
135
                         "that depend on the currently installed "
136
                         "version.").format(file=matching_file,
137
                                            pkg=exception.req),
138
                        exception, log_level=LOG_LEVEL.WARNING)
139
                except BaseException as exception:
140
                    log_printer.log_exception(
141
                        "Unable to collect bears from {file}. Probably the "
142
                        "file is malformed or the module code raises an "
143
                        "exception.".format(file=matching_file),
144
                        exception,
145
                        log_level=LOG_LEVEL.WARNING)
146
147
148
def collect_bears(bear_dirs, bear_globs, kinds, log_printer,
149
                  warn_if_unused_glob=True):
150
    """
151
    Collect all bears from bear directories that have a matching kind
152
    matching the given globs.
153
154
    :param bear_dirs:           Directory name or list of such that can contain
155
                                bears.
156
    :param bear_globs:          Globs of bears to collect.
157
    :param kinds:               List of bear kinds to be collected.
158
    :param log_printer:         log_printer to handle logging.
159
    :param warn_if_unused_glob: True if warning message should be shown if a
160
                                glob didn't give any bears.
161
    :return:                    Tuple of list of matching bear classes based on
162
                                kind. The lists are in the same order as kinds.
163
    """
164
    bears_found = tuple([] for i in range(len(kinds)))
165
    bear_globs_with_bears = set()
166
    for bear, glob in icollect_bears(bear_dirs, bear_globs, kinds, log_printer):
167
        index = kinds.index(_get_kind(bear))
168
        bears_found[index].append(bear)
169
        bear_globs_with_bears.add(glob)
170
171
    if warn_if_unused_glob:
172
        _warn_if_unused_glob(log_printer, bear_globs, bear_globs_with_bears,
173
                             "No bears were found matching '{}'.")
174
    return bears_found
175
176
177
def filter_section_bears_by_languages(bears, languages):
178
    """
179
    Filters the bears by languages.
180
181
    :param bears:       the dictionary of the sections as keys and list of
182
                        bears as values.
183
    :param languages:   languages that bears are being filtered on.
184
    :return:            new dictionary with filtered out bears that don't match
185
                        any language from languages.
186
    """
187
    new_bears = {}
188
    # All bears with "all" languages supported shall be shown
189
    languages = set(language.lower() for language in languages) | {'all'}
190
    for section in bears.keys():
191
        new_bears[section] = tuple(
192
            bear for bear in bears[section]
193
            if {language.lower() for language in bear.LANGUAGES} & languages)
194
    return new_bears
195
196
197
def get_all_bears_names():
198
    from coalib.settings.Section import Section
199
    printer = LogPrinter(NullPrinter())
200
    local_bears, global_bears = collect_bears(
201
        Section("").bear_dirs(),
202
        ["**"],
203
        [BEAR_KIND.LOCAL, BEAR_KIND.GLOBAL],
204
        printer,
205
        warn_if_unused_glob=False)
206
    return [bear.name for bear in itertools.chain(local_bears, global_bears)]
207
208
209
def collect_all_bears_from_sections(sections, log_printer):
210
    """
211
    Collect all kinds of bears from bear directories given in the sections.
212
213
    :param sections:    list of sections so bear_dirs are taken into account
214
    :param log_printer: log_printer to handle logging
215
    :return:            tuple of dictionaries of local and global bears
216
                        The dictionary key is section class and
217
                        dictionary value is a list of Bear classes
218
    """
219
    local_bears = {}
220
    global_bears = {}
221
    for section in sections:
222
        bear_dirs = sections[section].bear_dirs()
223
        local_bears[section], global_bears[section] = collect_bears(
224
            bear_dirs,
225
            ["**"],
226
            [BEAR_KIND.LOCAL, BEAR_KIND.GLOBAL],
227
            log_printer,
228
            warn_if_unused_glob=False)
229
    return local_bears, global_bears
230
231
232
def _warn_if_unused_glob(log_printer, globs, used_globs, message):
233
    """
234
    Warn if a glob has not been used.
235
236
    :param log_printer: The log_printer to handle logging.
237
    :param globs:       List of globs that were expected to be used.
238
    :param used_globs:  List of globs that were actually used.
239
    :param message:     Warning message to display if a glob is unused.
240
                        The glob which was unused will be added using
241
                        .format()
242
    """
243
    unused_globs = set(globs) - set(used_globs)
244
    for glob in unused_globs:
245
        log_printer.warn(message.format(glob))
246
247
248
def collect_registered_bears_dirs(entrypoint):
249
    """
250
    Searches setuptools for the entrypoint and returns the bear
251
    directories given by the module.
252
253
    :param entrypoint: The entrypoint to find packages with.
254
    :return:           List of bear directories.
255
    """
256
    collected_dirs = []
257
    for ep in pkg_resources.iter_entry_points(entrypoint):
258
        registered_package = None
259
        try:
260
            registered_package = ep.load()
261
        except pkg_resources.DistributionNotFound:
262
            continue
263
        collected_dirs.append(os.path.abspath(
264
            os.path.dirname(registered_package.__file__)))
265
    return collected_dirs
266