GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

AWSConfigRule   A
last analyzed

Complexity

Total Complexity 20

Size/Duplication

Total Lines 189
Duplicated Lines 0 %

Importance

Changes 10
Bugs 0 Features 0
Metric Value
c 10
b 0
f 0
dl 0
loc 189
rs 10
wmc 20

10 Methods

Rating   Name   Duplication   Size   Complexity  
A find_violation_config_change() 0 10 1
B evaluate_compliance() 0 28 3
A handler() 0 23 1
A is_config_change_call() 0 3 1
A _aws_call() 0 14 1
A find_violation_scheduled() 0 9 1
C lambda_handler() 0 56 8
A __init__() 0 17 2
A is_scheduled_call() 0 3 1
A put_evaluations() 0 4 1
1
import json
2
3
import backoff
4
import boto3
5
import botocore.exceptions
6
7
from awslambdahelper.evaluation import AWSConfigEvaluation
8
9
MAX_BACKOFF_TRIES = 100
10
11
12
class AWSConfigRule(object):
13
    """
14
    Defines the business logic for processing either scheduled or config change AWS Config rules
15
    """
16
    #: Specifies an AWS Config Rule which is triggered by a resource configuration
17
    CALL_TYPE_CONFIGURATION_CHANGE = 'ConfigurationItemChangeNotification'
18
    #: Specifies an AWS Config Rule which is triggered on a scheduled basis
19
    CALL_TYPE_SCHEDULED = 'ScheduledNotification'
20
    #: List of resources which this rule can evaluate. Only application for ConfigurationChange rules.
21
    APPLICABLE_RESOURCES = []
22
23
    @classmethod
24
    def handler(cls, event, context):
25
        """
26
        Allow a single entrypoint without extra boilerplate code.
27
28
        >>> from awslambdahelper import AWSConfigRule,InsufficientDataEvaluation
29
        >>> class MyAwesomeRule(AWSConfigRule):
30
        ...     APPLICABLE_RESOURCES = ["AWS::EC2::Instance"]
31
        ...     def find_violation_config_change(self, rule_parameters, config):
32
        ...         return [InsufficientDataEvaluation()]
33
        >>>
34
        >>> # The entrypoint for lambda would be set as "file_name.MyAwesomeRule.handler"
35
36
        :param event: See `Event Attributes
37
            <http://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_example-events.html#w2ab1c13c33c27c15c15>`_
38
            in the AWS Config Developer guide.
39
        :type event: dict
40
        :param context: See `Context Object <http://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html#python-context-object-methods>`_
41
        :type context: dict
42
        :return:
43
        """
44
        rule = cls(cls.APPLICABLE_RESOURCES)
45
        rule.lambda_handler(event, context)
46
47
    def __init__(self, applicable_resources=None):
48
        """
49
        If this rule is for handling ConfigurationChange events, then the "Applicable Resources" attribute must be set.
50
        If this is for handling Scheduled events, then no item is required.
51
52
        :param applicable_resources: A list of AWS resources which this rule evaluates. Only applicable for
53
            Configuration Change rules, and not Scheduled rules. See `Evaluating Additional Resource Types
54
            <http://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_nodejs.html#creating-custom-rules-for-additional-resource-types>`_,
55
            and
56
            `Supported AWS Resource Types <http://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html#supported-resources>`_.
57
        :type applicable_resources: Union[List,Tuple]
58
        """
59
        if applicable_resources is None:
60
            self.applicable_resources = self.APPLICABLE_RESOURCES
61
        else:
62
            self.applicable_resources = applicable_resources
63
        self.call_type = None
64
65
    @property
66
    def is_config_change_call(self):
67
        return self.call_type == self.CALL_TYPE_CONFIGURATION_CHANGE
68
69
    @property
70
    def is_scheduled_call(self):
71
        return self.call_type == self.CALL_TYPE_SCHEDULED
72
73
    @staticmethod
74
    def put_evaluations(*args, **kwargs):
75
        return boto3.client("config").put_evaluations(
76
            *args, **kwargs
77
        )
78
79
    def lambda_handler(self, event, context):
80
        """
81
        .. deprecated:: 1.1.4
82
            Use :py:meth:`~awslambdahelper.AWSConfigRule.handler`
83
        """
84
        invoking_event = json.loads(event["invokingEvent"])
85
        if 'ruleParameters' in event:
86
            rule_parameters = json.loads(event["ruleParameters"])
87
        else:
88
            rule_parameters = {}
89
90
        self.call_type = invoking_event['messageType']
91
92
        result_token = "No token found."
93
        if "resultToken" in event:
94
            result_token = event["resultToken"]
95
96
        evaluations = []
97
98
        if self.is_config_change_call:
