Failed Conditions
Pull Request — master (#1451)
by Abdeali
01:50
created

coalib/processes/Processing.py (1 issue)

1
import multiprocessing
2
import queue
3
import os
4
import platform
5
import subprocess
6
7
from coalib.collecting.Collectors import collect_files
8
from coalib.collecting import Dependencies
9
from coalib.misc.StringConverter import StringConverter
10
from coalib.output.printers import LOG_LEVEL
11
from coalib.processes.BearRunning import run
12
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
13
from coalib.results.Result import Result
14
from coalib.results.RESULT_SEVERITY import RESULT_SEVERITY
15
from coalib.results.SourceRange import SourceRange
16
from coalib.settings.Setting import path_list
17
from coalib.processes.LogPrinterThread import LogPrinterThread
18
from coalib.results.result_actions.ApplyPatchAction import ApplyPatchAction
19
from coalib.results.result_actions.PrintDebugMessageAction import (
20
    PrintDebugMessageAction)
21
from coalib.results.result_actions.ShowPatchAction import ShowPatchAction
22
23
24
ACTIONS = [ApplyPatchAction,
25
           PrintDebugMessageAction,
26
           ShowPatchAction]
27
28
29
def get_cpu_count():
30
    try:
31
        return multiprocessing.cpu_count()
32
    # cpu_count is not implemented for some CPU architectures/OSes
33
    except NotImplementedError:  # pragma: no cover
34
        return 2
35
36
37
def fill_queue(queue_fill, any_list):
38
    """
39
    Takes element from a list and populates a queue with those elements.
40
41
    :param queue_fill: The queue to be filled.
42
    :param any_list:   List containing the elements.
43
    """
44
    for elem in any_list:
45
        queue_fill.put(elem)
46
47
48
def get_running_processes(processes):
49
    return sum((1 if process.is_alive() else 0) for process in processes)
50
51
52
def create_process_group(command_array, **kwargs):
53
    if platform.system() == "Windows":  # pragma: no cover
54
        proc = subprocess.Popen(
55
            command_array,
56
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
57
            **kwargs)
58
    else:
59
        proc = subprocess.Popen(command_array,
60
                                preexec_fn=os.setsid,
61
                                **kwargs)
62
    return proc
63
64
65
def get_default_actions(section):
66
    """
67
    Parses the key `default_actions` in the given section.
68
69
    :param section:    The section where to parse from.
70
    :return:           A dict with the bearname as keys and their default
71
                       actions as values and another dict that contains bears
72
                       and invalid action names.
73
    """
74
    try:
75
        default_actions = dict(section["default_actions"])
76
    except IndexError:
77
        return {}, {}
78
79
    action_dict = {action.get_metadata().name: action for action in ACTIONS}
80
    invalid_action_set = default_actions.values() - action_dict.keys()
81
    invalid_actions = {}
82
    if len(invalid_action_set) != 0:
83
        invalid_actions = {
84
            bear: action
85
            for bear, action in default_actions.items()
86
            if action in invalid_action_set}
87
        for invalid in invalid_actions.keys():
88
            del default_actions[invalid]
89
90
    actions = {bearname: action_dict[action_name]
91
               for bearname, action_name in default_actions.items()}
92
    return actions, invalid_actions
93
94
95
def autoapply_actions(results,
96
                      file_dict,
97
                      file_diff_dict,
98
                      section,
99
                      log_printer):
100
    """
101
    Auto-applies actions like defined in the given section.
102
103
    :param results:        A list of results.
104
    :param file_dict:      A dictionary containing the name of files and its
105
                           contents.
106
    :param file_diff_dict: A dictionary that contains filenames as keys and
107
                           diff objects as values.
108
    :param section:        The section.
109
    :param log_printer:    A log printer instance to log messages on.
110
    :return:               A list of unprocessed results.
111
    """
112
113
    default_actions, invalid_actions = get_default_actions(section)
114
115
    for bearname, actionname in invalid_actions.items():
116
        log_printer.warn("Selected default action {} for bear {} does "
117
                         "not exist. Ignoring action.".format(
118
                             repr(actionname),
119
                             repr(bearname)))
120
121
    if len(default_actions) == 0:
122
        # There's nothing to auto-apply.
123
        return results
124
125
    not_processed_results = []
126
    for result in results:
127
        try:
128
            action = default_actions[result.origin]
129
        except KeyError:
130
            not_processed_results.append(result)
131
            continue
132
133
        if not action.is_applicable(result, file_dict, file_diff_dict):
134
            log_printer.warn("Selected default action {} for bear {} is not "
135
                             "applicable. Action not applied.".format(
136
                                 repr(action.get_metadata().name),
137
                                 repr(result.origin)))
138
            not_processed_results.append(result)
139
            continue
140
141
        try:
142
            action().apply_from_section(result,
143
                                        file_dict,
144
                                        file_diff_dict,
145
                                        section)
146
            log_printer.info("Applied {} on {} from {}.".format(
147
                repr(action.get_metadata().name),
148
                result.location_repr(),
149
                repr(result.origin)))
150
        except Exception as ex:
151
            not_processed_results.append(result)
152
            log_printer.log_exception(
153
                "Failed to execute action {} with error: {}.".format(
154
                    repr(action.get_metadata().name),
155
                    ex),
156
                ex)
157
            log_printer.debug("-> for result " + repr(result) + ".")
158
159
    return not_processed_results
160
161
162
def print_result(results,
163
                 file_dict,
164
                 retval,
165
                 print_results,
166
                 section,
167
                 log_printer,
168
                 file_diff_dict,
169
                 ignore_ranges):
170
    """
171
    Takes the results produced by each bear and gives them to the print_results
172
    method to present to the user.
173
174
    :param results:        A list of results.
175
    :param file_dict:      A dictionary containing the name of files and its
176
                           contents.
177
    :param retval:         It is True if no results were yielded ever before.
178
                           If it is False this function will return False no
179
                           matter what happens. Else it depends on if this
180
                           invocation yields results.
181
    :param print_results:  A function that prints all given results appropriate
182
                           to the output medium.
183
    :param file_diff_dict: A dictionary that contains filenames as keys and
184
                           diff objects as values.
185
    :param ignore_ranges:  A list of SourceRanges. Results that affect code in
186
                           any of those ranges will be ignored.
187
    :return:               Returns False if any results were yielded. Else
188
                           True.
189
    """
190
    min_severity_str = str(section.get('min_severity', 'INFO')).upper()
191
    min_severity = RESULT_SEVERITY.str_dict.get(min_severity_str, 'INFO')
192
    results = list(filter(lambda result:
193
                          type(result) is Result and
194
                          result.severity >= min_severity and
195
                          not result.to_ignore(ignore_ranges),
196
                          results))
197
198
199
    if bool(section.get('autoapply', 'true')):
200
        patched_results = autoapply_actions(results,
201
                                            file_dict,
202
                                            file_diff_dict,
203
                                            section,
204
                                            log_printer)
205
    else:
206
        patched_results  = results
0 ignored issues
show
Exactly one space required before assignment
patched_results = results
^
Loading history...
207
208
    print_results(log_printer,
209
                  section,
210
                  patched_results,
211
                  file_dict,
212
                  file_diff_dict)
213
    return retval or len(results) > 0, patched_results
214
215
216
def get_file_dict(filename_list, log_printer):
217
    """
218
    Reads all files into a dictionary.
219
220
    :param filename_list: List of names of paths to files to get contents of.
221
    :param log_printer:   The logger which logs errors.
222
    :return:              Reads the content of each file into a dictionary
223
                          with filenames as keys.
224
    """
225
    file_dict = {}
226
    for filename in filename_list:
227
        try:
228
            with open(filename, "r", encoding="utf-8") as _file:
229
                file_dict[filename] = tuple(_file.readlines())
230
        except UnicodeDecodeError:
231
            log_printer.warn("Failed to read file '{}'. It seems to contain "
232
                             "non-unicode characters. Leaving it "
233
                             "out.".format(filename))
234
        except Exception as exception:  # pragma: no cover
235
            log_printer.log_exception("Failed to read file '{}' because of "
236
                                      "an unknown error. Leaving it "
237
                                      "out.".format(filename),
238
                                      exception,
239
                                      log_level=LOG_LEVEL.WARNING)
240
241
    return file_dict
242
243
244
def filter_raising_callables(it, exception, *args, **kwargs):
245
    """
246
    Filters all callable items inside the given iterator that raise the
247
    given exceptions.
248
249
    :param it:        The iterator to filter.
250
    :param exception: The (tuple of) exception(s) to filter for.
251
    :param args:      Positional arguments to pass to the callable.
252
    :param kwargs:    Keyword arguments to pass to the callable.
253
    """
254
    for elem in it:
255
        try:
256
            yield elem(*args, **kwargs)
257
        except exception:
258
            pass
259
260
261
def instantiate_bears(section,
262
                      local_bear_list,
263
                      global_bear_list,
264
                      file_dict,
265
                      message_queue):
266
    """
267
    Instantiates each bear with the arguments it needs.
268
269
    :param section:          The section the bears belong to.
270
    :param local_bear_list:  List of local bear classes to instantiate.
271
    :param global_bear_list: List of global bear classes to instantiate.
272
    :param file_dict:        Dictionary containing filenames and their
273
                             contents.
274
    :param message_queue:    Queue responsible to maintain the messages
275
                             delivered by the bears.
276
    :return:                 The local and global bear instance lists.
277
    """
278
    local_bear_list = [bear
279
                       for bear in filter_raising_callables(
280
                           local_bear_list,
281
                           RuntimeError,
282
                           section,
283
                           message_queue,
284
                           timeout=0.1)]
285
286
    global_bear_list = [bear
287
                        for bear in filter_raising_callables(
288
                            global_bear_list,
289
                            RuntimeError,
290
                            file_dict,
291
                            section,
292
                            message_queue,
293
                            timeout=0.1)]
294
295
    return local_bear_list, global_bear_list
296
297
298
def instantiate_processes(section,
299
                          local_bear_list,
300
                          global_bear_list,
301
                          job_count,
302
                          log_printer):
303
    """
304
    Instantiate the number of processes that will run bears which will be
305
    responsible for running bears in a multiprocessing environment.
306
307
    :param section:          The section the bears belong to.
308
    :param local_bear_list:  List of local bears belonging to the section.
309
    :param global_bear_list: List of global bears belonging to the section.
310
    :param job_count:        Max number of processes to create.
311
    :param log_printer:      The log printer to warn to.
312
    :return:                 A tuple containing a list of processes,
313
                             and the arguments passed to each process which are
314
                             the same for each object.
315
    """
316
    filename_list = collect_files(
317
        path_list(section.get('files', "")),
318
        ignored_file_paths=path_list(section.get('ignore', "")),
319
        limit_file_paths=path_list(section.get('limit_files', "")))
320
    file_dict = get_file_dict(filename_list, log_printer)
321
322
    manager = multiprocessing.Manager()
323
    global_bear_queue = multiprocessing.Queue()
324
    filename_queue = multiprocessing.Queue()
325
    local_result_dict = manager.dict()
326
    global_result_dict = manager.dict()
327
    message_queue = multiprocessing.Queue()
328
    control_queue = multiprocessing.Queue()
329
330
    bear_runner_args = {"file_name_queue": filename_queue,
331
                        "local_bear_list": local_bear_list,
332
                        "global_bear_list": global_bear_list,
333
                        "global_bear_queue": global_bear_queue,
334
                        "file_dict": file_dict,
335
                        "local_result_dict": local_result_dict,
336
                        "global_result_dict": global_result_dict,
337
                        "message_queue": message_queue,
338
                        "control_queue": control_queue,
339
                        "timeout": 0.1}
340
341
    local_bear_list[:], global_bear_list[:] = instantiate_bears(
342
        section,
343
        local_bear_list,
344
        global_bear_list,
345
        file_dict,
346
        message_queue)
347
348
    fill_queue(filename_queue, file_dict.keys())
349
    fill_queue(global_bear_queue, range(len(global_bear_list)))
350
351
    return ([multiprocessing.Process(target=run, kwargs=bear_runner_args)
352
             for i in range(job_count)],
353
            bear_runner_args)
354
355
356
def get_ignore_scope(line, keyword):
357
    """
358
    Retrieves the bears that are to be ignored defined in the given line.
359
    :param line:    The line containing the ignore declaration.
360
    :param keyword: The keyword that was found. Everything after the rightmost
361
                    occurrence of it will be considered for the scope.
362
    :return:        A list of lower cased bearnames or an empty list (-> "all")
363
    """
364
    toignore = line[line.rfind(keyword) + len(keyword):]
365
    if toignore.startswith("all"):
366
        return []
367
    else:
368
        return list(StringConverter(toignore, list_delimiters=', '))
369
370
371
def yield_ignore_ranges(file_dict):
372
    """
373
    Yields tuples of affected bears and a SourceRange that shall be ignored for
374
    those.
375
376
    :param file_dict: The file dictionary.
377
    """
378
    for filename, file in file_dict.items():
379
        start = None
380
        bears = []
381
        for line_number, line in enumerate(file, start=1):
382
            line = line.lower()
383
            if "start ignoring " in line:
384
                start = line_number
385
                bears = get_ignore_scope(line, "start ignoring ")
386
            elif "stop ignoring" in line:
387
                if start:
388
                    yield (bears,
389
                           SourceRange.from_values(filename,
390
                                                   start,
391
                                                   end_line=line_number))
392
            elif "ignore " in line:
393
                yield (get_ignore_scope(line, "ignore "),
394
                       SourceRange.from_values(filename,
395
                                               line_number,
396
                                               end_line=line_number+1))
397
398
399
def process_queues(processes,
400
                   control_queue,
401
                   local_result_dict,
402
                   global_result_dict,
403
                   file_dict,
404
                   print_results,
405
                   section,
406
                   log_printer):
407
    """
408
    Iterate the control queue and send the results recieved to the print_result
409
    method so that they can be presented to the user.
410
411
    :param processes:          List of processes which can be used to run
412
                               Bears.
413
    :param control_queue:      Containing control elements that indicate
414
                               whether there is a result available and which
415
                               bear it belongs to.
416
    :param local_result_dict:  Dictionary containing results respective to
417
                               local bears. It is modified by the processes
418
                               i.e. results are added to it by multiple
419
                               processes.
420
    :param global_result_dict: Dictionary containing results respective to
421
                               global bears. It is modified by the processes
422
                               i.e. results are added to it by multiple
423
                               processes.
424
    :param file_dict:          Dictionary containing file contents with
425
                               filename as keys.
426
    :param print_results:      Prints all given results appropriate to the
427
                               output medium.
428
    :return:                   Return True if all bears execute succesfully and
429
                               Results were delivered to the user. Else False.
430
    """
431
    file_diff_dict = {}
432
    retval = False
433
    # Number of processes working on local/global bears. They are count down
434
    # when the last queue element of that process is processed which may be
435
    # *after* the process has ended!
436
    local_processes = len(processes)
437
    global_processes = len(processes)
438
    global_result_buffer = []
439
    ignore_ranges = list(yield_ignore_ranges(file_dict))
440
441
    # One process is the logger thread
442
    while local_processes > 1:
443
        try:
444
            control_elem, index = control_queue.get(timeout=0.1)
445
446
            if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
447
                local_processes -= 1
448
            elif control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED:
449
                global_processes -= 1
450
            elif control_elem == CONTROL_ELEMENT.LOCAL:
451
                assert local_processes != 0
452
                retval, res = print_result(local_result_dict[index],
453
                                           file_dict,
454
                                           retval,
455
                                           print_results,
456
                                           section,
457
                                           log_printer,
458
                                           file_diff_dict,
459
                                           ignore_ranges)
460
                local_result_dict[index] = res
461
            else:
462
                assert control_elem == CONTROL_ELEMENT.GLOBAL
463
                global_result_buffer.append(index)
464
        except queue.Empty:
465
            if get_running_processes(processes) < 2:  # pragma: no cover
466
                # Recover silently, those branches are only
467
                # nondeterministically covered.
468
                break
469
470
    # Flush global result buffer
471
    for elem in global_result_buffer:
472
        retval, res = print_result(global_result_dict[elem],
473
                                   file_dict,
474
                                   retval,
475
                                   print_results,
476
                                   section,
477
                                   log_printer,
478
                                   file_diff_dict,
479
                                   ignore_ranges)
480
        global_result_dict[elem] = res
481
482
    # One process is the logger thread
483
    while global_processes > 1:
484
        try:
485
            control_elem, index = control_queue.get(timeout=0.1)
486
487
            if control_elem == CONTROL_ELEMENT.GLOBAL:
488
                retval, res = print_result(global_result_dict[index],
489
                                           file_dict,
490
                                           retval,
491
                                           print_results,
492
                                           section,
493
                                           log_printer,
494
                                           file_diff_dict,
495
                                           ignore_ranges)
496
                global_result_dict[index] = res
497
            else:
498
                assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
499
                global_processes -= 1
500
        except queue.Empty:
501
            if get_running_processes(processes) < 2:  # pragma: no cover
502
                # Recover silently, those branches are only
503
                # nondeterministically covered.
504
                break
505
506
    return retval
507
508
509
def execute_section(section,
510
                    global_bear_list,
511
                    local_bear_list,
512
                    print_results,
513
                    log_printer):
514
    """
515
    Executes the section with the given bears.
516
517
    The execute_section method does the following things:
518
    1. Prepare a Process
519
      * Load files
520
      * Create queues
521
    2. Spawn up one or more Processes
522
    3. Output results from the Processes
523
    4. Join all processes
524
525
    :param section:          The section to execute.
526
    :param global_bear_list: List of global bears belonging to the section.
527
    :param local_bear_list:  List of local bears belonging to the section.
528
    :param print_results:    Prints all given results appropriate to the
529
                             output medium.
530
    :param log_printer:      The log_printer to warn to.
531
    :return:                 Tuple containing a bool (True if results were
532
                             yielded, False otherwise), a Manager.dict
533
                             containing all local results(filenames are key)
534
                             and a Manager.dict containing all global bear
535
                             results (bear names are key) as well as the
536
                             file dictionary.
537
    """
538
    local_bear_list = Dependencies.resolve(local_bear_list)
539
    global_bear_list = Dependencies.resolve(global_bear_list)
540
541
    try:
542
        running_processes = int(section['jobs'])
543
    except ValueError:
544
        log_printer.warn("Unable to convert setting 'jobs' into a number. "
545
                         "Falling back to CPU count.")
546
        running_processes = get_cpu_count()
547
    except IndexError:
548
        running_processes = get_cpu_count()
549
550
    processes, arg_dict = instantiate_processes(section,
551
                                                local_bear_list,
552
                                                global_bear_list,
553
                                                running_processes,
554
                                                log_printer)
555
556
    logger_thread = LogPrinterThread(arg_dict["message_queue"],
557
                                     log_printer)
558
    # Start and join the logger thread along with the processes to run bears
559
    processes.append(logger_thread)
560
561
    for runner in processes:
562
        runner.start()
563
564
    try:
565
        return (process_queues(processes,
566
                               arg_dict["control_queue"],
567
                               arg_dict["local_result_dict"],
568
                               arg_dict["global_result_dict"],
569
                               arg_dict["file_dict"],
570
                               print_results,
571
                               section,
572
                               log_printer),
573
                arg_dict["local_result_dict"],
574
                arg_dict["global_result_dict"],
575
                arg_dict["file_dict"])
576
    finally:
577
        logger_thread.running = False
578
579
        for runner in processes:
580
            runner.join()
581