Completed
Pull Request — master (#1127)
by Mischa
02:23
created

coalib.processes.instantiate_bears()   C

Complexity

Conditions 7

Size

Total Lines 42

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 7
dl 0
loc 42
rs 5.5
1
import multiprocessing
2
import queue
3
import os
4
import platform
5
import subprocess
6
7
from coalib.collecting.Collectors import collect_files
8
from coalib.collecting import Dependencies
9
from coalib.misc.StringConverter import StringConverter
10
from coalib.output.printers import LOG_LEVEL
11
from coalib.processes.BearRunning import run
12
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
13
from coalib.results.Result import Result
14
from coalib.results.RESULT_SEVERITY import RESULT_SEVERITY
15
from coalib.results.SourceRange import SourceRange
16
from coalib.settings.Setting import path_list
17
from coalib.processes.LogPrinterThread import LogPrinterThread
18
19
20
def get_cpu_count():
    """
    Determines the number of CPUs to base the job count on.

    :return: The value reported by multiprocessing.cpu_count(), or 2 if the
             platform does not implement that query.
    """
    try:
        detected = multiprocessing.cpu_count()
    # cpu_count is not implemented for some CPU architectures/OSes
    except NotImplementedError:  # pragma: no cover
        detected = 2

    return detected
26
27
28
def fill_queue(queue_fill, any_list):
    """
    Puts every element of an iterable into a queue, preserving the order.

    :param queue_fill: The queue to be filled (anything offering ``put``).
    :param any_list:   Iterable providing the elements to enqueue.
    """
    for item in any_list:
        queue_fill.put(item)
37
38
39
def get_running_processes(processes):
    """
    Counts how many of the given processes are still alive.

    :param processes: Iterable of objects offering an ``is_alive()`` method.
    :return:          The number of objects whose ``is_alive()`` is truthy.
    """
    alive = [process for process in processes if process.is_alive()]
    return len(alive)
41
42
43
def create_process_group(command_array, **kwargs):
    """
    Starts a command in a process group (Windows) or session (elsewhere) of
    its own.

    :param command_array: The command (program and its arguments) handed to
                          subprocess.Popen.
    :param kwargs:        Additional keyword arguments that are passed on to
                          subprocess.Popen unchanged.
    :return:              The created subprocess.Popen object.
    """
    if platform.system() == "Windows":  # pragma: no cover
        return subprocess.Popen(
            command_array,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            **kwargs)

    # start_new_session=True makes the child call setsid() itself. It replaces
    # preexec_fn=os.setsid, which the subprocess documentation flags as unsafe
    # when the parent process has threads running.
    return subprocess.Popen(command_array,
                            start_new_session=True,
                            **kwargs)
54
55
56
def print_result(results,
                 file_dict,
                 retval,
                 print_results,
                 section,
                 log_printer,
                 file_diff_dict,
                 ignore_ranges):
    """
    Takes the results produced by each bear and gives them to the print_results
    method to present to the user.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the name of files and its
                           contents.
    :param retval:         Whether results were yielded by an earlier
                           invocation. If it is truthy it is returned
                           unchanged, else the return value depends on
                           whether this invocation yields results.
    :param print_results:  A function that prints all given results appropriate
                           to the output medium.
    :param section:        The section; its 'min_severity' setting (default
                           'INFO') selects the minimum severity a result
                           needs to be shown. It is also passed on to
                           print_results.
    :param log_printer:    The log printer that is passed on to
                           print_results.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param ignore_ranges:  A list of SourceRanges. Results that affect code in
                           any of those ranges will be ignored.
    :return:               retval if it is truthy, else True if this
                           invocation yielded any results and False
                           otherwise.
    """
    severity_by_name = RESULT_SEVERITY.str_dict
    min_severity_str = str(section.get('min_severity', 'INFO')).upper()
    min_severity = severity_by_name.get(min_severity_str)
    if min_severity is None:
        # Unknown severity name: fall back to the INFO level itself instead of
        # the string 'INFO', which is not a severity value and cannot be
        # meaningfully compared against result.severity below.
        # NOTE(review): assumes str_dict has an 'INFO' entry - confirm.
        min_severity = severity_by_name['INFO']

    # The exact type check is kept on purpose: only plain Result objects are
    # presented, anything else put into the result list is dropped.
    results = [result
               for result in results
               if type(result) is Result and
               result.severity >= min_severity and
               not result.to_ignore(ignore_ranges)]

    print_results(log_printer, section, results, file_dict, file_diff_dict)
    return retval or len(results) > 0
93
94
95
def get_file_dict(filename_list, log_printer):
    """
    Loads the contents of all given files.

    Files that cannot be read are reported through the log printer and left
    out of the result.

    :param filename_list: List of paths of the files to read.
    :param log_printer:   The logger that read failures are reported to.
    :return:              A dictionary mapping each successfully read filename
                          to the list of its lines (UTF-8 decoded, as
                          returned by readlines()).
    """
    contents = {}

    for path in filename_list:
        try:
            with open(path, "r", encoding="utf-8") as handle:
                lines = handle.readlines()
            contents[path] = lines
        except UnicodeDecodeError:
            message = ("Failed to read file '{}'. It seems to contain "
                       "non-unicode characters. Leaving it "
                       "out.".format(path))
            log_printer.warn(message)
        except Exception as exception:  # pragma: no cover
            message = ("Failed to read file '{}' because of "
                       "an unknown error. Leaving it "
                       "out.".format(path))
            log_printer.log_exception(message,
                                      exception,
                                      log_level=LOG_LEVEL.WARNING)

    return contents
121
122
123
def instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      message_queue):
    """
    Replaces the bear classes in both lists by instances, in place.

    Local bears are constructed with (section, message_queue), global bears
    with (file_dict, section, message_queue); both additionally receive
    timeout=0.1. Bears whose construction raises a RuntimeError are removed
    from their list.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears to instantiate.
    :param global_bear_list: List of global bears to instantiate.
    :param file_dict:        Dictionary containing filenames and their
                             contents.
    :param message_queue:    Queue responsible to maintain the messages
                             delivered by the bears.
    """
    def instantiate_in_place(bear_list, *bear_args):
        failed_positions = []
        for position, bear_class in enumerate(bear_list):
            try:
                bear_list[position] = bear_class(*bear_args, timeout=0.1)
            except RuntimeError:
                # If requirements of a bear are not fulfilled this is not a
                # reason to stop instantiating bears.
                failed_positions.append(position)

        # Delete from the back so the remaining positions stay valid.
        for position in reversed(failed_positions):
            del bear_list[position]

    instantiate_in_place(local_bear_list, section, message_queue)
    instantiate_in_place(global_bear_list, file_dict, section, message_queue)
