Failed Conditions
Pull Request — master (#1451)
by Abdeali
01:28
created

coalib/processes/Processing.py (1 issue)

1
import multiprocessing
2
import queue
3
import os
4
import platform
5
import subprocess
6
7
from coalib.collecting.Collectors import collect_files
8
from coalib.collecting import Dependencies
9
from coalib.misc.StringConverter import StringConverter
10
from coalib.output.printers import LOG_LEVEL
11
from coalib.processes.BearRunning import run
12
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
13
from coalib.results.Result import Result
14
from coalib.results.RESULT_SEVERITY import RESULT_SEVERITY
15
from coalib.results.SourceRange import SourceRange
16
from coalib.settings.Setting import path_list
17
from coalib.processes.LogPrinterThread import LogPrinterThread
18
from coalib.results.result_actions.ApplyPatchAction import ApplyPatchAction
19
from coalib.results.result_actions.PrintDebugMessageAction import (
20
    PrintDebugMessageAction)
21
from coalib.results.result_actions.ShowPatchAction import ShowPatchAction
22
23
24
# Result actions that can be chosen as a default action for a bear. They are
# looked up by the name found in their metadata.
ACTIONS = [
    ApplyPatchAction,
    PrintDebugMessageAction,
    ShowPatchAction,
]
27
28
29
def get_cpu_count():
    """
    Determines the number of CPUs reported by ``multiprocessing``.

    :return: The CPU count, or 2 if the platform cannot report it.
    """
    try:
        cpu_count = multiprocessing.cpu_count()
    except NotImplementedError:  # pragma: no cover
        # cpu_count is not implemented for some CPU architectures/OSes
        cpu_count = 2

    return cpu_count
35
36
37
def fill_queue(queue_fill, any_list):
    """
    Puts every element of the given iterable into the given queue, keeping
    the iteration order.

    :param queue_fill: The queue that receives the elements.
    :param any_list:   The iterable providing the elements.
    """
    for item in any_list:
        queue_fill.put(item)
46
47
48
def get_running_processes(processes):
    """
    Counts how many of the given processes are still alive.

    :param processes: An iterable of objects providing ``is_alive()``.
    :return:          The number of objects for which ``is_alive()`` is
                      true.
    """
    return sum(1 for process in processes if process.is_alive())
50
51
52
def create_process_group(command_array, **kwargs):
    """
    Starts the given command via ``subprocess.Popen``.

    On Windows the ``CREATE_NEW_PROCESS_GROUP`` creation flag is passed, on
    every other platform ``os.setsid`` is run in the child before the
    command is executed.

    :param command_array: The command to execute, as accepted by
                          ``subprocess.Popen``.
    :param kwargs:        Additional keyword arguments forwarded to
                          ``subprocess.Popen``.
    :return:              The created ``Popen`` object.
    """
    if platform.system() != "Windows":
        return subprocess.Popen(command_array,
                                preexec_fn=os.setsid,
                                **kwargs)

    return subprocess.Popen(  # pragma: no cover
        command_array,
        creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
        **kwargs)
63
64
65
def get_default_actions(section):
    """
    Reads the `default_actions` key of the given section and maps the
    configured action names to the action classes listed in ``ACTIONS``.

    :param section: The section to read `default_actions` from.
    :return:        A tuple of two dicts. The first one maps bear names to
                    the action class selected for them, the second one maps
                    bear names to action names that match no known action.
                    Both are empty if the section raises an ``IndexError``
                    for the key.
    """
    try:
        configured = dict(section["default_actions"])
    except IndexError:
        return {}, {}

    known_actions = {}
    for action in ACTIONS:
        known_actions[action.get_metadata().name] = action

    actions = {}
    invalid_actions = {}
    for bearname, action_name in configured.items():
        if action_name in known_actions:
            actions[bearname] = known_actions[action_name]
        else:
            invalid_actions[bearname] = action_name

    return actions, invalid_actions
93
94
95
def autoapply_actions(results,
                      file_dict,
                      file_diff_dict,
                      section,
                      log_printer):
    """
    Applies the default actions configured in the given section to the
    results of the bears they were configured for.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the name of files and its
                           contents.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param section:        The section holding the default action
                           configuration.
    :param log_printer:    A log printer instance to log messages on.
    :return:               The results no action was applied to. This is the
                           unchanged input if no valid default action is
                           configured.
    """
    default_actions, invalid_actions = get_default_actions(section)

    for bearname, actionname in invalid_actions.items():
        message = ("Selected default action {} for bear {} does "
                   "not exist. Ignoring action.")
        log_printer.warn(message.format(repr(actionname), repr(bearname)))

    if not default_actions:
        # There's nothing to auto-apply.
        return results

    unprocessed = []
    for result in results:
        if result.origin not in default_actions:
            unprocessed.append(result)
            continue

        action = default_actions[result.origin]
        if not action.is_applicable(result, file_dict, file_diff_dict):
            message = ("Selected default action {} for bear {} is not "
                       "applicable. Action not applied.")
            log_printer.warn(message.format(repr(action.get_metadata().name),
                                            repr(result.origin)))
            unprocessed.append(result)
            continue

        try:
            # Logging the success is part of the guarded section: a failure
            # there marks the result as unprocessed as well.
            action().apply_from_section(result,
                                        file_dict,
                                        file_diff_dict,
                                        section)
            log_printer.info("Applied {} on {} from {}.".format(
                repr(action.get_metadata().name),
                result.location_repr(),
                repr(result.origin)))
        except Exception as ex:
            unprocessed.append(result)
            log_printer.log_exception(
                "Failed to execute action {} with error: {}.".format(
                    repr(action.get_metadata().name),
                    ex),
                ex)
            log_printer.debug("-> for result " + repr(result) + ".")

    return unprocessed
160
161
162
def print_result(results,
                 file_dict,
                 retval,
                 print_results,
                 section,
                 log_printer,
                 file_diff_dict,
                 ignore_ranges):
    """
    Takes the results produced by each bear and gives them to the print_results
    method to present to the user.

    Results are filtered first: only objects whose type is exactly ``Result``,
    whose severity is at least the section's `min_severity` and which are not
    covered by one of the ignore ranges are kept.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the name of files and its
                           contents.
    :param retval:         The value returned by earlier invocations. If it
                           is truthy it is passed through unchanged, else the
                           returned flag depends on whether this invocation
                           yields results.
    :param print_results:  A function that prints all given results appropriate
                           to the output medium.
    :param section:        The section the results belong to. Its
                           `min_severity` and `autoapply` settings are read.
    :param log_printer:    A log printer instance to log messages on.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param ignore_ranges:  A list of SourceRanges. Results that affect code in
                           any of those ranges will be ignored.
    :return:               A tuple. Its first element is ``retval`` if that is
                           truthy, else True if any result passed the filter
                           and False otherwise. Its second element is the
                           list of results that were handed to
                           ``print_results``, i.e. the filtered results minus
                           those handled by auto-applied actions.
    """
    min_severity_str = str(section.get('min_severity', 'INFO')).upper()
    # Unknown severity names fall back to the INFO *level*. The name 'INFO'
    # itself must not be used as fallback because it is a key of str_dict,
    # not one of the values result severities are compared against.
    severities = RESULT_SEVERITY.str_dict
    min_severity = severities.get(min_severity_str, severities['INFO'])

    # The exact type check is intentional as in the original code: instances
    # of Result subclasses are filtered out.
    results = [result
               for result in results
               if type(result) is Result and
               result.severity >= min_severity and
               not result.to_ignore(ignore_ranges)]

    if bool(section.get('autoapply', 'true')):
        patched_results = autoapply_actions(results,
                                            file_dict,
                                            file_diff_dict,
                                            section,
                                            log_printer)
    else:
        patched_results = results

    print_results(log_printer,
                  section,
                  patched_results,
                  file_dict,
                  file_diff_dict)
    return retval or len(results) > 0, patched_results
213
214
215
def get_file_dict(filename_list, log_printer):
    """
    Loads the lines of all given files.

    Files that cannot be read are logged as a warning and left out of the
    result.

    :param filename_list: List of names of paths to files to get contents of.
    :param log_printer:   The logger which logs errors.
    :return:              A dictionary mapping each readable filename to a
                          tuple of its lines, decoded as UTF-8.
    """
    contents = {}

    for path in filename_list:
        try:
            with open(path, "r", encoding="utf-8") as handle:
                contents[path] = tuple(handle.readlines())
        except UnicodeDecodeError:
            log_printer.warn("Failed to read file '{}'. It seems to contain "
                             "non-unicode characters. Leaving it "
                             "out.".format(path))
        except Exception as exception:  # pragma: no cover
            log_printer.log_exception("Failed to read file '{}' because of "
                                      "an unknown error. Leaving it "
                                      "out.".format(path),
                                      exception,
                                      log_level=LOG_LEVEL.WARNING)

    return contents
241
242
243
def filter_raising_callables(it, exception, *args, **kwargs):
    """
    Calls every item of the given iterable with the given arguments and
    yields the return values. Items raising the given exception(s) are
    skipped silently.

    :param it:        The iterable of callables.
    :param exception: The (tuple of) exception(s) to skip on.
    :param args:      Positional arguments to pass to each callable.
    :param kwargs:    Keyword arguments to pass to each callable.
    """
    for candidate in it:
        try:
            yield candidate(*args, **kwargs)
        except exception:
            continue
258
259
260
def instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      message_queue):
    """
    Creates instances of all given bear classes. Bears raising a
    ``RuntimeError`` on instantiation are left out.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bear classes to instantiate.
    :param global_bear_list: List of global bear classes to instantiate.
    :param file_dict:        Dictionary containing filenames and their
                             contents.
    :param message_queue:    Queue responsible to maintain the messages
                             delivered by the bears.
    :return:                 The local and global bear instance lists.
    """
    local_instances = list(filter_raising_callables(local_bear_list,
                                                    RuntimeError,
                                                    section,
                                                    message_queue,
                                                    timeout=0.1))

    # Global bears additionally get the file dictionary as first argument.
    global_instances = list(filter_raising_callables(global_bear_list,
                                                     RuntimeError,
                                                     file_dict,
                                                     section,
                                                     message_queue,
                                                     timeout=0.1))

    return local_instances, global_instances
