Completed
Push — master ( da306e...c5fe8d )
by Jaspar
17s queued 15s
created

_check_event()   F

Complexity

Conditions 15

Size

Total Lines 84
Code Lines 60

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 15
eloc 60
nop 3
dl 0
loc 84
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like gvm.protocols.gmpv208.entities.alerts._check_event() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# pylint:  disable=redefined-builtin
20
# MAYBE we should change filter to filter_string (everywhere)
21
22
from enum import Enum
23
from typing import Any, Optional, Union
24
25
from gvm.protocols.gmpv208.entities.report_formats import (
26
    ReportFormatType,
27
)  # if I use latest, I get circular import :/
28
from gvm.errors import RequiredArgument, InvalidArgument, InvalidArgumentType
29
from gvm.utils import add_filter, to_bool
30
from gvm.xml import XmlCommand
31
32
33
class AlertEvent(Enum):
34
    """Enum for alert event types"""
35
36
    TASK_RUN_STATUS_CHANGED = 'Task run status changed'
37
    UPDATED_SECINFO_ARRIVED = 'Updated SecInfo arrived'
38
    NEW_SECINFO_ARRIVED = 'New SecInfo arrived'
39
    TICKET_RECEIVED = 'Ticket received'
40
    ASSIGNED_TICKET_CHANGED = 'Assigned ticket changed'
41
    OWNED_TICKET_CHANGED = 'Owned ticket changed'
42
43
44
def get_alert_event_from_string(
45
    alert_event: Optional[str],
46
) -> Optional[AlertEvent]:
47
    """Convert an alert event string into a AlertEvent instance"""
48
    if not alert_event:
49
        return None
50
51
    try:
52
        return AlertEvent[alert_event.replace(' ', '_').upper()]
53
    except KeyError:
54
        raise InvalidArgument(
55
            argument='alert_event',
56
            function=get_alert_event_from_string.__name__,
57
        ) from KeyError
58
59
60
class AlertCondition(Enum):
61
    """Enum for alert condition types"""
62
63
    ALWAYS = 'Always'
64
    ERROR = 'Error'
65
    SEVERITY_AT_LEAST = 'Severity at least'
66
    SEVERITY_CHANGED = 'Severity changed'
67
    FILTER_COUNT_CHANGED = 'Filter count changed'
68
    FILTER_COUNT_AT_LEAST = 'Filter count at least'
69
70
71
def get_alert_condition_from_string(
72
    alert_condition: Optional[str],
73
) -> Optional[AlertCondition]:
74
    """Convert an alert condition string into a AlertCondition instance"""
75
    if not alert_condition:
76
        return None
77
78
    try:
79
        return AlertCondition[alert_condition.replace(' ', '_').upper()]
80
    except KeyError:
81
        raise InvalidArgument(
82
            argument='alert_condition',
83
            function=get_alert_condition_from_string.__name__,
84
        ) from KeyError
85
86
87
class AlertMethod(Enum):
88
    """Enum for alert method type"""
89
90
    SCP = "SCP"
91
    SEND = "Send"
92
    SMB = "SMB"
93
    SNMP = "SNMP"
94
    SYSLOG = "Syslog"
95
    EMAIL = "Email"
96
    START_TASK = "Start Task"
97
    HTTP_GET = "HTTP Get"
98
    SOURCEFIRE_CONNECTOR = "Sourcefire Connector"
99
    VERINICE_CONNECTOR = "verinice Connector"
100
    TIPPINGPOINT_SMS = "TippingPoint SMS"
101
    ALEMBA_VFIRE = "Alemba vFire"
102
103
104
def get_alert_method_from_string(
105
    alert_method: Optional[str],
106
) -> Optional[AlertMethod]:
107
    """Convert an alert method string into a AlertCondition instance"""
108
    if not alert_method:
109
        return None
110
111
    try:
112
        return AlertMethod[alert_method.replace(' ', '_').upper()]
113
    except KeyError:
