Failed Conditions
Pull Request — master (#1990)
by Mischa
01:34
created

coalib.processes.process_queues()   F

Complexity

Conditions 15

Size

Total Lines 108

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 15
dl 0
loc 108
rs 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

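Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method and call it from the original.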

Complexity

Complex classes and functions like coalib.processes.process_queues() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields, methods or variables that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import multiprocessing
2
import os
3
import platform
4
import queue
5
import subprocess
6
from itertools import chain
7
8
from coalib.collecting import Dependencies
9
from coalib.collecting.Collectors import collect_files
10
from coalib.misc.StringConverter import StringConverter
11
from coalib.output.printers.LOG_LEVEL import LOG_LEVEL
12
from coalib.processes.BearRunning import run
13
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
14
from coalib.processes.LogPrinterThread import LogPrinterThread
15
from coalib.results.Result import Result
16
from coalib.results.result_actions.ApplyPatchAction import ApplyPatchAction
17
from coalib.results.result_actions.PrintDebugMessageAction import (
18
    PrintDebugMessageAction)
19
from coalib.results.result_actions.ShowPatchAction import ShowPatchAction
20
from coalib.results.RESULT_SEVERITY import RESULT_SEVERITY
21
from coalib.results.SourceRange import SourceRange
22
from coalib.settings.Setting import glob_list
23
from coalib.parsing.Globbing import fnmatch
24
25
# Result actions that may be named in a section's ``default_actions``
# setting; ``get_default_actions`` looks them up by their metadata name.
ACTIONS = [
    ApplyPatchAction,
    PrintDebugMessageAction,
    ShowPatchAction,
]
28
29
30
def get_cpu_count():
    """
    Determines the number of CPUs reported by ``multiprocessing``.

    :return: The result of ``multiprocessing.cpu_count()`` or 2 if the count
             is not implemented on this platform.
    """
    try:
        cpu_count = multiprocessing.cpu_count()
    except NotImplementedError:  # pragma: no cover
        # cpu_count is not implemented for some CPU architectures/OSes
        cpu_count = 2

    return cpu_count
36
37
38
def fill_queue(queue_fill, any_list):
    """
    Puts every element of the given list into the given queue, keeping the
    order of the list.

    :param queue_fill: The queue to be filled.
    :param any_list:   List containing the elements to enqueue.
    """
    for entry in any_list:
        queue_fill.put(entry)
47
48
49
def get_running_processes(processes):
    """
    Counts the processes that are still alive.

    :param processes: An iterable of objects offering ``is_alive()``.
    :return:          The number of objects whose ``is_alive()`` is true.
    """
    return sum(1 for candidate in processes if candidate.is_alive())
51
52
53
def create_process_group(command_array, **kwargs):
    """
    Starts the given command in a process group of its own.

    On Windows the process is created with ``CREATE_NEW_PROCESS_GROUP``; on
    all other platforms it is started in a new session (``setsid``).

    :param command_array: The command and its arguments as accepted by
                          ``subprocess.Popen``.
    :param kwargs:        Additional keyword arguments that are passed on to
                          ``subprocess.Popen``.
    :return:              The created ``subprocess.Popen`` object.
    """
    if platform.system() == "Windows":  # pragma: no cover
        proc = subprocess.Popen(
            command_array,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            **kwargs)
    else:
        # ``start_new_session`` performs the setsid() call inside Popen
        # itself. It replaces ``preexec_fn=os.setsid``, which the subprocess
        # documentation marks as unsafe in the presence of threads. The new
        # session is always requested, regardless of what the caller passed.
        popen_kwargs = dict(kwargs, start_new_session=True)
        proc = subprocess.Popen(command_array, **popen_kwargs)
    return proc
64
65
66
def get_default_actions(section):
    """
    Parses the key ``default_actions`` in the given section.

    :param section:    The section where to parse from.
    :return:           A tuple of two dicts: the first maps bearnames to the
                       action (taken from ``ACTIONS``) selected for them, the
                       second maps bearnames to the action names that do not
                       match any action in ``ACTIONS``. Both dicts are empty
                       if looking up ``default_actions`` raises an
                       ``IndexError``.
    """
    try:
        default_actions = dict(section["default_actions"])
    except IndexError:
        return {}, {}

    action_dict = {action.get_metadata().name: action for action in ACTIONS}

    # Split the configured entries by whether the action name is known.
    invalid_actions = {bearname: action_name
                       for bearname, action_name in default_actions.items()
                       if action_name not in action_dict}
    actions = {bearname: action_dict[action_name]
               for bearname, action_name in default_actions.items()
               if action_name in action_dict}
    return actions, invalid_actions
94
95
96
def autoapply_actions(results,
                      file_dict,
                      file_diff_dict,
                      section,
                      log_printer):
    """
    Applies the default action configured for a result's origin to each of
    the given results and collects the results that were not handled.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the name of files and its
                           contents.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param section:        The section the default actions are read from.
    :param log_printer:    A log printer instance to log messages on.
    :return:               A list of unprocessed results: those without a
                           default action, those the action is not applicable
                           to and those the action failed on. If no valid
                           default action is configured, ``results`` itself
                           is returned.
    """
    default_actions, invalid_actions = get_default_actions(section)

    for bear_name, action_name in invalid_actions.items():
        warning = ("Selected default action {} for bear {} does "
                   "not exist. Ignoring action.")
        log_printer.warn(warning.format(repr(action_name), repr(bear_name)))

    if not default_actions:
        # There's nothing to auto-apply.
        return results

    leftover_results = []
    for result in results:
        try:
            action = default_actions[result.origin]
        except KeyError:
            # No default action was configured for this origin.
            leftover_results.append(result)
            continue

        applicable = action.is_applicable(result, file_dict, file_diff_dict)
        if not applicable:
            not_applicable = ("Selected default action {} for bear {} is not "
                              "applicable. Action not applied.")
            log_printer.warn(not_applicable.format(
                repr(action.get_metadata().name),
                repr(result.origin)))
            leftover_results.append(result)
            continue

        try:
            action().apply_from_section(result,
                                        file_dict,
                                        file_diff_dict,
                                        section)
            applied = "Applied {} on {} from {}."
            log_printer.info(applied.format(repr(action.get_metadata().name),
                                            result.location_repr(),
                                            repr(result.origin)))
        except Exception as ex:
            # A failing action must not abort the run; the result is handed
            # back as unprocessed and the failure is logged.
            leftover_results.append(result)
            failure = "Failed to execute action {} with error: {}."
            log_printer.log_exception(
                failure.format(repr(action.get_metadata().name), ex),
                ex)
            log_printer.debug("-> for result " + repr(result) + ".")

    return leftover_results
161
162
163
def check_result_ignore(result, ignore_ranges):
    """
    Determines if the result has to be ignored.

    :param result:        The result that needs to be checked.
    :param ignore_ranges: A list of tuples, each containing a list of lower
                          cased affected bearnames and a SourceRange to
                          ignore. If any of the bearname lists is empty, it
                          is considered an ignore range for all bears.
                          This may be a list of globbed bear wildcards.
    :return:              True if the result has to be ignored.
    """
    # ``source_range`` instead of ``range`` so the builtin is not shadowed.
    for bears, source_range in ignore_ranges:
        orig = result.origin.lower()
        # The range has to overlap the result; the bear has to be covered by
        # an empty list (all bears), an exact name or a glob match.
        if (result.overlaps(source_range) and
                (not bears or orig in bears or fnmatch(orig, bears))):
            return True

    return False
182
183
184
def print_result(results,
                 file_dict,
                 retval,
                 print_results,
                 section,
                 log_printer,
                 file_diff_dict,
                 ignore_ranges):
    """
    Takes the results produced by each bear and gives them to the print_results
    method to present to the user.

    Before presenting, results are dropped that are not exactly of type
    ``Result``, that are below the section's ``min_severity`` or that are
    covered by one of the ignore ranges.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the name of files and its
                           contents.
    :param retval:         The value returned by earlier invocations. If it
                           is True, the returned bool is True no matter what
                           happens. Else it depends on whether this
                           invocation yields results.
    :param print_results:  A function that prints all given results appropriate
                           to the output medium.
    :param section:        The section the settings ``min_severity`` and
                           ``autoapply`` are read from.
    :param log_printer:    The log printer handed to ``print_results`` and
                           used for messages about auto-applied actions.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param ignore_ranges:  A list of tuples of bearnames and SourceRanges as
                           accepted by ``check_result_ignore``. Results that
                           affect code in any of those ranges will be
                           ignored.
    :return:               A tuple of a bool and a list. The bool is True if
                           ``retval`` was True or if any results remained
                           after filtering. The list holds the results that
                           were handed to ``print_results``, i.e. those not
                           processed by an auto-applied action.
    """
    min_severity_str = str(section.get('min_severity', 'INFO')).upper()
    # Fall back to the INFO severity *value* for unknown names. The previous
    # fallback was the string 'INFO', which is then compared against severity
    # values and presumably raises a TypeError (severities look numeric).
    min_severity = RESULT_SEVERITY.str_dict.get(min_severity_str,
                                                RESULT_SEVERITY.INFO)
    # The exact type check is kept on purpose: it also drops instances of
    # Result subclasses. NOTE(review): presumably that is how hidden results
    # are kept away from the user - confirm before switching to isinstance.
    results = [result
               for result in results
               if type(result) is Result and
               result.severity >= min_severity and
               not check_result_ignore(result, ignore_ranges)]

    # NOTE(review): relies on the object returned by section.get() converting
    # to bool by its content - a plain non-empty str would always be True.
    if bool(section.get('autoapply', 'true')):
        patched_results = autoapply_actions(results,
                                            file_dict,
                                            file_diff_dict,
                                            section,
                                            log_printer)
    else:
        patched_results = results

    print_results(log_printer,
                  section,
                  patched_results,
                  file_dict,
                  file_diff_dict)
    return retval or len(results) > 0, patched_results
235
236
237
def get_file_dict(filename_list, log_printer):
    """
    Reads all given files as UTF-8 text into a dictionary.

    Files that cannot be decoded or opened are logged as warnings and left
    out of the result.

    :param filename_list: List of names of paths to files to get contents of.
    :param log_printer:   The logger which logs errors.
    :return:              A dictionary with the filenames as keys and a tuple
                          of the lines of the respective file as values.
    """
    contents_by_name = {}
    for path in filename_list:
        try:
            with open(path, "r", encoding="utf-8") as handle:
                lines = tuple(handle.readlines())
        except UnicodeDecodeError:
            log_printer.warn("Failed to read file '{}'. It seems to contain "
                             "non-unicode characters. Leaving it "
                             "out.".format(path))
        except OSError as exception:  # pragma: no cover
            log_printer.log_exception("Failed to read file '{}' because of "
                                      "an unknown error. Leaving it "
                                      "out.".format(path),
                                      exception,
                                      log_level=LOG_LEVEL.WARNING)
        else:
            contents_by_name[path] = lines

    return contents_by_name
263
264
265
def filter_raising_callables(it, exception, *args, **kwargs):
    """
    Calls every item of the given iterator and yields the return values,
    leaving out the items that raise one of the given exceptions when called.

    :param it:        The iterator to filter.
    :param exception: The (tuple of) exception(s) to filter for.
    :param args:      Positional arguments to pass to the callable.
    :param kwargs:    Keyword arguments to pass to the callable.
    :return:          A generator yielding the return value of each callable
                      that did not raise ``exception``.
    """
    for elem in it:
        try:
            value = elem(*args, **kwargs)
        except exception:
            continue

        # The yield is kept outside the try so that only exceptions raised
        # by the callable itself are filtered, not ones thrown into the
        # generator by its consumer.
        yield value
280
281
282
def instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      message_queue):
    """
    Instantiates each bear with the arguments it needs. Bears that raise a
    ``RuntimeError`` on instantiation are left out.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bear classes to instantiate.
    :param global_bear_list: List of global bear classes to instantiate.
    :param file_dict:        Dictionary containing filenames and their
                             contents.
    :param message_queue:    Queue responsible to maintain the messages
                             delivered by the bears.
    :return:                 The local and global bear instance lists.
    """
    # Local bears are called with the section and the message queue only.
    local_instances = list(filter_raising_callables(local_bear_list,
                                                    RuntimeError,
                                                    section,
                                                    message_queue,
                                                    timeout=0.1))

    # Global bears additionally get the file dictionary as first argument.
    global_instances = list(filter_raising_callables(global_bear_list,
                                                     RuntimeError,
                                                     file_dict,
                                                     section,
                                                     message_queue,
                                                     timeout=0.1))

    return local_instances, global_instances
317
318
319
def instantiate_processes(section,
                          local_bear_list,
                          global_bear_list,
                          job_count,
                          log_printer):
    """
    Collects and reads the files of the section, sets up the shared queues
    and result dictionaries and creates the processes that will run the
    bears. The processes are created but not started.

    Both bear lists are modified in place: their contents are replaced by the
    bear instances returned from ``instantiate_bears``.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears belonging to the section.
    :param global_bear_list: List of global bears belonging to the section.
    :param job_count:        Max number of processes to create.
    :param log_printer:      The log printer to warn to.
    :return:                 A tuple containing a list of processes,
                             and the arguments passed to each process which are
                             the same for each object.
    """
    file_globs = glob_list(section.get('files', ""))
    ignored_globs = glob_list(section.get('ignore', ""))
    limit_globs = glob_list(section.get('limit_files', ""))
    filename_list = collect_files(file_globs,
                                  log_printer,
                                  ignored_file_paths=ignored_globs,
                                  limit_file_paths=limit_globs)
    file_dict = get_file_dict(filename_list, log_printer)

    # Shared state: result dicts are served by a manager, everything else is
    # exchanged via queues.
    manager = multiprocessing.Manager()
    global_bear_queue = multiprocessing.Queue()
    filename_queue = multiprocessing.Queue()
    local_result_dict = manager.dict()
    global_result_dict = manager.dict()
    message_queue = multiprocessing.Queue()
    control_queue = multiprocessing.Queue()

    bear_runner_args = {
        "file_name_queue": filename_queue,
        "local_bear_list": local_bear_list,
        "global_bear_list": global_bear_list,
        "global_bear_queue": global_bear_queue,
        "file_dict": file_dict,
        "local_result_dict": local_result_dict,
        "global_result_dict": global_result_dict,
        "message_queue": message_queue,
        "control_queue": control_queue,
        "timeout": 0.1,
    }

    # Slice assignment keeps the list objects referenced by bear_runner_args
    # (and by the caller) while swapping the classes for instances.
    local_bear_list[:], global_bear_list[:] = instantiate_bears(
        section,
        local_bear_list,
        global_bear_list,
        file_dict,
        message_queue)

    fill_queue(filename_queue, file_dict.keys())
    # Global bears are queued by their index in global_bear_list.
    fill_queue(global_bear_queue, range(len(global_bear_list)))

    runner_processes = [
        multiprocessing.Process(target=run, kwargs=bear_runner_args)
        for _ in range(job_count)]

    return runner_processes, bear_runner_args
376
377
378
def get_ignore_scope(line, keyword):
    """
    Extracts the bears an ignore declaration in the given line refers to.

    :param line:    The line containing the ignore declaration.
    :param keyword: The keyword that was found. Everything after the rightmost
                    occurrence of it will be considered for the scope.
    :return:        A list of lower cased bearnames or an empty list (-> "all")
    """
    scope_start = line.rfind(keyword) + len(keyword)
    toignore = line[scope_start:]

    # A scope starting with "all" addresses every bear.
    if toignore.startswith("all"):
        return []

    return list(StringConverter(toignore, list_delimiters=', '))
392
393
394
def yield_ignore_ranges(file_dict):
    """
    Yields tuples of affected bears and a SourceRange that shall be ignored for
    those.

    Every line is lower cased and searched for one of three markers:

    - ``start ignoring `` remembers the line and the scope given after it.
    - ``stop ignoring`` yields a range from the remembered start line to the
      end of the current line, if a start was seen before.
    - ``ignore `` yields a range covering the current line and the line
      following it. On the last line of a file the range covers only that
      line.

    :param file_dict: The file dictionary, mapping filenames to the lines of
                      the respective file.
    """
    for filename, file in file_dict.items():
        start = None
        bears = []
        for line_number, line in enumerate(file, start=1):
            line = line.lower()
            if "start ignoring " in line:
                start = line_number
                bears = get_ignore_scope(line, "start ignoring ")
            elif "stop ignoring" in line:
                # NOTE(review): start is not reset here, so a later stray
                # "stop ignoring" yields another range from the same start -
                # confirm whether that is intended.
                if start:
                    yield (bears,
                           SourceRange.from_values(filename,
                                                   start,
                                                   1,
                                                   line_number,
                                                   len(file[line_number-1])))
            elif "ignore " in line:
                # line_number is 1-based, so file[line_number] is the *next*
                # line. Clamp to the last line so that an ignore marker on
                # the final line of a file does not raise an IndexError.
                end_line = min(line_number + 1, len(file))
                yield (get_ignore_scope(line, "ignore "),
                       SourceRange.from_values(filename,
                                               line_number,
                                               1,
                                               end_line,
                                               len(file[end_line-1])))
424
425
426
def process_queues(processes,
                   control_queue,
                   local_result_dict,
                   global_result_dict,
                   file_dict,
                   print_results,
                   section,
                   log_printer):
    """
    Iterate the control queue and send the results received to the print_result
    method so that they can be presented to the user.

    Local results are presented as soon as they are announced. Global results
    announced in the meantime are buffered and presented once all local work
    is finished; after that the remaining global results are processed.

    :param processes:          List of processes which can be used to run
                               Bears.
    :param control_queue:      Containing control elements that indicate
                               whether there is a result available and which
                               bear it belongs to.
    :param local_result_dict:  Dictionary containing results respective to
                               local bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param global_result_dict: Dictionary containing results respective to
                               global bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param file_dict:          Dictionary containing file contents with
                               filename as keys.
    :param print_results:      Prints all given results appropriate to the
                               output medium.
    :param section:            The section, passed on to print_result.
    :param log_printer:        The log printer, passed on to print_result.
    :return:                   The bool handed back by the last print_result
                               invocation, i.e. True if any invocation
                               reported results. False if print_result was
                               never invoked.
    """
    file_diff_dict = {}
    retval = False
    # Number of processes working on local/global bears. They are count down
    # when the last queue element of that process is processed which may be
    # *after* the process has ended!
    local_processes = len(processes)
    global_processes = len(processes)
    global_result_buffer = []
    ignore_ranges = list(yield_ignore_ranges(file_dict))

    def present_results(result_dict, key, retval):
        # Presents result_dict[key] via print_result, stores the results that
        # print_result hands back under the same key and returns the updated
        # retval.
        retval, remaining = print_result(result_dict[key],
                                         file_dict,
                                         retval,
                                         print_results,
                                         section,
                                         log_printer,
                                         file_diff_dict,
                                         ignore_ranges)
        result_dict[key] = remaining
        return retval

    # One process is the logger thread
    while local_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
                local_processes -= 1
            elif control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED:
                global_processes -= 1
            elif control_elem == CONTROL_ELEMENT.LOCAL:
                assert local_processes != 0
                retval = present_results(local_result_dict, index, retval)
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL
                # Global results are presented after all local ones.
                global_result_buffer.append(index)
        except queue.Empty:
            if get_running_processes(processes) < 2:  # pragma: no cover
                # Recover silently, those branches are only
                # nondeterministically covered.
                break

    # Flush global result buffer
    for elem in global_result_buffer:
        retval = present_results(global_result_dict, elem, retval)

    # One process is the logger thread
    while global_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)

            if control_elem == CONTROL_ELEMENT.GLOBAL:
                retval = present_results(global_result_dict, index, retval)
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
                global_processes -= 1
        except queue.Empty:
            if get_running_processes(processes) < 2:  # pragma: no cover
                # Recover silently, those branches are only
                # nondeterministically covered.
                break

    return retval
534
535
536
def simplify_section_result(section_result):
    """
    Flattens a section's result from ``execute_section`` so that it is easy
    to use in other functions.

    :param section_result: The result of a section which was executed.
    :return:               Tuple containing:
                            - bool - True if results were yielded
                            - bool - True if unfixed results were yielded
                            - list - Results from all bears (local and global)
    """
    local_values = section_result[1].values()
    global_values = section_result[2].values()

    # Entries without results are stored as None and are skipped.
    results_for_section = [result
                           for value in chain(local_values, global_values)
                           if value is not None
                           for result in value]

    return (section_result[0],
            len(results_for_section) > 0,
            results_for_section)
561
562
563
def execute_section(section,
                    global_bear_list,
                    local_bear_list,
                    print_results,
                    log_printer):
    """
    Executes the section with the given bears.

    The execute_section method does the following things:

    1. Prepare a Process
       -  Load files
       -  Create queues
    2. Spawn up one or more Processes
    3. Output results from the Processes
    4. Join all processes

    The number of processes is taken from the setting ``jobs``; if it is
    missing or not a number, the CPU count is used.

    :param section:          The section to execute.
    :param global_bear_list: List of global bears belonging to the section.
    :param local_bear_list:  List of local bears belonging to the section.
    :param print_results:    Prints all given results appropriate to the
                             output medium.
    :param log_printer:      The log_printer to warn to.
    :return:                 Tuple containing a bool (True if results were
                             yielded, False otherwise), a Manager.dict
                             containing all local results(filenames are key)
                             and a Manager.dict containing all global bear
                             results (bear names are key) as well as the
                             file dictionary.
    """
    local_bear_list = Dependencies.resolve(local_bear_list)
    global_bear_list = Dependencies.resolve(global_bear_list)

    try:
        job_count = int(section['jobs'])
    except ValueError:
        log_printer.warn("Unable to convert setting 'jobs' into a number. "
                         "Falling back to CPU count.")
        job_count = get_cpu_count()
    except IndexError:
        # The lookup of 'jobs' failed; fall back without a warning.
        job_count = get_cpu_count()

    processes, arg_dict = instantiate_processes(section,
                                                local_bear_list,
                                                global_bear_list,
                                                job_count,
                                                log_printer)

    # Start and join the logger thread along with the processes to run bears
    logger_thread = LogPrinterThread(arg_dict["message_queue"],
                                     log_printer)
    processes.append(logger_thread)

    for worker in processes:
        worker.start()

    try:
        yielded_results = process_queues(processes,
                                         arg_dict["control_queue"],
                                         arg_dict["local_result_dict"],
                                         arg_dict["global_result_dict"],
                                         arg_dict["file_dict"],
                                         print_results,
                                         section,
                                         log_printer)
        return (yielded_results,
                arg_dict["local_result_dict"],
                arg_dict["global_result_dict"],
                arg_dict["file_dict"])
    finally:
        # Tell the logger thread to stop, then wait for it and for all
        # processes - also when processing the queues failed.
        logger_thread.running = False

        for worker in processes:
            worker.join()
636