295
296
297
def instantiate_processes(section,
                          local_bear_list,
                          global_bear_list,
                          job_count,
                          log_printer):
    """
    Collects the files of the section, instantiates the bears and creates
    the (not yet started) processes that will run them.

    The given bear lists are modified in place: the bear classes they
    contain are replaced by the bear instances.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears belonging to the section.
    :param global_bear_list: List of global bears belonging to the section.
    :param job_count:        Max number of processes to create.
    :param log_printer:      The log printer to warn to.
    :return:                 A tuple containing a list of processes,
                             and the arguments passed to each process which are
                             the same for each object.
    """
    files = path_list(section.get('files', ""))
    ignored = path_list(section.get('ignore', ""))
    limited = path_list(section.get('limit_files', ""))
    filename_list = collect_files(files,
                                  ignored_file_paths=ignored,
                                  limit_file_paths=limited)
    file_dict = get_file_dict(filename_list, log_printer)

    manager = multiprocessing.Manager()
    global_bear_queue = multiprocessing.Queue()
    filename_queue = multiprocessing.Queue()
    local_result_dict = manager.dict()
    global_result_dict = manager.dict()
    message_queue = multiprocessing.Queue()
    control_queue = multiprocessing.Queue()

    # The bear lists are stored by reference here; they get filled with the
    # instances below through slice assignment.
    bear_runner_args = dict(file_name_queue=filename_queue,
                            local_bear_list=local_bear_list,
                            global_bear_list=global_bear_list,
                            global_bear_queue=global_bear_queue,
                            file_dict=file_dict,
                            local_result_dict=local_result_dict,
                            global_result_dict=global_result_dict,
                            message_queue=message_queue,
                            control_queue=control_queue,
                            timeout=0.1)

    local_bear_list[:], global_bear_list[:] = instantiate_bears(
        section,
        local_bear_list,
        global_bear_list,
        file_dict,
        message_queue)

    fill_queue(filename_queue, file_dict.keys())
    # Global bears are addressed by their index in the global bear list.
    fill_queue(global_bear_queue, range(len(global_bear_list)))

    processes = [multiprocessing.Process(target=run, kwargs=bear_runner_args)
                 for _ in range(job_count)]

    return processes, bear_runner_args