114
        raise InvalidArgument(
115
            argument='alert_method',
116
            function=get_alert_method_from_string.__name__,
117
        ) from KeyError
118
119
120
def _check_event(
121
    event: AlertEvent, condition: AlertCondition, method: AlertMethod
122
):
123
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
124
        if not condition:
125
            raise RequiredArgument(
126
                "condition is required for event {}".format(event.name)
127
            )
128
129
        if not method:
130
            raise RequiredArgument(
131
                "method is required for event {}".format(event.name)
132
            )
133
134
        if condition not in (
135
            AlertCondition.ALWAYS,
136
            AlertCondition.FILTER_COUNT_CHANGED,
137
            AlertCondition.FILTER_COUNT_AT_LEAST,
138
            AlertCondition.SEVERITY_AT_LEAST,
139
            AlertCondition.SEVERITY_CHANGED,
140
        ):
141
            raise InvalidArgument(
142
                "Invalid condition {} for event {}".format(
143
                    condition.name, event.name
144
                )
145
            )
146
    elif event in (
147
        AlertEvent.NEW_SECINFO_ARRIVED,
148
        AlertEvent.UPDATED_SECINFO_ARRIVED,
149
    ):
150
        if not condition:
151
            raise RequiredArgument(
152
                "condition is required for event {}".format(event.name)
153
            )
154
155
        if not method:
156
            raise RequiredArgument(
157
                "method is required for event {}".format(event.name)
158
            )
159
160
        if condition != AlertCondition.ALWAYS:
161
            raise InvalidArgument(
162
                "Invalid condition {} for event {}".format(
163
                    condition.name, event.name
164
                )
165
            )
166
        if method not in (
167
            AlertMethod.SCP,
168
            AlertMethod.SEND,
169
            AlertMethod.SMB,
170
            AlertMethod.SNMP,
171
            AlertMethod.SYSLOG,
172
            AlertMethod.EMAIL,
173
        ):
174
            raise InvalidArgument(
175
                "Invalid method {} for event {}".format(method.name, event.name)
176
            )
177
    elif event in (
178
        AlertEvent.TICKET_RECEIVED,
179
        AlertEvent.OWNED_TICKET_CHANGED,
180
        AlertEvent.ASSIGNED_TICKET_CHANGED,
181
    ):
182
        if not condition:
183
            raise RequiredArgument(
184
                "condition is required for event {}".format(event.name)
185
            )
186
187
        if not method:
188
            raise RequiredArgument(
189
                "method is required for event {}".format(event.name)
190
            )
191
        if condition != AlertCondition.ALWAYS:
192
            raise InvalidArgument(
193
                "Invalid condition {} for event {}".format(
194
                    condition.name, event.name
195
                )
196
            )
197
        if method not in (
198
            AlertMethod.EMAIL,
199
            AlertMethod.START_TASK,
200
            AlertMethod.SYSLOG,
201
        ):
202
            raise InvalidArgument(
203
                "Invalid method {} for event {}".format(method.name, event.name)
204
            )
205
206
207
class AlertsMixin:
208
    def clone_alert(self, alert_id: str) -> Any:
209
        """Clone an existing alert
210
211
        Arguments:
212
            alert_id: UUID of an existing alert to clone from
213
214
        Returns:
215
            The response. See :py:meth:`send_command` for details.
216
        """
217
        if not alert_id:
218
            raise RequiredArgument(
219
                function=self.clone_alert.__name__, argument='alert_id'
220
            )
221
222
        cmd = XmlCommand("create_alert")
223
        cmd.add_element("copy", alert_id)
224
        return self._send_xml_command(cmd)
225
226
    def create_alert(
227
        self,
228
        name: str,
229
        condition: AlertCondition,
230
        event: AlertEvent,
231
        method: AlertMethod,
232
        *,
233
        method_data: Optional[dict] = None,
234
        event_data: Optional[dict] = None,
235
        condition_data: Optional[dict] = None,
236
        filter_id: Optional[int] = None,
237
        comment: Optional[str] = None,
238
    ) -> Any:
