Passed
Branch master (2b673d)
by Matěj
03:01
created

ssg.rule_dir_stats   F

Complexity

Total Complexity 91

Size/Duplication

Total Lines 499
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 246
dl 0
loc 499
ccs 0
cts 232
cp 0
rs 2
c 0
b 0
f 0
wmc 91

18 Functions

Rating   Name   Duplication   Size   Complexity  
B get_all_affected_products() 0 24 6
A get_affected_products() 0 5 1
C _walk_rule() 0 28 9
A walk_rules() 0 36 5
F walk_rules_parallel() 0 53 19
A walk_rules_diff() 0 37 1
B filter_rule_ids() 0 31 6
A prodtypes_oval() 0 23 5
A prodtypes_remediation() 0 23 5
A product_names_oval() 0 16 5
A missing_oval() 0 9 2
A walk_rules_stats() 0 36 4
A missing_remediation() 0 10 2
A product_names_remediation() 0 16 5
A two_plus_remediation() 0 11 2
B walk_rule_stats() 0 37 7
B walk_rules_diff_stats() 0 43 5
A two_plus_oval() 0 9 2

How to fix   Complexity   

Complexity

Complex classes like ssg.rule_dir_stats often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
This module contains common code shared by utils/rule_dir_stats.py and
3
utils/rule_dir_diff.py. This code includes functions for walking the output
4
of the utils/rule_dir_json.py script, and filtering functions used in both
5
scripts.
6
"""
7
8
from __future__ import absolute_import
9
from __future__ import print_function
10
11
import os
12
from collections import defaultdict
13
14
from .build_remediations import REMEDIATION_TO_EXT_MAP as REMEDIATION_MAP
15
from .utils import subset_dict
16
17
18
def get_affected_products(rule_obj):
    """
    From a rule_obj, return the set of affected products from rule.yml.

    The result is a new set built from rule_obj['products'], so callers may
    add to it without modifying rule_obj.
    """
    return set(rule_obj['products'])
23
24
25
def get_all_affected_products(args, rule_obj):
    """
    From a rule_obj, return the set of affected products from rule.yml, and
    all fixes and checks.

    If args.strict is set, this function is equivalent to
    get_affected_products. Otherwise, the products in
    rule_obj['oval_products'] are added unless args.fixes_only is set, and
    the products in rule_obj['remediation_products'] are added unless
    args.ovals_only is set.
    """

    affected_products = get_affected_products(rule_obj)

    if args.strict:
        return affected_products

    if not args.fixes_only:
        affected_products.update(rule_obj['oval_products'])

    if not args.ovals_only:
        affected_products.update(rule_obj['remediation_products'])

    return affected_products
49
50
51
def _walk_rule(args, rule_obj, oval_func, remediation_func, verbose_output):
    """
    Walks a single rule and updates verbose_output if visited. Returns visited
    state as a boolean.

    A rule is visited only when its affected products (see
    get_all_affected_products) overlap args.products and, if args.query is
    non-empty, its id is contained in args.query.

    For a visited rule, oval_func(rule_obj) is called unless args.fixes_only
    is set, and remediation_func(rule_obj, r_type) is called for every r_type
    in REMEDIATION_MAP unless args.ovals_only is set. Only truthy results are
    stored, under verbose_output[rule_id]['oval'] and
    verbose_output[rule_id][r_type] respectively.

    verbose_output must create missing per-rule entries on access (for
    example a defaultdict of dicts), since entries are assigned without being
    created first.

    Internal function for walk_rules and walk_rules_parallel.
    """

    rule_id = rule_obj['id']

    affected_products = get_all_affected_products(args, rule_obj)
    if not affected_products.intersection(args.products):
        return False
    if args.query and rule_id not in args.query:
        return False

    if not args.fixes_only:
        result = oval_func(rule_obj)
        if result:
            verbose_output[rule_id]['oval'] = result

    if not args.ovals_only:
        for r_type in REMEDIATION_MAP:
            result = remediation_func(rule_obj, r_type)
            if result:
                verbose_output[rule_id][r_type] = result

    return True
79
80
81
def walk_rules(args, known_rules, oval_func, remediation_func):
    """
    Walk a dictionary of known_rules, returning the number of visited rules
    and the output at each visited rule, conditionally calling oval_func and
    remediation_func based on the values of args.fixes_only and
    args.ovals_only. If the result of these functions are not Falsy, set the
    appropriate output content.

    The input rule_obj structure is the value of known_rules[rule_id].

    The output structure is a dict as follows:
    {
        rule_id: {
            "oval": oval_func(rule_obj),
            r_type: remediation_func(rule_obj, r_type),
            ...
        },
        ...
    }
    with one r_type entry per key of REMEDIATION_MAP whose result is truthy.
    Rules for which no function produced a truthy result have no entry, even
    though they are counted as visited.

    The only argument supplied to oval_func is rule_obj.
    The arguments supplied to remediation_func are rule_obj and the
    remediation type, in that order.
    """

    affected_rules = 0
    # Nested defaultdicts let _walk_rule assign verbose_output[id][key]
    # without creating the per-rule entry first.
    verbose_output = defaultdict(lambda: defaultdict(lambda: None))

    for rule_obj in known_rules.values():
        if _walk_rule(args, rule_obj, oval_func, remediation_func, verbose_output):
            affected_rules += 1

    return affected_rules, verbose_output
117
118
119
def walk_rule_stats(rule_output):
    """
    Walk the output of a rule, generating statistics about affected
    ovals, remediations, and generating verbose output in a stable order.

    The 'oval' entry (if present) is emitted first, followed by the entries
    for each remediation type in sorted order of the REMEDIATION_MAP keys.

    Returns a tuple of (affected_ovals, affected_remediations,
    all_affected_remediations, affected_remediations_type, all_output), where
    the first three are 0 or 1, affected_remediations_type maps each present
    remediation type to 1, and all_output is the ordered list of outputs.
    all_affected_remediations is 1 only when every remediation type in
    REMEDIATION_MAP is present in rule_output.
    """

    affected_ovals = 0
    affected_remediations_type = defaultdict(int)
    all_output = []

    any_remediation = False
    every_remediation = True

    # Membership tests (rather than indexing) avoid creating entries when
    # rule_output is a defaultdict.
    if 'oval' in rule_output:
        affected_ovals = 1
        all_output.append(rule_output['oval'])

    for r_type in sorted(REMEDIATION_MAP):
        if r_type in rule_output:
            any_remediation = True
            affected_remediations_type[r_type] += 1
            all_output.append(rule_output[r_type])
        else:
            every_remediation = False

    affected_remediations = 1 if any_remediation else 0
    all_affected_remediations = 1 if every_remediation else 0

    return (affected_ovals, affected_remediations, all_affected_remediations,
            affected_remediations_type, all_output)
156
157
158
def walk_rules_stats(args, known_rules, oval_func, remediation_func):
    """
    Walk a dictionary of known_rules and generate simple aggregate statistics
    for all visited rules. The oval_func and remediation_func arguments behave
    according to walk_rules().

    Returned values are visited_rules, affected_ovals, affected_remediations,
    all_affected_remediations, a dictionary containing all fix types and the
    quantity of affected fixes, and the ordered output of all functions.

    An effort is made to provide consistently ordered verbose_output by
    sorting all visited keys and the keys of
    ssg.build_remediations.REMEDIATION_MAP.
    """
    affected_rules, verbose_output = walk_rules(args, known_rules, oval_func, remediation_func)

    affected_ovals = 0
    affected_remediations = 0
    all_affected_remediations = 0
    affected_remediations_type = defaultdict(int)
    all_output = []

    # Sorted rule ids keep the aggregated output order reproducible.
    for rule_id in sorted(verbose_output):
        (rule_ovals, rule_remediations, rule_all_remediations,
         rule_remediations_type, rule_lines) = walk_rule_stats(verbose_output[rule_id])

        affected_ovals += rule_ovals
        affected_remediations += rule_remediations
        all_affected_remediations += rule_all_remediations
        for r_type, count in rule_remediations_type.items():
            affected_remediations_type[r_type] += count

        all_output.extend(rule_lines)

    return (affected_rules, affected_ovals, affected_remediations,
            all_affected_remediations, affected_remediations_type, all_output)
194
195
196
def walk_rules_parallel(args, left_rules, right_rules, oval_func, remediation_func):
    """
    Walks two sets of known_rules (left_rules and right_rules) with identical
    keys and returns left_only, right_only, and common_only output from
    _walk_rule. If the outputted data for a rule when called on left_rules and
    right_rules is the same, it is added to common_only. Only rules which
    output different data will have their data added to left_only and
    right_only respectively.

    Each returned element is a (affected_rules, verbose_output) pair shaped
    like the return value of walk_rules.

    Raises AssertionError if left_rules and right_rules do not have the same
    keys. The check is an explicit raise so it still runs under `python -O`.
    """

    def new_output():
        # Auto-creating nested mapping required by _walk_rule.
        return defaultdict(lambda: defaultdict(lambda: None))

    if set(left_rules) != set(right_rules):
        raise AssertionError("left_rules and right_rules must have identical keys")

    left_count = 0
    right_count = 0
    common_count = 0

    left_output = new_output()
    right_output = new_output()
    common_output = new_output()

    for rule_id in left_rules:
        left_rule_obj = left_rules[rule_id]
        right_rule_obj = right_rules[rule_id]

        if left_rule_obj == right_rule_obj:
            # Identical inputs: walk once and record directly as common.
            if _walk_rule(args, left_rule_obj, oval_func, remediation_func, common_output):
                common_count += 1
            continue

        # Inputs differ: walk both sides into scratch outputs and compare
        # what was actually produced before deciding where it belongs.
        left_temp = new_output()
        right_temp = new_output()

        left_ret = _walk_rule(args, left_rule_obj, oval_func, remediation_func, left_temp)
        right_ret = _walk_rule(args, right_rule_obj, oval_func, remediation_func, right_temp)

        if left_ret == right_ret and left_temp == right_temp:
            common_output.update(left_temp)
            if left_ret:
                common_count += 1
        else:
            left_output.update(left_temp)
            right_output.update(right_temp)
            if left_ret:
                left_count += 1
            if right_ret:
                right_count += 1

    return ((left_count, left_output),
            (right_count, right_output),
            (common_count, common_output))
249
250
251
def walk_rules_diff(args, left_rules, right_rules, oval_func, remediation_func):
    """
    Walk two dictionaries of known_rules (left_rules and right_rules) and
    generate five sets of output: left_only rules output, right_only rules
    output, shared left output, shared right output, and shared common
    output, as a five-tuple, where each tuple element is equivalent to
    walk_rules on the appropriate set of rules.

    Rules present on only one side are walked with walk_rules; rules present
    on both sides are walked with walk_rules_parallel.

    Does not understand renaming of rule_ids as this would depend on disk
    content to reflect these differences. Unless significantly more data is
    added to the rule_obj structure (contents of rule.yml, ovals,
    remediations, etc.), all information besides 'title' is not uniquely
    identifying or could be easily updated.
    """

    left_rule_ids = set(left_rules)
    right_rule_ids = set(right_rules)
    common_rule_ids = left_rule_ids.intersection(right_rule_ids)

    left_restricted = subset_dict(left_rules, left_rule_ids.difference(right_rule_ids))
    right_restricted = subset_dict(right_rules, right_rule_ids.difference(left_rule_ids))
    left_common = subset_dict(left_rules, common_rule_ids)
    right_common = subset_dict(right_rules, common_rule_ids)

    left_only_data = walk_rules(args, left_restricted, oval_func, remediation_func)
    right_only_data = walk_rules(args, right_restricted, oval_func, remediation_func)
    left_changed_data, right_changed_data, common_data = walk_rules_parallel(
        args, left_common, right_common, oval_func, remediation_func)

    return (left_only_data, right_only_data, left_changed_data, right_changed_data, common_data)
288
289
290
def walk_rules_diff_stats(results):
    """
    Takes the results of walk_rules_diff (results) and generates five sets of
    output statistics: left_only rules output, right_only rules output,
    shared left output, shared right output, and shared common output, as a
    five-tuple, where each tuple element is equivalent to walk_rules_stats on
    the appropriate set of rules.

    Each element of results must be an (affected_rules, verbose_output) pair.

    Raises AssertionError if results does not contain exactly five elements.
    The check is an explicit raise so it still runs under `python -O`.
    """

    if len(results) != 5:
        raise AssertionError("expected five result sets, got %d" % len(results))

    output_data = []

    for affected_rules, verbose_output in results:
        affected_ovals = 0
        affected_remediations = 0
        all_affected_remediations = 0
        affected_remediations_type = defaultdict(int)
        all_output = []

        # Sorted rule ids keep the aggregated output order reproducible.
        for rule_id in sorted(verbose_output):
            (rule_ovals, rule_remediations, rule_all_remediations,
             rule_remediations_type, rule_lines) = walk_rule_stats(verbose_output[rule_id])

            affected_ovals += rule_ovals
            affected_remediations += rule_remediations
            all_affected_remediations += rule_all_remediations
            for r_type, count in rule_remediations_type.items():
                affected_remediations_type[r_type] += count

            all_output.extend(rule_lines)

        output_data.append((affected_rules, affected_ovals,
                            affected_remediations, all_affected_remediations,
                            affected_remediations_type, all_output))

    return tuple(output_data)
333
334
335
def filter_rule_ids(all_keys, queries):
    """
    From a set of queries (a comma separated list of queries, where a query is either a
    rule id or a substring thereof), return the set of matching keys from all_keys. When
    queries is the literal string "all", return all of the keys.

    An empty or None queries value returns an empty set. Whitespace around each query is
    ignored and empty queries (e.g. from a trailing or doubled comma) are dropped; without
    this an empty query would match every key, since the empty string is a substring of
    any string.
    """

    if not queries:
        return set()

    if queries == 'all':
        return set(all_keys)

    query_parts = [part.strip() for part in queries.split(',')]
    query_parts = [part for part in query_parts if part]

    # We have to iterate over every key in all_keys because we check whether a query is a
    # substring of a key, not whether it is a key. The result is a set, so it is unique
    # and intentionally not ordered according to the order of the queries.
    return {key for key in all_keys if any(query in key for query in query_parts)}
366
367
368
def missing_oval(rule_obj):
    """
    For a rule object, check if it is missing an oval.

    Returns a tab-prefixed message when rule_obj['ovals'] is empty, and None
    otherwise.
    """

    rule_id = rule_obj['id']
    if not rule_obj['ovals']:
        return "\trule_id:%s is missing all OVALs" % rule_id
    return None
377
378
379
def missing_remediation(rule_obj, r_type):
    """
    For a rule object, check if it is missing a remediation of type r_type.

    Returns a tab-prefixed message when r_type is absent from
    rule_obj['remediations'] or its entry is empty, and None otherwise.
    """

    rule_id = rule_obj['id']
    if not rule_obj['remediations'].get(r_type):
        return "\trule_id:%s is missing %s remediations" % (rule_id, r_type)
    return None
389
390
391
def two_plus_oval(rule_obj):
    """
    For a rule object, check if it has two or more OVALs.

    Returns a tab-prefixed message listing the comma-joined names in
    rule_obj['ovals'] (in iteration order) when there are at least two, and
    None otherwise.
    """

    rule_id = rule_obj['id']
    ovals = rule_obj['ovals']
    if len(ovals) >= 2:
        return "\trule_id:%s has two or more OVALs: %s" % (rule_id, ','.join(ovals))
    return None
400
401
402
def two_plus_remediation(rule_obj, r_type):
    """
    For a rule object, check if it has two or more remediations of type r_type.

    Returns a tab-prefixed message listing the comma-joined names in
    rule_obj['remediations'][r_type] (in iteration order) when there are at
    least two, and None otherwise, including when r_type is absent.
    """

    rule_id = rule_obj['id']
    remediations = rule_obj['remediations']
    if r_type in remediations and len(remediations[r_type]) >= 2:
        return "\trule_id:%s has two or more %s remediations: %s" % \
               (rule_id, r_type, ','.join(remediations[r_type]))
    return None
413
414
415
def prodtypes_oval(rule_obj):
    """
    For a rule object, check if the prodtypes match between the YAML and the
    OVALs.

    Compares the set in rule_obj['products'] with the union of the 'products'
    of every entry in rule_obj['ovals']. Returns None when either side is
    missing or empty, or when the two sets are equal; otherwise returns a
    tab-prefixed message listing the sorted symmetric difference.
    """

    rule_id = rule_obj['id']

    rule_products = set(rule_obj.get('products', []))
    if not rule_products:
        return None

    oval_products = set()
    for oval_obj in rule_obj.get('ovals', {}).values():
        oval_products.update(oval_obj.get('products', []))
    if not oval_products:
        return None

    sym_diff = sorted(rule_products.symmetric_difference(oval_products))
    if sym_diff:
        return "\trule_id:%s has a different prodtypes between YAML and OVALs: %s" % \
               (rule_id, ','.join(sym_diff))
    return None
438
439
440
def prodtypes_remediation(rule_obj, r_type):
    """
    For a rule object, check if the prodtypes match between the YAML and the
    remediations of type r_type.

    Compares the set in rule_obj['products'] with the union of the 'products'
    of every entry in rule_obj['remediations'][r_type]. Returns None when
    either side is missing or empty, or when the two sets are equal;
    otherwise returns a tab-prefixed message listing the sorted symmetric
    difference. A remediation entry without a 'products' key contributes no
    products, matching how prodtypes_oval treats OVAL entries.
    """

    rule_id = rule_obj['id']

    rule_products = set(rule_obj.get('products', []))
    if not rule_products:
        return None

    remediation_products = set()
    for remediation_obj in rule_obj.get('remediations', {}).get(r_type, {}).values():
        remediation_products.update(remediation_obj.get('products', []))
    if not remediation_products:
        return None

    # Both sets are known to be non-empty here, so only the difference matters.
    sym_diff = sorted(rule_products.symmetric_difference(remediation_products))
    if sym_diff:
        return "\trule_id:%s has a different prodtypes between YAML and %s remediations: %s" % \
               (rule_id, r_type, ','.join(sym_diff))
    return None
463
464
465
def product_names_oval(rule_obj):
    """
    For a rule_obj, check the scope of the platforms versus the product name
    of the OVAL objects.

    Every OVAL other than "shared.xml" is expected to list only the product
    matching its own file name without extension. Returns a tab-prefixed
    message for the first mismatch found, and None when there is none.
    """

    rule_id = rule_obj['id']
    for oval_name, oval_obj in rule_obj['ovals'].items():
        if oval_name == "shared.xml":
            continue

        oval_product, _ = os.path.splitext(oval_name)
        for product in oval_obj['products']:
            if product != oval_product:
                return "\trule_id:%s has a different product and OVALs names: %s is not %s" % \
                       (rule_id, product, oval_product)
    return None
481
482
483
def product_names_remediation(rule_obj, r_type):
    """
    For a rule_obj, check the scope of the platforms versus the product name
    of the remediations of type r_type.

    Every remediation whose file name without extension is not "shared" is
    expected to list only the product matching that name. Returns a
    tab-prefixed message for the first mismatch found, and None when there is
    none, including when the rule has no remediations of type r_type.
    """

    rule_id = rule_obj['id']
    # r_type may be absent for a rule; treat that as "nothing to check".
    remediations = rule_obj['remediations'].get(r_type, {})
    for r_name, r_obj in remediations.items():
        r_product, _ = os.path.splitext(r_name)
        if r_product == "shared":
            continue

        for product in r_obj['products']:
            if product != r_product:
                return ("\trule_id:%s has a different product and %s remediation names: "
                        "%s is not %s") % (rule_id, r_type, product, r_product)
    return None
499