AlertsMixin.modify_alert()   F

Complexity

Conditions 17

Size

Total Lines 107
Code Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 59
nop 12
dl 0
loc 107
rs 1.8
c 0
b 0
f 0

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
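As a rough sketch of that advice applied to `modify_alert()` in the listing below: the three near-identical loops that append `data` children could be pulled into one small helper. The helper name `add_data_elements` is hypothetical, and `xml.etree.ElementTree` is used here only as a stand-in for `gvm.xml.XmlCommand` so that the snippet runs on its own.

```python
import xml.etree.ElementTree as ET
from typing import Mapping, Optional


def add_data_elements(
    parent: ET.Element, data: Optional[Mapping[str, str]]
) -> None:
    """Append one <data> child per entry: value as text, key as nested <name>."""
    if data is None:
        return
    for key, value in data.items():
        data_element = ET.SubElement(parent, "data")
        data_element.text = str(value)
        ET.SubElement(data_element, "name").text = str(key)


# Usage: build a command element and attach condition data to it.
# "hypothetical-uuid" is a placeholder, not a real alert ID.
cmd = ET.Element("modify_alert", {"alert_id": "hypothetical-uuid"})
condition = ET.SubElement(cmd, "condition")
condition.text = "Severity at least"
add_data_elements(condition, {"severity": "5.5"})
xml_text = ET.tostring(cmd, encoding="unicode")
```

With a helper like this, each of the `condition`, `method` and `event` branches would shrink to a type check plus two calls.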

Commonly applied refactorings include:

Complexity

Complex code units like gvm.protocols.gmpv208.entities.alerts.AlertsMixin.modify_alert() often do a lot of different things. To break the enclosing class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
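In the listing below, the parameter pairs `condition`/`condition_data`, `event`/`event_data` and `method`/`method_data` share prefixes, which hints at one such cohesive component. A minimal sketch of extracting it follows. The class name `AlertComponent` and its `append_to` method are hypothetical, the enum is a trimmed copy of the one in the listing, and `xml.etree.ElementTree` again stands in for `gvm.xml.XmlCommand`.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict


class AlertCondition(Enum):
    """Trimmed copy of the enum from the listing, for a self-contained demo."""

    ALWAYS = "Always"
    SEVERITY_AT_LEAST = "Severity at least"


@dataclass
class AlertComponent:
    """Hypothetical extracted class: one enum value plus its data entries."""

    tag: str
    kind: Enum
    data: Dict[str, str] = field(default_factory=dict)

    def append_to(self, parent: ET.Element) -> ET.Element:
        """Serialize this component as a child element of parent."""
        element = ET.SubElement(parent, self.tag)
        element.text = self.kind.value
        for key, value in self.data.items():
            data_element = ET.SubElement(element, "data")
            data_element.text = value
            ET.SubElement(data_element, "name").text = key
        return element


cmd = ET.Element("modify_alert")
component = AlertComponent(
    "condition", AlertCondition.SEVERITY_AT_LEAST, {"severity": "5.5"}
)
component.append_to(cmd)
component_xml = ET.tostring(cmd, encoding="unicode")
```

Whether such a class is worth introducing depends on how many call sites would use it; for a single method a plain helper function may be enough.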

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
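One widely used approach is the Introduce Parameter Object refactoring: related arguments are bundled into a single value that is passed around as a unit. The sketch below is illustrative only. `AlertModification` and this simplified `modify_alert` are hypothetical stand-ins, not part of python-gvm, and the function merely reports which fields would be sent instead of building a command.

```python
from dataclasses import dataclass, fields
from typing import List, Optional


@dataclass(frozen=True)
class AlertModification:
    """Hypothetical parameter object bundling optional modify_alert inputs."""

    name: Optional[str] = None
    comment: Optional[str] = None
    filter_id: Optional[str] = None


def modify_alert(alert_id: str, changes: AlertModification) -> List[str]:
    """Stand-in for the real method: list the fields that carry a value."""
    if not alert_id:
        raise ValueError("alert_id is required")
    return [f.name for f in fields(changes) if getattr(changes, f.name)]


# "hypothetical-uuid" is a placeholder, not a real alert ID.
sent = modify_alert(
    "hypothetical-uuid",
    AlertModification(name="Nightly", comment="updated"),
)
```

The signature drops from many keyword arguments to two parameters, and new optional fields can be added to the dataclass without touching every caller.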

# -*- coding: utf-8 -*-
# Copyright (C) 2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from enum import Enum
from typing import Any, Optional, Union

from gvm.protocols.gmpv208.entities.report_formats import (
    ReportFormatType,
)  # importing this from latest causes a circular import
from gvm.errors import RequiredArgument, InvalidArgument, InvalidArgumentType
from gvm.utils import add_filter, to_bool
from gvm.xml import XmlCommand


class AlertEvent(Enum):
    """Enum for alert event types"""

    TASK_RUN_STATUS_CHANGED = 'Task run status changed'
    UPDATED_SECINFO_ARRIVED = 'Updated SecInfo arrived'
    NEW_SECINFO_ARRIVED = 'New SecInfo arrived'
    TICKET_RECEIVED = 'Ticket received'
    ASSIGNED_TICKET_CHANGED = 'Assigned ticket changed'
    OWNED_TICKET_CHANGED = 'Owned ticket changed'


def get_alert_event_from_string(
    alert_event: Optional[str],
) -> Optional[AlertEvent]:
    """Convert an alert event string into an AlertEvent instance"""
    if not alert_event:
        return None

    try:
        # Member names are the upper-cased string with spaces as underscores
        return AlertEvent[alert_event.replace(' ', '_').upper()]
    except KeyError:
        raise InvalidArgument(
            argument='alert_event',
            function=get_alert_event_from_string.__name__,
        ) from None


class AlertCondition(Enum):
    """Enum for alert condition types"""

    ALWAYS = 'Always'
    ERROR = 'Error'
    SEVERITY_AT_LEAST = 'Severity at least'
    SEVERITY_CHANGED = 'Severity changed'
    FILTER_COUNT_CHANGED = 'Filter count changed'
    FILTER_COUNT_AT_LEAST = 'Filter count at least'


def get_alert_condition_from_string(
    alert_condition: Optional[str],
) -> Optional[AlertCondition]:
    """Convert an alert condition string into an AlertCondition instance"""
    if not alert_condition:
        return None

    try:
        return AlertCondition[alert_condition.replace(' ', '_').upper()]
    except KeyError:
        raise InvalidArgument(
            argument='alert_condition',
            function=get_alert_condition_from_string.__name__,
        ) from None


class AlertMethod(Enum):
    """Enum for alert method type"""

    SCP = "SCP"
    SEND = "Send"
    SMB = "SMB"
    SNMP = "SNMP"
    SYSLOG = "Syslog"
    EMAIL = "Email"
    START_TASK = "Start Task"
    HTTP_GET = "HTTP Get"
    SOURCEFIRE_CONNECTOR = "Sourcefire Connector"
    VERINICE_CONNECTOR = "verinice Connector"
    TIPPINGPOINT_SMS = "TippingPoint SMS"
    ALEMBA_VFIRE = "Alemba vFire"


def get_alert_method_from_string(
    alert_method: Optional[str],
) -> Optional[AlertMethod]:
    """Convert an alert method string into an AlertMethod instance"""
    if not alert_method:
        return None

    try:
        return AlertMethod[alert_method.replace(' ', '_').upper()]
    except KeyError:
        raise InvalidArgument(
            argument='alert_method',
            function=get_alert_method_from_string.__name__,
        ) from None

def _check_event(
    event: AlertEvent,
    condition: Optional[AlertCondition],
    method: Optional[AlertMethod],
):
    """Validate that condition and method are set and allowed for event"""
    if event == AlertEvent.TASK_RUN_STATUS_CHANGED:
        if not condition:
            raise RequiredArgument(
                "condition is required for event {}".format(event.name)
            )

        if not method:
            raise RequiredArgument(
                "method is required for event {}".format(event.name)
            )

        if condition not in (
            AlertCondition.ALWAYS,
            AlertCondition.FILTER_COUNT_CHANGED,
            AlertCondition.FILTER_COUNT_AT_LEAST,
            AlertCondition.SEVERITY_AT_LEAST,
            AlertCondition.SEVERITY_CHANGED,
        ):
            raise InvalidArgument(
                "Invalid condition {} for event {}".format(
                    condition.name, event.name
                )
            )
    elif event in (
        AlertEvent.NEW_SECINFO_ARRIVED,
        AlertEvent.UPDATED_SECINFO_ARRIVED,
    ):
        if not condition:
            raise RequiredArgument(
                "condition is required for event {}".format(event.name)
            )

        if not method:
            raise RequiredArgument(
                "method is required for event {}".format(event.name)
            )

        if condition != AlertCondition.ALWAYS:
            raise InvalidArgument(
                "Invalid condition {} for event {}".format(
                    condition.name, event.name
                )
            )
        if method not in (
            AlertMethod.SCP,
            AlertMethod.SEND,
            AlertMethod.SMB,
            AlertMethod.SNMP,
            AlertMethod.SYSLOG,
            AlertMethod.EMAIL,
        ):
            raise InvalidArgument(
                "Invalid method {} for event {}".format(method.name, event.name)
            )
    elif event in (
        AlertEvent.TICKET_RECEIVED,
        AlertEvent.OWNED_TICKET_CHANGED,
        AlertEvent.ASSIGNED_TICKET_CHANGED,
    ):
        if not condition:
            raise RequiredArgument(
                "condition is required for event {}".format(event.name)
            )

        if not method:
            raise RequiredArgument(
                "method is required for event {}".format(event.name)
            )
        if condition != AlertCondition.ALWAYS:
            raise InvalidArgument(
                "Invalid condition {} for event {}".format(
                    condition.name, event.name
                )
            )
        if method not in (
            AlertMethod.EMAIL,
            AlertMethod.START_TASK,
            AlertMethod.SYSLOG,
        ):
            raise InvalidArgument(
                "Invalid method {} for event {}".format(method.name, event.name)
            )

class AlertsMixin:
    def clone_alert(self, alert_id: str) -> Any:
        """Clone an existing alert

        Arguments:
            alert_id: UUID of an existing alert to clone from

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not alert_id:
            raise RequiredArgument(
                function=self.clone_alert.__name__, argument='alert_id'
            )

        cmd = XmlCommand("create_alert")
        cmd.add_element("copy", alert_id)
        return self._send_xml_command(cmd)

    def create_alert(
        self,
        name: str,
        condition: AlertCondition,
        event: AlertEvent,
        method: AlertMethod,
        *,
        method_data: Optional[dict] = None,
        event_data: Optional[dict] = None,
        condition_data: Optional[dict] = None,
        filter_id: Optional[str] = None,
        comment: Optional[str] = None,
    ) -> Any:
        """Create a new alert

        Arguments:
            name: Name of the new Alert
            condition: The condition that must be satisfied for the alert
                to occur; if the event is either 'Updated SecInfo arrived' or
                'New SecInfo arrived', condition must be 'Always'. Otherwise,
                condition can also be one of 'Severity at least', 'Filter count
                changed' or 'Filter count at least'.
            event: The event that must happen for the alert to occur, one
                of 'Task run status changed', 'Updated SecInfo arrived' or 'New
                SecInfo arrived'
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email'; if the event is
                neither 'Updated SecInfo arrived' nor 'New SecInfo arrived',
                method can also be one of 'Start Task', 'HTTP Get', 'Sourcefire
                Connector' or 'verinice Connector'.
            condition_data: Data that defines the condition
            event_data: Data that defines the event
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not name:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='name'
            )

        if not condition:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='condition'
            )

        if not event:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='event'
            )

        if not method:
            raise RequiredArgument(
                function=self.create_alert.__name__, argument='method'
            )

        if not isinstance(condition, AlertCondition):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='condition',
                arg_type=AlertCondition.__name__,
            )

        if not isinstance(event, AlertEvent):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='event',
                arg_type=AlertEvent.__name__,
            )

        if not isinstance(method, AlertMethod):
            raise InvalidArgumentType(
                function=self.create_alert.__name__,
                argument='method',
                arg_type=AlertMethod.__name__,
            )

        # Reject condition/method combinations the event does not allow
        _check_event(event, condition, method)

        cmd = XmlCommand("create_alert")
        cmd.add_element("name", name)

        conditions = cmd.add_element("condition", condition.value)

        # Each entry becomes <data>value<name>key</name></data>
        if condition_data is not None:
            for key, value in condition_data.items():
                _data = conditions.add_element("data", value)
                _data.add_element("name", key)

        events = cmd.add_element("event", event.value)

        if event_data is not None:
            for key, value in event_data.items():
                _data = events.add_element("data", value)
                _data.add_element("name", key)

        methods = cmd.add_element("method", method.value)

        if method_data is not None:
            for key, value in method_data.items():
                _data = methods.add_element("data", value)
                _data.add_element("name", key)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if comment:
            cmd.add_element("comment", comment)

        return self._send_xml_command(cmd)

    def delete_alert(
        self, alert_id: str, *, ultimate: Optional[bool] = False
    ) -> Any:
        """Deletes an existing alert

        Arguments:
            alert_id: UUID of the alert to be deleted.
            ultimate: Whether to remove the alert entirely or move it to
                the trashcan.
        """
        if not alert_id:
            raise RequiredArgument(
                function=self.delete_alert.__name__, argument='alert_id'
            )

        cmd = XmlCommand("delete_alert")
        cmd.set_attribute("alert_id", alert_id)
        cmd.set_attribute("ultimate", to_bool(ultimate))

        return self._send_xml_command(cmd)

    # Review note (Duplication): This code seems to be duplicated in your
    # project.
    def get_alerts(
        self,
        *,
        filter_string: Optional[str] = None,
        filter_id: Optional[str] = None,
        trash: Optional[bool] = None,
        tasks: Optional[bool] = None,
    ) -> Any:
        """Request a list of alerts

        Arguments:
            filter_string: Filter term to use for the query
            filter_id: UUID of an existing filter to use for the query
            trash: True to request the alerts in the trashcan
            tasks: Whether to include the tasks using the alerts
        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_alerts")

        add_filter(cmd, filter_string, filter_id)

        if trash is not None:
            cmd.set_attribute("trash", to_bool(trash))

        if tasks is not None:
            cmd.set_attribute("tasks", to_bool(tasks))

        return self._send_xml_command(cmd)

    def get_alert(self, alert_id: str, *, tasks: Optional[bool] = None) -> Any:
        """Request a single alert

        Arguments:
            alert_id: UUID of an existing alert
            tasks: Whether to include the tasks using the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        cmd = XmlCommand("get_alerts")

        if not alert_id:
            raise RequiredArgument(
                function=self.get_alert.__name__, argument='alert_id'
            )

        cmd.set_attribute("alert_id", alert_id)

        if tasks is not None:
            cmd.set_attribute("tasks", to_bool(tasks))

        return self._send_xml_command(cmd)

    def modify_alert(
        self,
        alert_id: str,
        *,
        name: Optional[str] = None,
        comment: Optional[str] = None,
        filter_id: Optional[str] = None,
        event: Optional[AlertEvent] = None,
        event_data: Optional[dict] = None,
        condition: Optional[AlertCondition] = None,
        condition_data: Optional[dict] = None,
        method: Optional[AlertMethod] = None,
        method_data: Optional[dict] = None,
    ) -> Any:
        """Modifies an existing alert.

        Arguments:
            alert_id: UUID of the alert to be modified.
            name: Name of the Alert.
            condition: The condition that must be satisfied for the alert to
                occur. If the event is either 'Updated SecInfo
                arrived' or 'New SecInfo arrived', condition must be 'Always'.
                Otherwise, condition can also be one of 'Severity at least',
                'Filter count changed' or 'Filter count at least'.
            condition_data: Data that defines the condition
            event: The event that must happen for the alert to occur, one of
                'Task run status changed', 'Updated SecInfo arrived' or
                'New SecInfo arrived'
            event_data: Data that defines the event
            method: The method by which the user is alerted, one of 'SCP',
                'Send', 'SMB', 'SNMP', 'Syslog' or 'Email';
                if the event is neither 'Updated SecInfo arrived' nor
                'New SecInfo arrived', method can also be one of 'Start Task',
                'HTTP Get', 'Sourcefire Connector' or 'verinice Connector'.
            method_data: Data that defines the method
            filter_id: Filter to apply when executing alert
            comment: Comment for the alert

        Returns:
            The response. See :py:meth:`send_command` for details.
        """

        if not alert_id:
            raise RequiredArgument(
                function=self.modify_alert.__name__, argument='alert_id'
            )

        cmd = XmlCommand("modify_alert")
        cmd.set_attribute("alert_id", str(alert_id))

        if name:
            cmd.add_element("name", name)

        if comment:
            cmd.add_element("comment", comment)

        if filter_id:
            cmd.add_element("filter", attrs={"id": filter_id})

        if condition:
            if not isinstance(condition, AlertCondition):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='condition',
                    arg_type=AlertCondition.__name__,
                )

            conditions = cmd.add_element("condition", condition.value)

            # Each entry becomes <data>value<name>key</name></data>
            if condition_data is not None:
                for key, value in condition_data.items():
                    _data = conditions.add_element("data", value)
                    _data.add_element("name", key)

        if method:
            if not isinstance(method, AlertMethod):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='method',
                    arg_type=AlertMethod.__name__,
                )

            methods = cmd.add_element("method", method.value)

            if method_data is not None:
                for key, value in method_data.items():
                    _data = methods.add_element("data", value)
                    _data.add_element("name", key)

        if event:
            if not isinstance(event, AlertEvent):
                raise InvalidArgumentType(
                    function=self.modify_alert.__name__,
                    argument='event',
                    arg_type=AlertEvent.__name__,
                )

            # condition and method may be None here; _check_event then
            # raises RequiredArgument
            _check_event(event, condition, method)

            events = cmd.add_element("event", event.value)

            if event_data is not None:
                for key, value in event_data.items():
                    _data = events.add_element("data", value)
                    _data.add_element("name", key)

        return self._send_xml_command(cmd)

    def test_alert(self, alert_id: str) -> Any:
        """Run an alert

        Invoke a test run of an alert

        Arguments:
            alert_id: UUID of the alert to be tested

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not alert_id:
            raise InvalidArgument("test_alert requires an alert_id argument")

        cmd = XmlCommand("test_alert")
        cmd.set_attribute("alert_id", alert_id)

        return self._send_xml_command(cmd)

    def trigger_alert(
        self,
        alert_id: str,
        report_id: str,
        *,
        filter_string: Optional[str] = None,
        filter_id: Optional[str] = None,
        report_format_id: Optional[Union[str, ReportFormatType]] = None,
        delta_report_id: Optional[str] = None,
    ) -> Any:
        """Run an alert by ignoring its event and conditions

        The alert is triggered to run immediately with the provided filtered
        report by ignoring the event and condition settings.

        Arguments:
            alert_id: UUID of the alert to be run
            report_id: UUID of the report to be provided to the alert
            filter_string: Filter term to use to filter results in the report
            filter_id: UUID of filter to use to filter results in the report
            report_format_id: UUID of report format to use
                              or ReportFormatType (enum)
            delta_report_id: UUID of an existing report to compare report to.

        Returns:
            The response. See :py:meth:`send_command` for details.
        """
        if not alert_id:
            raise RequiredArgument(
                function=self.trigger_alert.__name__,
                argument='alert_id',
            )

        if not report_id:
            raise RequiredArgument(
                function=self.trigger_alert.__name__,
                argument='report_id',
            )

        # Triggering is done through get_reports with an alert_id attribute
        cmd = XmlCommand("get_reports")
        cmd.set_attribute("report_id", report_id)
        cmd.set_attribute("alert_id", alert_id)

        add_filter(cmd, filter_string, filter_id)

        if report_format_id:
            # Accept either a plain UUID string or the enum wrapping one
            if isinstance(report_format_id, ReportFormatType):
                report_format_id = report_format_id.value

            cmd.set_attribute("format_id", report_format_id)

        if delta_report_id:
            cmd.set_attribute("delta_report_id", delta_report_id)

        return self._send_xml_command(cmd)