239
        """Create a new alert
240
241
        Arguments:
242
            name: Name of the new Alert
243
            condition: The condition that must be satisfied for the alert
244
                to occur; if the event is either 'Updated SecInfo arrived' or
245
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
246
                condition can also be on of 'Severity at least', 'Filter count
247
                changed' or 'Filter count at least'.
248
            event: The event that must happen for the alert to occur, one
249
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
250
                SecInfo arrived'
251
            method: The method by which the user is alerted, one of 'SCP',
252
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
253
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
254
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
255
                Connector' or 'verinice Connector'.
256
            condition_data: Data that defines the condition
257
            event_data: Data that defines the event
258
            method_data: Data that defines the method
259
            filter_id: Filter to apply when executing alert
260
            comment: Comment for the alert
261
262
        Returns:
263
            The response. See :py:meth:`send_command` for details.
264
        """
265
        if not name:
266
            raise RequiredArgument(
267
                function=self.create_alert.__name__, argument='name'
268
            )
269
270
        if not condition:
271
            raise RequiredArgument(
272
                function=self.create_alert.__name__, argument='condition'
273
            )
274
275
        if not event:
276
            raise RequiredArgument(
277
                function=self.create_alert.__name__, argument='event'
278
            )
279
280
        if not method:
281
            raise RequiredArgument(
282
                function=self.create_alert.__name__, argument='method'
283
            )
284
285
        if not isinstance(condition, AlertCondition):
286
            raise InvalidArgumentType(
287
                function=self.create_alert.__name__,
288
                argument='condition',
289
                arg_type=AlertCondition.__name__,
290
            )
291
292
        if not isinstance(event, AlertEvent):
293
            raise InvalidArgumentType(
294
                function=self.create_alert.__name__,
295
                argument='even',
296
                arg_type=AlertEvent.__name__,
297
            )
298
299
        if not isinstance(method, AlertMethod):
300
            raise InvalidArgumentType(
301
                function=self.create_alert.__name__,
302
                argument='method',
303
                arg_type=AlertMethod.__name__,
304
            )
305
306
        _check_event(event, condition, method)
307
308
        cmd = XmlCommand("create_alert")
309
        cmd.add_element("name", name)
310
311
        conditions = cmd.add_element("condition", condition.value)
312
313
        if condition_data is not None:
314
            for key, value in condition_data.items():
315
                _data = conditions.add_element("data", value)
316
                _data.add_element("name", key)
317
318
        events = cmd.add_element("event", event.value)
319
320
        if event_data is not None:
321
            for key, value in event_data.items():
322
                _data = events.add_element("data", value)
323
                _data.add_element("name", key)
324
325
        methods = cmd.add_element("method", method.value)
326
327
        if method_data is not None:
328
            for key, value in method_data.items():
329
                _data = methods.add_element("data", value)
330
                _data.add_element("name", key)
331
332
        if filter_id:
333
            cmd.add_element("filter", attrs={"id": filter_id})
334
335
        if comment:
336
            cmd.add_element("comment", comment)
337
338
        return self._send_xml_command(cmd)
339
340
    def delete_alert(
341
        self, alert_id: str, *, ultimate: Optional[bool] = False
342
    ) -> Any:
343
        """Deletes an existing alert
344
345
        Arguments:
346
            alert_id: UUID of the alert to be deleted.
347
            ultimate: Whether to remove entirely, or to the trashcan.
348
        """
349
        if not alert_id:
350
            raise RequiredArgument(
351
                function=self.delete_alert.__name__, argument='alert_id'
352
            )
353
354
        cmd = XmlCommand("delete_alert")
355
        cmd.set_attribute("alert_id", alert_id)
356
        cmd.set_attribute("ultimate", to_bool(ultimate))
357
358
        return self._send_xml_command(cmd)