353
354
355
def get_ignore_scope(line, keyword):
    """
    Extracts the bears named after an ignore keyword in the given line.

    :param line:    The line containing the ignore declaration.
    :param keyword: The keyword that was found. Everything after the rightmost
                    occurrence of it will be considered for the scope.
    :return:        The list of bear names given after the keyword, or an
                    empty list (-> "all") if the scope starts with "all".
    """
    scope_start = line.rfind(keyword) + len(keyword)
    scope = line[scope_start:]

    if not scope.startswith("all"):
        return list(StringConverter(scope, list_delimiters=', '))

    return []
368
369
370
def yield_ignore_ranges(file_dict):
    """
    Scans all files for ignore declarations and yields tuples of the affected
    bears and the SourceRange that shall be ignored for them.

    A "start ignoring " line opens a range that is yielded at every
    following "stop ignoring" line. A line containing "ignore " yields a
    range from that line to the line after it. Matching is done on the
    lower cased line.

    :param file_dict: The file dictionary.
    """
    for filename, lines in file_dict.items():
        range_start = None
        range_bears = []

        for line_number, raw_line in enumerate(lines, start=1):
            lowered = raw_line.lower()

            if "start ignoring " in lowered:
                range_start = line_number
                range_bears = get_ignore_scope(lowered, "start ignoring ")
                continue

            if "stop ignoring" in lowered:
                # A stop without a preceding start is ignored.
                if range_start:
                    ignored = SourceRange.from_values(filename,
                                                      range_start,
                                                      end_line=line_number)
                    yield (range_bears, ignored)
                continue

            if "ignore " in lowered:
                ignored = SourceRange.from_values(filename,
                                                  line_number,
                                                  end_line=line_number+1)
                yield (get_ignore_scope(lowered, "ignore "), ignored)