165
166
167
def instantiate_processes(section,
                          local_bear_list,
                          global_bear_list,
                          job_count,
                          log_printer):
    """
    Prepares the processes that run the bears of a section in a
    multiprocessing environment.

    The files named by the 'files' setting (minus those matching 'ignore')
    are read, the queues and shared dictionaries are created, the bears are
    instantiated in place and the filename and global bear queues are filled.
    The processes are only created here, not started.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears belonging to the section.
    :param global_bear_list: List of global bears belonging to the section.
    :param job_count:        Max number of processes to create.
    :param log_printer:      The log printer to warn to.
    :return:                 A tuple containing a list of processes,
                             and the arguments passed to each process which are
                             the same for each object.
    """
    included_paths = path_list(section.get('files', ""))
    ignored_paths = path_list(section.get('ignore', ""))
    file_dict = get_file_dict(collect_files(included_paths, ignored_paths),
                              log_printer)

    manager = multiprocessing.Manager()
    bear_runner_args = {"file_name_queue": multiprocessing.Queue(),
                        "local_bear_list": local_bear_list,
                        "global_bear_list": global_bear_list,
                        "global_bear_queue": multiprocessing.Queue(),
                        "file_dict": file_dict,
                        "local_result_dict": manager.dict(),
                        "global_result_dict": manager.dict(),
                        "message_queue": multiprocessing.Queue(),
                        "control_queue": multiprocessing.Queue(),
                        "timeout": 0.1}

    # The bear lists are modified in place, so the lists referenced by
    # bear_runner_args hold the instances afterwards.
    instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      bear_runner_args["message_queue"])
    fill_queue(bear_runner_args["file_name_queue"], file_dict.keys())
    # Global bears are addressed by their index in global_bear_list.
    fill_queue(bear_runner_args["global_bear_queue"],
               range(len(global_bear_list)))

    processes = []
    for _ in range(job_count):
        processes.append(multiprocessing.Process(target=run,
                                                 kwargs=bear_runner_args))

    return processes, bear_runner_args