359
360
    def get_alerts(
361
        self,
362
        *,
363
        filter: Optional[str] = None,
364
        filter_id: Optional[str] = None,
365
        trash: Optional[bool] = None,
366
        tasks: Optional[bool] = None,
367
    ) -> Any:
368
        """Request a list of alerts
369
370
        Arguments:
371
            filter: Filter term to use for the query
372
            filter_id: UUID of an existing filter to use for the query
373
            trash: True to request the alerts in the trashcan
374
            tasks: Whether to include the tasks using the alerts
375
        Returns:
376
            The response. See :py:meth:`send_command` for details.
377
        """
378
        cmd = XmlCommand("get_alerts")
379
380
        add_filter(cmd, filter, filter_id)
381
382
        if trash is not None:
383
            cmd.set_attribute("trash", to_bool(trash))
384
385
        if tasks is not None:
386
            cmd.set_attribute("tasks", to_bool(tasks))
387
388
        return self._send_xml_command(cmd)
389
390
    def get_alert(self, alert_id: str, *, tasks: Optional[bool] = None) -> Any:
391
        """Request a single alert
392
393
        Arguments:
394
            alert_id: UUID of an existing alert
395
396
        Returns:
397
            The response. See :py:meth:`send_command` for details.
398
        """
399
        cmd = XmlCommand("get_alerts")
400
401
        if not alert_id:
402
            raise RequiredArgument(
403
                function=self.get_alert.__name__, argument='alert_id'
404
            )
405
406
        cmd.set_attribute("alert_id", alert_id)
407
408
        if tasks is not None:
409
            cmd.set_attribute("tasks", to_bool(tasks))
410
411
        return self._send_xml_command(cmd)
412
413
    def modify_alert(
414
        self,
415
        alert_id: str,
416
        *,
417
        name: Optional[str] = None,
418
        comment: Optional[str] = None,
419
        filter_id: Optional[str] = None,
420
        event: Optional[AlertEvent] = None,
421
        event_data: Optional[dict] = None,
422
        condition: Optional[AlertCondition] = None,
423
        condition_data: Optional[dict] = None,
424
        method: Optional[AlertMethod] = None,
425
        method_data: Optional[dict] = None,
426
    ) -> Any:
427
        """Modifies an existing alert.
428
429
        Arguments:
430
            alert_id: UUID of the alert to be modified.
431
            name: Name of the Alert.
432
            condition: The condition that must be satisfied for the alert to
433
                occur. If the event is either 'Updated SecInfo
434
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
435
                Otherwise, condition can also be on of 'Severity at least',
436
                'Filter count changed' or 'Filter count at least'.
437
            condition_data: Data that defines the condition
438
            event: The event that must happen for the alert to occur, one of
439
                'Task run status changed', 'Updated SecInfo arrived' or
440
                'New SecInfo arrived'
441
            event_data: Data that defines the event
442
            method: The method by which the user is alerted, one of 'SCP',
443
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
444
                if the event is neither 'Updated SecInfo arrived' nor
445
                'New SecInfo arrived', method can also be one of 'Start Task',
446
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
447
            method_data: Data that defines the method
448
            filter_id: Filter to apply when executing alert
449
            comment: Comment for the alert
450
451
        Returns:
452
            The response. See :py:meth:`send_command` for details.
453
        """
454
455
        if not alert_id:
456
            raise RequiredArgument(
457
                function=self.modify_alert.__name__, argument='alert_id'
458
            )
459
460
        cmd = XmlCommand("modify_alert")
461
        cmd.set_attribute("alert_id", str(alert_id))
462
463
        if name:
464
            cmd.add_element("name", name)
465
466
        if comment:
467
            cmd.add_element("comment", comment)
468
469
        if filter_id:
470
            cmd.add_element("filter", attrs={"id": filter_id})
471
472
        if condition:
473
            if not isinstance(condition, AlertCondition):
474
                raise InvalidArgumentType(
475
                    function=self.modify_alert.__name__,
476
                    argument='condition',
477
                    arg_type=AlertCondition.__name__,
478
                )