396
397
398
def process_queues(processes,
                   control_queue,
                   local_result_dict,
                   global_result_dict,
                   file_dict,
                   print_results,
                   section,
                   log_printer):
    """
    Iterates the control queue and sends the results received to the
    print_result method so that they can be presented to the user.

    Local results are presented as soon as they are announced. Global
    results announced meanwhile are buffered and presented once the local
    bears are done; after that the remaining global results are handled.

    :param processes:          List of processes which can be used to run
                               Bears.
    :param control_queue:      Containing control elements that indicate
                               whether there is a result available and which
                               bear it belongs to.
    :param local_result_dict:  Dictionary containing results respective to
                               local bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param global_result_dict: Dictionary containing results respective to
                               global bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param file_dict:          Dictionary containing file contents with
                               filename as keys.
    :param print_results:      Prints all given results appropriate to the
                               output medium.
    :param section:            The section that is handed on to print_result.
    :param log_printer:        The log printer that is handed on to
                               print_result.
    :return:                   The flag returned by the last print_result
                               invocation, or False if print_result was never
                               invoked.
    """
    file_diff_dict = {}
    retval = False
    # Number of processes working on local/global bears. They are count down
    # when the last queue element of that process is processed which may be
    # *after* the process has ended!
    local_processes = len(processes)
    global_processes = len(processes)
    global_result_buffer = []
    ignore_ranges = list(yield_ignore_ranges(file_dict))

    def present(result_dict, key):
        # Presents the results stored under the given key and replaces them
        # by the results that were left after presenting.
        nonlocal retval
        retval, remaining = print_result(result_dict[key],
                                         file_dict,
                                         retval,
                                         print_results,
                                         section,
                                         log_printer,
                                         file_diff_dict,
                                         ignore_ranges)
        result_dict[key] = remaining

    # One process is the logger thread
    while local_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
                local_processes -= 1
            elif control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED:
                global_processes -= 1
            elif control_elem == CONTROL_ELEMENT.LOCAL:
                assert local_processes != 0
                present(local_result_dict, index)
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL
                global_result_buffer.append(index)
        except queue.Empty:
            if get_running_processes(processes) < 2:  # pragma: no cover
                # Recover silently, those branches are only
                # nondeterministically covered.
                break

    # Flush global result buffer
    for buffered_index in global_result_buffer:
        present(global_result_dict, buffered_index)

    # One process is the logger thread
    while global_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.GLOBAL:
                present(global_result_dict, index)
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
                global_processes -= 1
        except queue.Empty:
            if get_running_processes(processes) < 2:  # pragma: no cover
                # Recover silently, those branches are only
                # nondeterministically covered.
                break

    return retval