99
100
            configuration_item = invoking_event["configurationItem"]
101
            # If the resource has been deleted.
102
            if event['eventLeftScope']:
103
                evaluation_responses = [NotApplicableEvaluation("Resource has been deleted")]
104
            else:
105
                evaluation_responses = self.evaluate_compliance(
106
                    config=configuration_item,
107
                    rule_parameters=rule_parameters,
108
                    event=event
109
                )
110
111
            for evaluation_response in evaluation_responses:
112
                evaluation = evaluation_response.set(
113
                    ResourceType=configuration_item["resourceType"],
114
                    ResourceId=configuration_item["resourceId"],
115
                    OrderingTimestamp=configuration_item["configurationItemCaptureTime"]
116
                ).to_dict()
117
                evaluations.append(evaluation)
118
        else:
119
            evaluation_responses = self.evaluate_compliance(
120
                rule_parameters=rule_parameters,
121
                event=event
122
            )
123
124
            for evaluation_response in evaluation_responses:
125
                evaluations.append(evaluation_response.set(
126
                    OrderingTimestamp=invoking_event["notificationCreationTime"]
127
                ).to_dict())
128
129
        # There's a max number of evaluations we can apply to put_evaluations at once. It's 100.
130
        chunk_size = 100
131
        for evaluation_chunk in range(0, len(evaluations), chunk_size):
132
            self.put_evaluations(
133
                Evaluations=evaluations[evaluation_chunk:evaluation_chunk + chunk_size],
134
                ResultToken=result_token
135
            )
136
137
    def evaluate_compliance(self, rule_parameters, event, config=None):
138
        """
139
        A facade to delegate the event to either the :py:meth:`~awslambdahelper.AWSConfigRule.find_violation_config_change`, or
140
        :py:meth:`~awslambdahelper.AWSConfigRule.find_violation_scheduled`.
141
142
        :param rule_parameters: A list of key/pairs which are to be provided to the rule.
143
        :type: dict
144
        :param event:
145
        :param config:
146
        :return:
147
        """
148
        if self.is_config_change_call:
149
            if config["resourceType"] not in self.applicable_resources:
150
                return [NotApplicableEvaluation(
151
                    ResourceType=config["resourceType"],
152
                )]
153
154
            violations = self.find_violation_config_change(
155
                rule_parameters=rule_parameters,
156
                config=config
157
            )
158
        else:
159
            violations = self.find_violation_scheduled(
160
                rule_parameters=rule_parameters,
161
                accountid=event['accountId']
162
            )
163
164
        return violations
165
166
    @backoff.on_exception(backoff.expo,
167
                          botocore.exceptions.ClientError,
168
                          max_tries=MAX_BACKOFF_TRIES,
169
                          jitter=backoff.full_jitter)
170
    def _aws_call(self, callable):
171
        """
172
        Wrapper to ease testing.
173
        Decorators make mocking functions that just little bit harder. For this reason, pass
174
        a callable into this method which can handle our AWS calls.
175
176
        :param callable:
177
        :return:
178
        """
179
        return callable()
180
181
    def find_violation_config_change(self, rule_parameters, config):
182
        """
183
        Place holder function for configuration change rules. Needs to be overriden by super class.
184
185
        :raises: NotImplementedError
186
        :param rule_parameters:
187
        :param config:
188
        :return: None
189
        """
190
        raise NotImplementedError(type(self).__name__ + ":find_violation_config_change() is not implemented.")
191
192
    def find_violation_scheduled(self, rule_parameters, accountid):
193
        """
194
        Place holder function for configuration change rules. Needs to be overriden by super class.
195
196
        :param rule_parameters:
197
        :param accountid:
198
        :return: None
199
        """
200
        raise NotImplementedError(type(self).__name__ + ":find_violation_scheduled() is not implemented.")
201
202
203
class CompliantEvaluation(AWSConfigEvaluation):
204
    """
205
    A rule is compliant if all of the resources that the rule evaluates comply with it,
206
    """
207
208
    def __init__(self, Annotation="This resource is compliant with the rule.", ResourceType=None,
209
                 ResourceId=None,
210
                 OrderingTimestamp=None):
211
        """
212
        :param Annotation: An explanation to attach to the evaluation result. Shown in the AWS Config Console.
213
        :type Annotation: str
214
        :param ResourceType:  A list of AWS resources which this rule evaluates. See `Evaluating Additional Resource Types
215
            <http://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_nodejs.html#creating-custom-rules-for-additional-resource-types>`_,
216
            and
217
            `Supported AWS Resource Types <http://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html#supported-resources>`_.
218
        :type ResourceType: str
219
        :param ResourceId: The id (eg, id-000000) or the ARN (eg, arn:aws:iam:01234567890:eu-west-1:..) for the resource
220
        :param OrderingTimestamp: The time of the event in AWS Config that triggered the evaluation.
221
        """
