ssg.rule_dir_stats   F
last analyzed

Complexity

Total Complexity 91

Size/Duplication

Total Lines 501
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 246
dl 0
loc 501
ccs 0
cts 232
cp 0
rs 2
c 0
b 0
f 0
wmc 91

18 Functions

Rating   Name   Duplication   Size   Complexity  
B get_all_affected_products() 0 24 6
A get_affected_products() 0 5 1
C _walk_rule() 0 28 9
A walk_rules() 0 38 5
F walk_rules_parallel() 0 53 19
A walk_rules_diff() 0 37 1
B filter_rule_ids() 0 31 6
A prodtypes_oval() 0 23 5
A prodtypes_remediation() 0 23 5
A product_names_oval() 0 16 5
A missing_oval() 0 9 2
A walk_rules_stats() 0 36 4
A missing_remediation() 0 10 2
A product_names_remediation() 0 16 5
A two_plus_remediation() 0 11 2
B walk_rule_stats() 0 37 7
B walk_rules_diff_stats() 0 43 5
A two_plus_oval() 0 9 2

How to fix   Complexity   

Complexity

Complex classes like ssg.rule_dir_stats often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
This module contains common code shared by utils/rule_dir_stats.py and
3
utils/rule_dir_diff.py. This code includes functions for walking the output
4
of the utils/rule_dir_json.py script, and filtering functions used in both
5
scripts.
6
"""
7
8
from __future__ import absolute_import
9
from __future__ import print_function
10
11
import os
12
from collections import defaultdict
13
14
from .build_remediations import REMEDIATION_TO_EXT_MAP as REMEDIATION_MAP
15
from .utils import subset_dict
16
17
18
def get_affected_products(rule_obj):
    """
    Return the entries stored under rule_obj['products'] as a new set.
    """
    products = rule_obj['products']
    return set(products)
23
24
25
def get_all_affected_products(args, rule_obj):
    """
    Build the set of every product a rule touches.

    The set starts out as get_affected_products(rule_obj). If args.strict is
    truthy it is returned as-is. Otherwise the entries of
    rule_obj['oval_products'] are merged in unless args.fixes_only is set,
    and the entries of rule_obj['remediation_products'] are merged in unless
    args.ovals_only is set.
    """
    products = get_affected_products(rule_obj)
    if args.strict:
        return products

    if not args.fixes_only:
        products.update(rule_obj['oval_products'])
    if not args.ovals_only:
        products.update(rule_obj['remediation_products'])

    return products
49
50
51
def _walk_rule(args, rule_obj, oval_func, remediation_func, verbose_output):
    """
    Visit one rule and record the results of its checks in verbose_output.

    The rule is skipped -- False is returned and nothing is recorded -- when
    none of its affected products appear in args.products, or when args.query
    is truthy and does not contain the rule's id.

    Otherwise True is returned after:
      * oval_func(rule_obj) has been called, unless args.fixes_only is set;
      * remediation_func(rule_obj, r_type) has been called for every key of
        REMEDIATION_MAP, unless args.ovals_only is set.

    Only truthy results are stored, under verbose_output[rule_id]['oval'] or
    verbose_output[rule_id][r_type]. The inner mapping is indexed without
    being created first; the callers in this module pass a nested defaultdict.

    Shared helper for walk_rules and walk_rules_parallel.
    """
    rule_id = rule_obj['id']

    in_scope = get_all_affected_products(args, rule_obj).intersection(args.products)
    if not in_scope:
        return False
    if args.query and rule_id not in args.query:
        return False

    if not args.fixes_only:
        oval_result = oval_func(rule_obj)
        if oval_result:
            verbose_output[rule_id]['oval'] = oval_result

    if not args.ovals_only:
        for r_type in REMEDIATION_MAP:
            fix_result = remediation_func(rule_obj, r_type)
            if fix_result:
                verbose_output[rule_id][r_type] = fix_result

    return True
79
80
81
def walk_rules(args, known_rules, oval_func, remediation_func):
    """
    Walk a dictionary of known_rules, returning the number of visited rules
    and the output recorded at each visited rule.

    Each value of known_rules (the rule_obj) is handed to _walk_rule, which
    decides whether the rule is visited and conditionally calls oval_func and
    remediation_func based on the values of args.fixes_only and
    args.ovals_only. Only truthy results from these functions are stored.

    The callbacks are invoked as follows::

        oval_func(rule_obj)
        remediation_func(rule_obj, r_type)

    where r_type is a key of REMEDIATION_MAP. Neither callback receives args.

    The output structure is a nested defaultdict shaped as::

        {
            rule_id: {
                "oval": oval_func(rule_obj),
                r_type: remediation_func(rule_obj, r_type),
                ...
            },
            ...
        }

    Keys whose callback returned a falsy value are absent. Because the inner
    mappings are defaultdicts, *indexing* a missing key yields None and
    creates it, so use ``in`` to test for presence without side effects.

    Returns a tuple (affected_rules, verbose_output).
    """
    affected_rules = 0
    verbose_output = defaultdict(lambda: defaultdict(lambda: None))

    for rule_id in known_rules:
        visited = _walk_rule(args, known_rules[rule_id], oval_func,
                             remediation_func, verbose_output)
        if visited:
            affected_rules += 1

    return affected_rules, verbose_output
119
120
121
def walk_rule_stats(rule_output):
    """
    Summarise the recorded output of a single rule.

    Returns a five-tuple:

      * affected_ovals: 1 if rule_output has an 'oval' entry, else 0;
      * affected_remediations: 1 if at least one key of REMEDIATION_MAP is
        present in rule_output, else 0;
      * all_affected_remediations: 1 if every key of REMEDIATION_MAP is
        present in rule_output, else 0;
      * affected_remediations_type: a defaultdict (default 0) mapping each
        present remediation type to its count for this rule;
      * all_output: a list holding the 'oval' entry first (when present),
        followed by the remediation entries in sorted type order.
    """
    all_output = []
    per_type = defaultdict(int)

    has_oval = 'oval' in rule_output
    if has_oval:
        all_output.append(rule_output['oval'])

    # Sorting the remediation types keeps the verbose output order stable.
    known_types = sorted(REMEDIATION_MAP)
    present_types = [r_type for r_type in known_types if r_type in rule_output]
    for r_type in present_types:
        per_type[r_type] += 1
        all_output.append(rule_output[r_type])

    has_any_fix = len(present_types) > 0
    has_every_fix = len(present_types) == len(known_types)

    return (int(has_oval), int(has_any_fix), int(has_every_fix),
            per_type, all_output)
158
159
160
def walk_rules_stats(args, known_rules, oval_func, remediation_func):
    """
    Walk known_rules with walk_rules() and fold the per-rule results into
    aggregate statistics. oval_func and remediation_func are passed straight
    through to walk_rules().

    Returns a six-tuple of: the number of visited rules, the summed
    affected_ovals, the summed affected_remediations, the summed
    all_affected_remediations, a defaultdict counting affected fixes per
    remediation type, and the concatenated output of all rules.

    Rules are processed in sorted rule id order, and walk_rule_stats() emits
    remediation output in sorted type order, so the combined output list is
    consistently ordered.
    """
    visited_rules, per_rule_output = walk_rules(args, known_rules, oval_func,
                                                remediation_func)

    oval_total = 0
    fix_total = 0
    full_fix_total = 0
    fix_type_totals = defaultdict(int)
    ordered_output = []

    for rule_id in sorted(per_rule_output):
        stats = walk_rule_stats(per_rule_output[rule_id])
        ovals, fixes, full_fixes, by_type, output = stats

        oval_total += ovals
        fix_total += fixes
        full_fix_total += full_fixes
        for r_type in by_type:
            fix_type_totals[r_type] += by_type[r_type]

        ordered_output.extend(output)

    return (visited_rules, oval_total, fix_total, full_fix_total,
            fix_type_totals, ordered_output)
196
197
198
def walk_rules_parallel(args, left_rules, right_rules, oval_func, remediation_func):
    """
    Walk two rule dictionaries that share exactly the same keys and sort the
    _walk_rule output of every rule into one of three buckets.

    A rule lands in the common bucket when its left and right rule objects
    compare equal, or when walking both sides yields the same visited flag
    and the same recorded output. Otherwise the left-side output goes to the
    left bucket and the right-side output goes to the right bucket.

    Returns (left_only, right_only, common_only), each being a tuple of
    (number of visited rules, nested defaultdict of recorded output).

    Raises AssertionError if the two dictionaries do not have identical keys.
    """
    assert set(left_rules) == set(right_rules)

    def new_output():
        # Same nested-defaultdict shape that walk_rules() produces.
        return defaultdict(lambda: defaultdict(lambda: None))

    left_count = 0
    right_count = 0
    common_count = 0

    left_output = new_output()
    right_output = new_output()
    common_output = new_output()

    for rule_id in left_rules:
        left_obj = left_rules[rule_id]
        right_obj = right_rules[rule_id]

        if left_obj == right_obj:
            # Identical inputs: a single walk straight into the common bucket.
            if _walk_rule(args, left_obj, oval_func, remediation_func, common_output):
                common_count += 1
            continue

        # Inputs differ: walk each side into scratch space and compare.
        left_scratch = new_output()
        right_scratch = new_output()
        left_visited = _walk_rule(args, left_obj, oval_func, remediation_func, left_scratch)
        right_visited = _walk_rule(args, right_obj, oval_func, remediation_func, right_scratch)

        if left_visited != right_visited or left_scratch != right_scratch:
            left_output.update(left_scratch)
            right_output.update(right_scratch)
            if left_visited:
                left_count += 1
            if right_visited:
                right_count += 1
            continue

        common_output.update(left_scratch)
        if left_visited:
            common_count += 1

    return ((left_count, left_output),
            (right_count, right_output),
            (common_count, common_output))
251
252
253
def walk_rules_diff(args, left_rules, right_rules, oval_func, remediation_func):
    """
    Compare two dictionaries of known rules (left_rules and right_rules) and
    return five results as a tuple, in this order:

      1. output for rules whose id exists only in left_rules;
      2. output for rules whose id exists only in right_rules;
      3. left-side output for shared rules whose output differs;
      4. right-side output for shared rules whose output differs;
      5. output for shared rules whose output is the same on both sides.

    The first two come from walk_rules(), the last three from
    walk_rules_parallel(); every element is an (affected_rules,
    verbose_output) pair.

    Rules are matched purely by rule id, so a renamed rule is reported as one
    left-only rule plus one right-only rule.
    """
    left_ids = set(left_rules)
    right_ids = set(right_rules)

    only_left_ids = left_ids - right_ids
    only_right_ids = right_ids - left_ids
    shared_ids = left_ids & right_ids

    left_exclusive = subset_dict(left_rules, only_left_ids)
    left_shared = subset_dict(left_rules, shared_ids)
    right_exclusive = subset_dict(right_rules, only_right_ids)
    right_shared = subset_dict(right_rules, shared_ids)

    left_only_data = walk_rules(args, left_exclusive, oval_func, remediation_func)
    right_only_data = walk_rules(args, right_exclusive, oval_func, remediation_func)
    left_changed_data, right_changed_data, common_data = walk_rules_parallel(
        args, left_shared, right_shared, oval_func, remediation_func)

    return (left_only_data, right_only_data, left_changed_data,
            right_changed_data, common_data)
290
291
292
def walk_rules_diff_stats(results):
    """
    Turn the five-tuple produced by walk_rules_diff into five sets of
    aggregate statistics, returned as a tuple in the same order.

    Every input element is an (affected_rules, verbose_output) pair. Every
    output element is a six-tuple of: affected_rules, summed affected_ovals,
    summed affected_remediations, summed all_affected_remediations, a
    defaultdict counting affected fixes per remediation type, and the
    concatenated per-rule output taken in sorted rule id order.

    Raises AssertionError if results does not contain exactly five elements.
    """
    assert len(results) == 5

    output_data = []

    for visited_rules, per_rule_output in results:
        oval_total = 0
        fix_total = 0
        full_fix_total = 0
        fix_type_totals = defaultdict(int)
        ordered_output = []

        for rule_id in sorted(per_rule_output):
            stats = walk_rule_stats(per_rule_output[rule_id])
            ovals, fixes, full_fixes, by_type, output = stats

            oval_total += ovals
            fix_total += fixes
            full_fix_total += full_fixes
            for r_type in by_type:
                fix_type_totals[r_type] += by_type[r_type]

            ordered_output.extend(output)

        summary = (visited_rules, oval_total, fix_total, full_fix_total,
                   fix_type_totals, ordered_output)
        output_data.append(summary)

    assert len(output_data) == 5

    return tuple(output_data)
335
336
337
def filter_rule_ids(all_keys, queries):
    """
    From a comma separated list of queries, where each query is either a rule
    id or a substring thereof, return the set of matching keys from all_keys.

    Special cases:

      * a falsy queries value (None, "") returns an empty set;
      * the literal string "all" returns every key in all_keys.

    Whitespace around each query is ignored and empty queries (as produced by
    a trailing or doubled comma) are dropped. Without this, an empty query
    would be a substring of every key and silently select all rules.

    The result is a set, so it is unordered and free of duplicates.
    """
    if not queries:
        return set()

    if queries == 'all':
        return set(all_keys)

    stripped_parts = [part.strip() for part in queries.split(',')]
    query_parts = [part for part in stripped_parts if part]

    # Every key has to be inspected regardless, because a query matches when
    # it is a substring of a key, not only when it equals a key. Iterating
    # the keys on the outside and the (assumed much shorter) list of queries
    # on the inside discards the order of the queries, which is why the
    # result is a set.
    return {key for key in all_keys
            if any(query in key for query in query_parts)}
368
369
370
def missing_oval(rule_obj):
    """
    Report a rule object that has no OVALs at all.

    Returns a tab-prefixed message naming the rule when rule_obj['ovals'] is
    empty, and None otherwise.
    """
    rule_id = rule_obj['id']
    if len(rule_obj['ovals']) == 0:
        return "\trule_id:%s is missing all OVALs" % rule_id
    return None
379
380
381
def missing_remediation(rule_obj, r_type):
    """
    Report a rule object that has no remediation of type r_type.

    Returns a tab-prefixed message when r_type is absent from
    rule_obj['remediations'] or maps to an empty collection, and None
    otherwise.
    """
    rule_id = rule_obj['id']
    remediations = rule_obj['remediations']
    if r_type not in remediations or len(remediations[r_type]) == 0:
        return "\trule_id:%s is missing %s remediations" % (rule_id, r_type)
    return None
391
392
393
def two_plus_oval(rule_obj):
    """
    Report a rule object that carries two or more OVALs.

    Returns a tab-prefixed message listing the comma-joined entries of
    rule_obj['ovals'] when there are at least two of them, and None
    otherwise.
    """
    rule_id = rule_obj['id']
    ovals = rule_obj['ovals']
    if len(ovals) < 2:
        return None
    return "\trule_id:%s has two or more OVALs: %s" % (rule_id, ','.join(ovals))
402
403
404
def two_plus_remediation(rule_obj, r_type):
    """
    Report a rule object that carries two or more remediations of type
    r_type.

    Returns a tab-prefixed message listing the comma-joined entries of
    rule_obj['remediations'][r_type] when that type is present with at least
    two entries, and None otherwise.
    """
    rule_id = rule_obj['id']
    remediations = rule_obj['remediations']
    if r_type not in remediations:
        return None

    typed = remediations[r_type]
    if len(typed) < 2:
        return None

    return "\trule_id:%s has two or more %s remediations: %s" % \
           (rule_id, r_type, ','.join(typed))
415
416
417
def prodtypes_oval(rule_obj):
    """
    Compare the products declared on the rule with the union of the products
    declared on its OVALs.

    Returns None when either side is empty or missing, or when both sides
    agree. Otherwise returns a tab-prefixed message listing, sorted and
    comma-joined, the products that appear on only one of the two sides.
    """
    rule_id = rule_obj['id']

    rule_products = set(rule_obj.get('products', []))
    if not rule_products:
        return None

    per_oval_products = [rule_obj['ovals'][oval_name].get('products', [])
                         for oval_name in rule_obj.get('ovals', [])]
    oval_products = set().union(*per_oval_products)
    if not oval_products:
        return None

    mismatched = sorted(rule_products ^ oval_products)
    if not mismatched:
        return None

    return "\trule_id:%s has a different prodtypes between YAML and OVALs: %s" % \
           (rule_id, ','.join(mismatched))
440
441
442
def prodtypes_remediation(rule_obj, r_type):
    """
    For a rule object, check if the prodtypes match between the YAML and the
    remediations of type r_type.

    Returns None when the rule declares no products, when the remediations
    of type r_type declare no products (including when the rule has no such
    remediations at all), or when both sides agree. Otherwise returns a
    tab-prefixed message listing, sorted and comma-joined, the products that
    appear on only one of the two sides.

    A remediation entry without a 'products' key is treated as declaring no
    products, which mirrors how prodtypes_oval treats OVAL entries.
    """
    rule_id = rule_obj['id']

    rule_products = set(rule_obj.get('products', []))
    if not rule_products:
        return None

    typed_remediations = rule_obj.get('remediations', {}).get(r_type, {})
    remediation_products = set()
    for remediation in typed_remediations:
        remediation_products.update(typed_remediations[remediation].get('products', []))
    if not remediation_products:
        return None

    # Both sets are known to be non-empty here, so only the difference
    # itself needs checking.
    sym_diff = sorted(rule_products.symmetric_difference(remediation_products))
    if sym_diff:
        return "\trule_id:%s has a different prodtypes between YAML and %s remediations: %s" % \
               (rule_id, r_type, ','.join(sym_diff))
    return None
465
466
467
def product_names_oval(rule_obj):
    """
    Check that every OVAL other than "shared.xml" only lists the product its
    file name (minus extension) stands for.

    Returns a tab-prefixed message for the first product that differs from
    the file name stem, and None when no such product is found.
    """
    rule_id = rule_obj['id']
    ovals = rule_obj['ovals']

    for oval_name in ovals:
        if oval_name == "shared.xml":
            continue

        oval_product = os.path.splitext(oval_name)[0]
        for product in ovals[oval_name]['products']:
            if product == oval_product:
                continue
            return "\trule_id:%s has a different product and OVALs names: %s is not %s" % \
                   (rule_id, product, oval_product)

    return None
483
484
485
def product_names_remediation(rule_obj, r_type):
    """
    For a rule_obj, check the scope of the platforms versus the product name
    of the remediations of type r_type.

    Every remediation whose file name stem (name minus extension) is not
    "shared" is expected to list only the product that stem stands for.

    Returns a tab-prefixed message for the first product that differs from
    the file name stem, and None when no such product is found. A rule with
    no remediations of type r_type has nothing to check and yields None
    rather than raising KeyError, in line with the other remediation checks
    in this module.
    """
    rule_id = rule_obj['id']
    typed_remediations = rule_obj.get('remediations', {}).get(r_type, {})

    for r_name in typed_remediations:
        r_product, _ = os.path.splitext(r_name)
        if r_product == "shared":
            continue

        for product in typed_remediations[r_name]['products']:
            if product != r_product:
                return "\trule_id:%s has a different product and %s remediation names: %s is not %s" % \
                       (rule_id, r_type, product, r_product)

    return None
501