506
507
508
def execute_section(section,
                    global_bear_list,
                    local_bear_list,
                    print_results,
                    log_printer):
    """
    Runs all given bears on the given section.

    This happens in the following steps:

    1. Resolve the bear dependencies and determine the job count
    2. Prepare the processes (load files, create queues)
    3. Start the processes and the logger thread
    4. Output results from the processes
    5. Join all processes and the logger thread

    :param section:          The section to execute.
    :param global_bear_list: List of global bears belonging to the section.
    :param local_bear_list:  List of local bears belonging to the section.
    :param print_results:    Prints all given results appropriate to the
                             output medium.
    :param log_printer:      The log_printer to warn to.
    :return:                 Tuple containing a bool (True if results were
                             yielded, False otherwise), a Manager.dict
                             containing all local results(filenames are key)
                             and a Manager.dict containing all global bear
                             results (bear names are key) as well as the
                             file dictionary.
    """
    local_bear_list = Dependencies.resolve(local_bear_list)
    global_bear_list = Dependencies.resolve(global_bear_list)

    try:
        job_count = int(section['jobs'])
    except (ValueError, IndexError) as error:
        # Only an unparsable value deserves a warning, a missing setting
        # silently falls back to the CPU count.
        if isinstance(error, ValueError):
            log_printer.warn("Unable to convert setting 'jobs' into a number. "
                             "Falling back to CPU count.")
        job_count = get_cpu_count()

    processes, arg_dict = instantiate_processes(section,
                                                local_bear_list,
                                                global_bear_list,
                                                job_count,
                                                log_printer)

    logger_thread = LogPrinterThread(arg_dict["message_queue"],
                                     log_printer)
    # Start and join the logger thread along with the processes to run bears
    processes.append(logger_thread)

    for runner in processes:
        runner.start()

    try:
        yielded_results = process_queues(processes,
                                         arg_dict["control_queue"],
                                         arg_dict["local_result_dict"],
                                         arg_dict["global_result_dict"],
                                         arg_dict["file_dict"],
                                         print_results,
                                         section,
                                         log_printer)
        return (yielded_results,
                arg_dict["local_result_dict"],
                arg_dict["global_result_dict"],
                arg_dict["file_dict"])
    finally:
        # Tell the logger thread to stop before waiting for everything.
        logger_thread.running = False

        for runner in processes:
            runner.join()
580