219
220
221
def get_ignore_scope(line, keyword):
    """
    Extracts the bears named by an ignore declaration.

    :param line:    The line containing the ignore declaration.
    :param keyword: The keyword that was found. Everything after the rightmost
                    occurrence of it will be considered for the scope.
    :return:        A list of the bear names listed after the keyword (split
                    at commas and spaces), or an empty list (-> "all") if the
                    scope starts with "all".
    """
    scope_begin = line.rfind(keyword) + len(keyword)
    scope = line[scope_begin:]

    if not scope.startswith("all"):
        return list(StringConverter(scope, list_delimiters=', '))

    return []
234
235
236
def yield_ignore_ranges(file_dict):
    """
    Yields tuples of affected bears and a SourceRange that shall be ignored for
    those.

    Three kinds of (case insensitive) markers are recognized in a line:

    - "start ignoring <bears>" opens a range,
    - "stop ignoring" closes the currently open range, which is then yielded;
      it is skipped if no range is open,
    - "ignore <bears>" yields a range covering that line and the next one.

    :param file_dict: The file dictionary (filenames as keys, lists of lines
                      as values).
    """
    for filename, lines in file_dict.items():
        start = None
        bears = []
        for line_number, line in enumerate(lines, start=1):
            line = line.lower()
            if "start ignoring " in line:
                start = line_number
                bears = get_ignore_scope(line, "start ignoring ")
            elif "stop ignoring" in line:
                if start is not None:
                    yield (bears,
                           SourceRange.from_values(filename,
                                                   start,
                                                   end_line=line_number))
                    # The range is closed now. Without this reset a later
                    # stray "stop ignoring" would yield another range that
                    # reaches back to the old start line.
                    start = None
            elif "ignore " in line:
                yield (get_ignore_scope(line, "ignore "),
                       SourceRange.from_values(filename,
                                               line_number,
                                               end_line=line_number+1))
