send-delta-emails.gmp.execute_send_delta_emails()   C
last analyzed

Complexity

Conditions 6

Size

Total Lines 101
Code Lines 79

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 79
nop 2
dl 0
loc 101
rs 6.7539
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

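Commonly applied refactorings include Extract Method: move a cohesive fragment of the method body into a new, well-named method and call it from the original.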

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2017-2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
# GMP script for gvm-pyshell to send emails with delta reports.
20
21
import base64
22
import datetime
23
import sched
24
import smtplib
25
import sys
26
import time
27
28
from email.mime.base import MIMEBase
29
from email.mime.multipart import MIMEMultipart
30
from email.utils import formatdate
31
32
33
def check_args(args):
    """Print the usage text and exit when extra script arguments are given.

    The check passes only when ``args.script`` holds exactly one entry
    (presumably the script path itself); any other length prints the
    help message and terminates via ``sys.exit()``.

    Args:
        args: Parsed argument object exposing a ``script`` sequence.

    Raises:
        SystemExit: If ``len(args.script) - 1`` is not zero.
    """
    len_args = len(args.script) - 1
    if len_args != 0:
        message = """
        This script, once started, will continuously send delta
        reports via email for selected tasks.

        Example for starting up the routine:
            $ gvm-script --gmp-username name --gmp-password pass ssh --hostname <gsm> scripts/send-delta-emails.gmp.py

        The routine follows this procedure:

        Every <interval> minutes do:
        Get all tasks where the tag <task_tag> is attached.
        For each of these tasks get the finished reports:
            If less than 2 reports, continue with next task
            If latest report has tag "delta_alert_sent", continue with next task
            Create a CSV report from the delta of latest vs. previous report
            where filtered for only the new results.
            Send the CSV as an attachment to the configured email address.
        """
        print(message)
        sys.exit()
56
57
58
def execute_send_delta_emails(sc, **kwargs):
59
    gmp = kwargs.get('gmp')
60
    task_tag = kwargs.get('task_tag')
61
    interval = kwargs.get('interval')
62
    email_subject = kwargs.get('email_subject')
63
    to_addresses = kwargs.get('to_addresses')
64
    from_address = kwargs.get('from_address')
65
    mta_address = kwargs.get('mta_address')
66
    mta_user = kwargs.get('mta_user')
67
    mta_port = kwargs.get('mta_port')
68
    mta_password = kwargs.get('mta_password')
69
    report_tag_name = kwargs.get('report_tag_name')
70
71
    print('Retrieving task list ...')
72
73
    task_filter = 'tag=%s' % task_tag
74
    tasks = gmp.get_tasks(filter=task_filter).xpath('task')
75
    print('Found %d task(s) with tag "%s".' % (len(tasks), task_tag))
76
77
    for task in tasks:
78
        task_id = task.xpath('@id')[0]
79
        print(
80
            'Processing task "%s" (%s)...'
81
            % (task.xpath('name/text()')[0], task_id)
82
        )
83
84
        reports = gmp.get_reports(
85
            filter='task_id={0} and status=Done '
86
            'sort-reverse=date'.format(task_id)
87
        ).xpath('report')
88
        print('  Found %d report(s).' % len(reports))
89
        if len(reports) < 2:
90
            print('  Delta-reporting requires at least 2 finished reports.')
91
            continue
92
93
        if reports[0].xpath(
94
            'report/user_tags/tag/' 'name[text()="delta_alert_sent"]'
95
        ):
96
            print('  Delta report for latest finished report already sent')
97
            continue
98
99
        print(
100
            '  Latest finished report not send yet. Preparing delta '
101
            'report...'
102
        )
103
104
        delta_report = gmp.get_report(
105
            report_id=reports[0].xpath('@id')[0],
106
            delta_report_id=reports[1].xpath('@id')[0],
107
            filter='delta_states=n',
108
            format_id='c1645568-627a-11e3-a660-406186ea4fc5',
109
        )
110
111
        csv_in_b64 = delta_report.xpath('report/text()')[0]
112
        csv = base64.b64decode(csv_in_b64)
113
114
        print("  Composing Email...")
115
        alert_email = MIMEMultipart()
116
        alert_email['Subject'] = email_subject
117
        alert_email['To'] = ', '.join(to_addresses)
118
        alert_email['From'] = from_address
119
        alert_email['Date'] = formatdate(localtime=True)
120
121
        report_attachment = MIMEBase('application', "octet-stream")
122
        report_attachment.add_header(
123
            'Content-Disposition', 'attachment', filename='delta.csv'
124
        )
125
        report_attachment.set_payload(csv)
126
        alert_email.attach(report_attachment)
127
128
        print("  Sending Email...")
129
        try:
130
            with smtplib.SMTP(mta_address, mta_port) as smtp:
131
                smtp.ehlo()
132
                smtp.starttls()
133
                smtp.ehlo()
134
                smtp.login(mta_user, mta_password)  # if required
135
                smtp.sendmail(
136
                    from_address, to_addresses, alert_email.as_string()
137
                )
138
                smtp.close()
139
                print("  Email has been sent!")
140
141
                gmp.create_tag(
142
                    name=report_tag_name,
143
                    resource_id=reports[0].xpath('@id')[0],
144
                    resource_type='report',
145
                    value=datetime.datetime.now(),
146
                )
147
        except Exception:  # pylint: disable=broad-except
148
            print("  Unable to send the email. Error: ", sys.exc_info()[0])
149
            # raise # in case an error should stop the script
150
            continue  # ignore the problem for the time being
151
152
    print("\nCheck will be repeated in {} minutes...\n".format(interval))
153
    sc.enter(
154
        interval * 60,
155
        1,
156
        execute_send_delta_emails,
157
        argument=(sc,),
158
        kwargs=kwargs,
159
    )
160
161
162
def main(gmp, args):
    """Configure the delta-email routine and run it on a scheduler.

    Validates the script arguments, prints the active settings (the MTA
    password is deliberately not printed) and schedules
    ``execute_send_delta_emails`` for immediate execution; that function
    re-enters itself, so ``schedule.run()`` keeps the loop going.

    Args:
        gmp: GMP protocol object handed through to the send routine.
        args: Parsed arguments; ``args.script`` is checked and
            ``args.username`` is printed.
    """
    check_args(args)

    # Settings below are placeholders and must be adapted before use.
    interval = 1  # in minutes
    task_tag = 'send_delta_alert'
    report_tag_name = 'delta_alert_sent'
    email_subject = 'Delta Report'
    from_address = 'admin@example.com'
    to_addresses = ['user1@example.com', 'user2@example.com']
    mta_address = 'mail.example.com'
    mta_port = 25
    mta_user = 'admin@example.com'
    mta_password = 'mysecret'

    print('send_delta_alerts starting up with following settings:')
    print('User:          %s' % args.username)
    print('Interval:      %d minutes' % interval)
    print('Task tag:      %s' % task_tag)
    print('Email subject: %s' % email_subject)
    print('From Address:  %s' % from_address)
    print('To Addresses:  %s' % to_addresses)
    print('MTA Address:   %s' % mta_address)
    print('MTA Port:      %s' % mta_port)
    print('MTA User:      %s' % mta_user)
    print('MTA Password:  <will not be printed here>')
    print()

    print('Entering loop with interval %s minutes ...' % interval)

    schedule = sched.scheduler(time.time, time.sleep)

    # Enter the scheduled execution with the given interval
    schedule.enter(
        0,
        1,
        execute_send_delta_emails,
        argument=(schedule,),
        kwargs={
            'gmp': gmp,
            'task_tag': task_tag,
            'interval': interval,
            'email_subject': email_subject,
            'to_addresses': to_addresses,
            'from_address': from_address,
            'mta_address': mta_address,
            'mta_password': mta_password,
            'mta_port': mta_port,
            'mta_user': mta_user,
            'report_tag_name': report_tag_name,
        },
    )
    schedule.run()
216
217
218
if __name__ == '__gmp__':
    # `gmp` and `args` are not defined in this file; presumably the
    # gvm-script runner injects them as globals -- confirm against the tool.
    main(gmp, args)  # pylint: disable=undefined-variable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
220