479
480
            conditions = cmd.add_element("condition", condition.value)
481
482
            if condition_data is not None:
483
                for key, value in condition_data.items():
484
                    _data = conditions.add_element("data", value)
485
                    _data.add_element("name", key)
486
487
        if method:
488
            if not isinstance(method, AlertMethod):
489
                raise InvalidArgumentType(
490
                    function=self.modify_alert.__name__,
491
                    argument='method',
492
                    arg_type=AlertMethod.__name__,
493
                )
494
495
            methods = cmd.add_element("method", method.value)
496
497
            if method_data is not None:
498
                for key, value in method_data.items():
499
                    _data = methods.add_element("data", value)
500
                    _data.add_element("name", key)
501
502
        if event:
503
            if not isinstance(event, AlertEvent):
504
                raise InvalidArgumentType(
505
                    function=self.modify_alert.__name__,
506
                    argument='event',
507
                    arg_type=AlertEvent.__name__,
508
                )
509
510
            _check_event(event, condition, method)
511
512
            events = cmd.add_element("event", event.value)
513
514
            if event_data is not None:
515
                for key, value in event_data.items():
516
                    _data = events.add_element("data", value)
517
                    _data.add_element("name", key)
518
519
        return self._send_xml_command(cmd)
520
521
    def test_alert(self, alert_id: str) -> Any:
522
        """Run an alert
523
524
        Invoke a test run of an alert
525
526
        Arguments:
527
            alert_id: UUID of the alert to be tested
528
529
        Returns:
530
            The response. See :py:meth:`send_command` for details.
531
        """
532
        if not alert_id:
533
            raise InvalidArgument("test_alert requires an alert_id argument")
534
535
        cmd = XmlCommand("test_alert")
536
        cmd.set_attribute("alert_id", alert_id)
537
538
        return self._send_xml_command(cmd)
539
540
    def trigger_alert(
541
        self,
542
        alert_id: str,
543
        report_id: str,
544
        *,
545
        filter: Optional[str] = None,
546
        filter_id: Optional[str] = None,
547
        report_format_id: Optional[Union[str, ReportFormatType]] = None,
548
        delta_report_id: Optional[str] = None,
549
    ) -> Any:
550
        """Run an alert by ignoring its event and conditions
551
552
        The alert is triggered to run immediately with the provided filtered
553
        report by ignoring the even and condition settings.
554
555
        Arguments:
556
            alert_id: UUID of the alert to be run
557
            report_id: UUID of the report to be provided to the alert
558
            filter: Filter term to use to filter results in the report
559
            filter_id: UUID of filter to use to filter results in the report
560
            report_format_id: UUID of report format to use
561
                              or ReportFormatType (enum)
562
            delta_report_id: UUID of an existing report to compare report to.
563
564
        Returns:
565
            The response. See :py:meth:`send_command` for details.
566
        """
567
        if not alert_id:
568
            raise RequiredArgument(
569
                function=self.trigger_alert.__name__,
570
                argument='alert_id argument',
571
            )
572
573
        if not report_id:
574
            raise RequiredArgument(
575
                function=self.trigger_alert.__name__,
576
                argument='report_id argument',
577
            )
578
579
        cmd = XmlCommand("get_reports")
580
        cmd.set_attribute("report_id", report_id)
581
        cmd.set_attribute("alert_id", alert_id)
582
583
        if filter:
584
            cmd.set_attribute("filter", filter)
585
586
        if filter_id:
587
            cmd.set_attribute("filt_id", filter_id)
588
589
        if report_format_id:
590
            if isinstance(report_format_id, ReportFormatType):
591
                report_format_id = report_format_id.value
592
593
            cmd.set_attribute("format_id", report_format_id)
594
595
        if delta_report_id:
596
            cmd.set_attribute("delta_report_id", delta_report_id)
597
598
        return self._send_xml_command(cmd)
599