222
        super(CompliantEvaluation, self).__init__(
223
            AWSConfigEvaluation.TYPE_COMPLIANT,
224
            Annotation,
225
            ResourceType=ResourceType,
226
            ResourceId=ResourceId,
227
            OrderingTimestamp=OrderingTimestamp,
228
        )
229
230
231
class NonCompliantEvaluation(AWSConfigEvaluation):
232
    """
233
    A rule is noncompliant if any of these resources do not comply.
234
    """
235
236
    def __init__(self, Annotation, ResourceType=None, ResourceId=None,
237
                 OrderingTimestamp=None):
238
        """
239
        :param Annotation: An explanation to attach to the evaluation result. Shown in the AWS Config Console.
240
        :type Annotation: str
241
        :param ResourceType:  A list of AWS resources which this rule evaluates. See `Evaluating Additional Resource Types
242
            <http://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_nodejs.html#creating-custom-rules-for-additional-resource-types>`_,
243
            and
244
            `Supported AWS Resource Types <http://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html#supported-resources>`_.
245
        :type ResourceType: str
246
        :param ResourceId: The id (eg, id-000000) or the ARN (eg, arn:aws:iam:01234567890:eu-west-1:..) for the resource
247
        :param OrderingTimestamp: The time of the event in AWS Config that triggered the evaluation.
248
        """
249
        super(NonCompliantEvaluation, self).__init__(
250
            AWSConfigEvaluation.TYPE_NON_COMPLIANT,
251
            Annotation,
252
            ResourceType=ResourceType,
253
            ResourceId=ResourceId,
254
            OrderingTimestamp=OrderingTimestamp
255
        )
256
257
258
class NotApplicableEvaluation(AWSConfigEvaluation):
259
    """
260
    This resource is not applicable for this rule.
261
    """
262
263
    def __init__(self, ResourceType, ResourceId=None,
264
                 OrderingTimestamp=None):
265
        """
266
        :param ResourceType:  A list of AWS resources which this rule evaluates. See `Evaluating Additional Resource Types
267
            <http://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_nodejs.html#creating-custom-rules-for-additional-resource-types>`_,
268
            and
269
            `Supported AWS Resource Types <http://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html#supported-resources>`_.
270
        :type ResourceType: str
271
        :param ResourceId: The id (eg, id-000000) or the ARN (eg, arn:aws:iam:01234567890:eu-west-1:..) for the resource
272
        :param OrderingTimestamp: The time of the event in AWS Config that triggered the evaluation.
273
        """
274
        super(NotApplicableEvaluation, self).__init__(
275
            AWSConfigEvaluation.TYPE_NOT_APPLICABLE,
276
            "The rule doesn't apply to resources of type " + ResourceType + ".",
277
            ResourceType=ResourceType,
278
            ResourceId=ResourceId,
279
            OrderingTimestamp=OrderingTimestamp
280
        )
281
282
283
class InsufficientDataEvaluation(AWSConfigEvaluation):
284
    """
285
    AWS Config returns the INSUFFICIENT_DATA value when no evaluation results are available for the AWS resource or
286
    Config rule.
287
    """
288
289
    def __init__(self, Annotation, ResourceType=None, ResourceId=None,
290
                 OrderingTimestamp=None):
291
        """
292
        :param Annotation: An explanation to attach to the evaluation result. Shown in the AWS Config Console.
293
        :type Annotation: str
294
        :param ResourceType:  A list of AWS resources which this rule evaluates. See `Evaluating Additional Resource Types
295
            <http://docs.aws.amazon.com/config/latest/developerguide/evaluate-config_develop-rules_nodejs.html#creating-custom-rules-for-additional-resource-types>`_,
296
            and
297
            `Supported AWS Resource Types <http://docs.aws.amazon.com/config/latest/developerguide/resource-config-reference.html#supported-resources>`_.
298
        :type ResourceType: str
299
        :param ResourceId: The id (eg, id-000000) or the ARN (eg, arn:aws:iam:01234567890:eu-west-1:..) for the resource
300
        :type ResourceId: str
301
        :param OrderingTimestamp: The time of the event in AWS Config that triggered the evaluation.
302
        """
303
        super(InsufficientDataEvaluation, self).__init__(
304
            AWSConfigEvaluation.TYPE_INSUFFICIENT_DATA,
305
            Annotation,
306
            ResourceType=ResourceType,
307
            ResourceId=ResourceId,
308
            OrderingTimestamp=OrderingTimestamp
309
        )
310