262
263
264
def process_queues(processes,
                   control_queue,
                   local_result_dict,
                   global_result_dict,
                   file_dict,
                   print_results,
                   section,
                   log_printer):
    """
    Iterate the control queue and send the results received to the
    print_result method so that they can be presented to the user.

    Local results are presented as soon as they are announced. Global results
    announced while local bears are still being processed are buffered and
    presented once the local phase is over.

    :param processes:          List of processes which can be used to run
                               Bears.
    :param control_queue:      Containing control elements that indicate
                               whether there is a result available and which
                               bear it belongs to.
    :param local_result_dict:  Dictionary containing results respective to
                               local bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param global_result_dict: Dictionary containing results respective to
                               global bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param file_dict:          Dictionary containing file contents with
                               filename as keys.
    :param print_results:      Prints all given results appropriate to the
                               output medium.
    :param section:            The section, passed on to print_result.
    :param log_printer:        The log printer, passed on to print_result.
    :return:                   The accumulated return value of the
                               print_result invocations, False if it was
                               never invoked.
    """
    file_diff_dict = {}
    running_processes = get_running_processes(processes)
    retval = False
    # Number of processes working on local bears
    local_processes = len(processes)
    global_result_buffer = []
    ignore_ranges = list(yield_ignore_ranges(file_dict))

    def present(result_list, yielded_before):
        # All print_result invocations only differ in these two arguments.
        return print_result(result_list,
                            file_dict,
                            yielded_before,
                            print_results,
                            section,
                            log_printer,
                            file_diff_dict,
                            ignore_ranges)

    # One process is the logger thread
    while local_processes > 1 and running_processes > 1:
        try:
            control_elem, key = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
                local_processes -= 1
            elif control_elem == CONTROL_ELEMENT.LOCAL:
                assert local_processes != 0
                retval = present(local_result_dict[key], retval)
            elif control_elem == CONTROL_ELEMENT.GLOBAL:
                # Held back until the local phase is over.
                global_result_buffer.append(key)
        except queue.Empty:
            running_processes = get_running_processes(processes)

    # Flush global result buffer
    for buffered_key in global_result_buffer:
        retval = present(global_result_dict[buffered_key], retval)

    running_processes = get_running_processes(processes)
    # One process is the logger thread
    while running_processes > 1:
        try:
            control_elem, key = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.GLOBAL:
                retval = present(global_result_dict[key], retval)
                continue

            assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
        except queue.Empty:
            pass

        # Reached after a GLOBAL_FINISHED element or a timeout: refresh the
        # number of processes that are still alive.
        running_processes = get_running_processes(processes)

    return retval
360
361
362
def execute_section(section,
                    global_bear_list,
                    local_bear_list,
                    print_results,
                    log_printer):
    """
    Executes the section with the given bears.

    The execute_section method does the following things:
    1. Prepare a Process
      * Load files
      * Create queues
    2. Spawn up one or more Processes
    3. Output results from the Processes
    4. Join all processes

    The number of processes is taken from the 'jobs' setting of the section;
    if it is missing or not a number the CPU count is used instead.

    :param section:          The section to execute.
    :param global_bear_list: List of global bears belonging to the section.
    :param local_bear_list:  List of local bears belonging to the section.
    :param print_results:    Prints all given results appropriate to the
                             output medium.
    :param log_printer:      The log_printer to warn to.
    :return:                 Tuple containing a bool (True if results were
                             yielded, False otherwise), a Manager.dict
                             containing all local results(filenames are key)
                             and a Manager.dict containing all global bear
                             results (bear names are key) as well as the
                             file dictionary.
    """
    local_bear_list = Dependencies.resolve(local_bear_list)
    global_bear_list = Dependencies.resolve(global_bear_list)

    try:
        job_count = int(section['jobs'])
    except ValueError:
        log_printer.warn("Unable to convert setting 'jobs' into a number. "
                         "Falling back to CPU count.")
        job_count = get_cpu_count()
    except IndexError:
        # Handled like an unusable value, just without a warning.
        job_count = get_cpu_count()

    processes, arg_dict = instantiate_processes(section,
                                                local_bear_list,
                                                global_bear_list,
                                                job_count,
                                                log_printer)

    # Start and join the logger thread along with the processes to run bears
    logger_thread = LogPrinterThread(arg_dict["message_queue"], log_printer)
    processes.append(logger_thread)

    for runner in processes:
        runner.start()

    try:
        results_yielded = process_queues(processes,
                                         arg_dict["control_queue"],
                                         arg_dict["local_result_dict"],
                                         arg_dict["global_result_dict"],
                                         arg_dict["file_dict"],
                                         print_results,
                                         section,
                                         log_printer)

        return (results_yielded,
                arg_dict["local_result_dict"],
                arg_dict["global_result_dict"],
                arg_dict["file_dict"])
    finally:
        # Clear the logger thread's running flag before joining everything.
        logger_thread.running = False

        for runner in processes:
            runner.join()
434