Passed
Pull Request — master (#211)
by Juan José
01:24
created

ScanCollection.calculate_target_progress()   A

Complexity

Conditions 3

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 15
nop 2
dl 0
loc 23
rs 9.65
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
import logging
20
import multiprocessing
21
import time
22
import uuid
23
24
from collections import OrderedDict
25
from enum import Enum
26
from typing import List, Any, Dict, Iterator, Optional
27
28
from ospd.network import target_str_to_list
29
30
LOGGER = logging.getLogger(__name__)
31
32
33
class ScanStatus(Enum):
    """Life-cycle states of a scan stored in a ScanCollection."""

    # State assigned to a scan when it is created or resumed.
    INIT = 0
    # A scan in this state cannot be deleted from the collection.
    RUNNING = 1
    # Setting this state records the scan's end time; a stopped scan
    # can be resumed by creating a scan with the same ID.
    STOPPED = 2
    FINISHED = 3
40
41
42
class ScanCollection:
    """Scans collection, managing scans and results read and write, exposing
    only needed information.

    Each scan has meta-information such as scan ID, current progress (from 0
    to 100), start time, end time, scan target and options and a list of
    results.

    There are 4 types of results: Alarms, Logs, Errors and Host Details.

    Each scan entry is created by ``create_scan`` as a dict obtained from a
    ``multiprocessing`` manager.  Nested containers (results, options,
    per-host progress, ...) are therefore always handled with a
    read / modify / write-back sequence, so that the change is propagated
    to the parent process.

    Todo:
    - Better checking for Scan ID existence and handling otherwise.
    - More data validation.
    - Mutex access per table/scan_info.
    """

    def __init__(self) -> None:
        """Initialize an empty scan collection.

        The data manager is created lazily by the first ``create_scan``.
        """
        self.data_manager = (
            None
        )  # type: Optional[multiprocessing.managers.SyncManager]
        self.scans_table = dict()  # type: Dict

    def add_result(
        self,
        scan_id: str,
        result_type: int,
        host: str = '',
        hostname: str = '',
        name: str = '',
        value: str = '',
        port: str = '',
        test_id: str = '',
        severity: str = '',
        qod: str = '',
    ) -> None:
        """Append a result to the result list of a scan.

        Parameters:
            scan_id (str): ID of the scan the result belongs to.
            result_type (int): Type of the result.
            host, hostname, name, value, port, test_id, severity, qod (str):
                Fields stored verbatim in the result.  At least one of
                ``name`` and ``value`` must be non-empty.

        Raises:
            AssertionError: If ``scan_id`` is empty, or both ``name`` and
                ``value`` are empty.
            KeyError: If the scan does not exist.
        """
        assert scan_id
        assert name or value

        result = OrderedDict()  # type: Dict
        result['type'] = result_type
        result['name'] = name
        result['severity'] = severity
        result['test_id'] = test_id
        result['value'] = value
        result['host'] = host
        result['hostname'] = hostname
        result['port'] = port
        result['qod'] = qod

        results = self.scans_table[scan_id]['results']
        results.append(result)

        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def remove_hosts_from_target_progress(
        self, scan_id: str, hosts: List
    ) -> None:
        """Remove hosts from the per-host progress table of a scan, so that
        they are not included in the calculation of the scan progress.

        Hosts without an entry in the progress table are ignored.
        """
        if not hosts:
            return

        target = self.scans_table[scan_id].get('target_progress')
        for host in hosts:
            target.pop(host, None)

        # Set scan_info's target_progress to propagate progresses
        # to parent process.
        self.scans_table[scan_id]['target_progress'] = target

    def set_progress(self, scan_id: str, progress: int) -> None:
        """Set the overall progress of a scan.

        Values outside the range 1..100 are ignored.  Reaching 100 also
        records the current time as the scan's end time.
        """
        if 0 < progress <= 100:
            self.scans_table[scan_id]['progress'] = progress

        if progress == 100:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def set_host_progress(self, scan_id: str, host: str, progress: int) -> None:
        """Set the progress of a single host of a scan.

        Values outside the range 1..100 are ignored.
        """
        if 0 < progress <= 100:
            host_progresses = self.scans_table[scan_id].get('target_progress')
            host_progresses[host] = progress
            # Set scan_info's target_progress to propagate progresses
            # to parent process.
            self.scans_table[scan_id]['target_progress'] = host_progresses

    def set_host_finished(self, scan_id: str, host: str) -> None:
        """Add the host to the scan's list of finished hosts (only once)."""
        finished_hosts = self.scans_table[scan_id].get('finished_hosts')

        if host not in finished_hosts:
            finished_hosts.append(host)

        # Write back so the change is propagated to the parent process.
        self.scans_table[scan_id]['finished_hosts'] = finished_hosts

    def get_hosts_unfinished(self, scan_id: str) -> List[Any]:
        """Get the list of target hosts which are not finished yet.

        A finished host which is not part of the target host list is
        ignored.  An empty list is returned when the target has no
        (parsable) hosts.
        """
        # The parsed host list or the finished list may be empty/None;
        # both are treated as "no hosts".
        all_hosts = target_str_to_list(self.get_host_list(scan_id)) or []
        finished_hosts = self.get_hosts_finished(scan_id) or []

        return [host for host in all_hosts if host not in finished_hosts]

    def get_hosts_finished(self, scan_id: str) -> List:
        """Get the list of hosts marked as finished during the scan."""
        return self.scans_table[scan_id].get('finished_hosts')

    def results_iterator(
        self, scan_id: str, pop_res: bool = False, max_res: Optional[int] = None
    ) -> Iterator[Any]:
        """Return an iterator over the results of a scan.

        If pop_res is True, the fetched results are removed from the
        scan's result list.

        If max_res is None, all results are returned.
        Otherwise, if max_res = N > 0, at most N results are returned.

        max_res only has an effect together with pop_res.
        """
        if pop_res and max_res:
            result_aux = self.scans_table[scan_id]['results']
            # Keep whatever was not fetched for the next call.
            self.scans_table[scan_id]['results'] = result_aux[max_res:]
            return iter(result_aux[:max_res])

        if pop_res:
            result_aux = self.scans_table[scan_id]['results']
            self.scans_table[scan_id]['results'] = list()
            return iter(result_aux)

        return iter(self.scans_table[scan_id]['results'])

    def ids_iterator(self) -> Iterator[str]:
        """Return an iterator over the collection's scan IDs."""
        return iter(self.scans_table)

    def remove_single_result(
        self, scan_id: str, result: Dict[str, str]
    ) -> None:
        """Remove a single result from the result list of a scan.

        Parameters:
            scan_id (str): ID of the scan owning the result.
            result (dict): The result to be removed from the results list.

        Raises:
            ValueError: If the result is not in the results list.
        """
        results = self.scans_table[scan_id]['results']
        results.remove(result)
        # Write back so the change is propagated to the parent process.
        self.scans_table[scan_id]['results'] = results

    def del_results_for_stopped_hosts(self, scan_id: str) -> None:
        """Remove from the result list every result which belongs to a host
        that is not finished yet.
        """
        unfinished_hosts = self.get_hosts_unfinished(scan_id)
        if not unfinished_hosts:
            return

        # Filter once and write back once, instead of removing entries one
        # by one while iterating over the same list.
        results = self.scans_table[scan_id]['results']
        self.scans_table[scan_id]['results'] = [
            result
            for result in results
            if result['host'] not in unfinished_hosts
        ]

    def resume_scan(self, scan_id: str, options: Optional[Dict]) -> str:
        """Reset the scan status in the scans table to INIT.

        Also overwrite the options, because a resume task cmd can add
        some new option. E.g. exclude hosts list.  Results of hosts which
        did not finish are dropped.

        Parameters:
            scan_id (str): Scan ID to identify the scan process to be
                           resumed.
            options (dict): Options for the scan to be resumed. These
                            options are not added to the already existent
                            ones. The old ones are removed. If empty or
                            None, the stored options are kept.

        Return:
            Scan ID which identifies the current scan.
        """
        self.scans_table[scan_id]['status'] = ScanStatus.INIT
        if options:
            self.scans_table[scan_id]['options'] = options

        self.del_results_for_stopped_hosts(scan_id)

        return scan_id

    def create_scan(
        self,
        scan_id: str = '',
        target: Optional[Dict] = None,
        options: Optional[Dict] = None,
        vts: Optional[Dict] = None,
    ) -> str:
        """Create a new scan with the provided scan information.

        If ``scan_id`` identifies an existing scan in STOPPED state, that
        scan is resumed instead of creating a new one.  If no ``scan_id``
        is given, a random UUID is generated.

        Return:
            Scan ID which identifies the created (or resumed) scan.
        """
        if not target:
            target = {}

        if self.data_manager is None:
            self.data_manager = multiprocessing.Manager()

        # Check if it is possible to resume task. To avoid to resume, the
        # scan must be deleted from the scans_table.
        if (
            scan_id
            and self.id_exists(scan_id)
            and (self.get_status(scan_id) == ScanStatus.STOPPED)
        ):
            self.scans_table[scan_id]['end_time'] = 0

            return self.resume_scan(scan_id, options)

        if not options:
            options = dict()

        scan_info = self.data_manager.dict()  # type: Dict
        scan_info['results'] = list()
        scan_info['finished_hosts'] = list()
        scan_info['progress'] = 0
        scan_info['target_progress'] = dict()
        scan_info['target'] = target
        scan_info['vts'] = vts
        scan_info['options'] = options
        scan_info['start_time'] = int(time.time())
        scan_info['end_time'] = 0
        scan_info['status'] = ScanStatus.INIT

        if not scan_id:
            scan_id = str(uuid.uuid4())

        scan_info['scan_id'] = scan_id

        self.scans_table[scan_id] = scan_info
        return scan_id

    def set_status(self, scan_id: str, status: 'ScanStatus') -> None:
        """Set the status of a scan.

        Setting the status to STOPPED also records the current time as
        the scan's end time.
        """
        self.scans_table[scan_id]['status'] = status
        if status == ScanStatus.STOPPED:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def get_status(self, scan_id: str) -> 'ScanStatus':
        """Get the status of a scan."""
        return self.scans_table[scan_id]['status']

    def get_options(self, scan_id: str) -> Dict:
        """Get the options of a scan."""
        return self.scans_table[scan_id]['options']

    def set_option(self, scan_id, name: str, value: Any) -> None:
        """Set the option ``name`` of a scan to ``value``."""
        options = self.scans_table[scan_id]['options']
        options[name] = value

        # Set scan_info's options to propagate the change to the parent
        # process, as done for the other nested containers.
        self.scans_table[scan_id]['options'] = options

    def get_progress(self, scan_id: str) -> int:
        """Get the current overall progress value of a scan."""
        return self.scans_table[scan_id]['progress']

    def simplify_exclude_host_list(self, scan_id: str) -> List[Any]:
        """Remove from exclude_hosts the hosts received in the
        finished_hosts list sent by the client.

        The finished hosts are sent also as exclude hosts for backward
        compatibility purposes.

        Return:
            The parsed exclude hosts list without the finished hosts.
        """
        exc_hosts_list = target_str_to_list(self.get_exclude_hosts(scan_id))

        finished_hosts_list = target_str_to_list(
            self.get_finished_hosts(scan_id)
        )

        if finished_hosts_list and exc_hosts_list:
            for finished in finished_hosts_list:
                if finished in exc_hosts_list:
                    exc_hosts_list.remove(finished)

        return exc_hosts_list

    def calculate_target_progress(self, scan_id: str) -> float:
        """Get the current progress value of a target.

        The value is the sum of the progress of each single host divided
        by the number of target hosts which are not excluded.

        Raises:
            ZeroDivisionError: If the number of excluded hosts equals the
                number of target hosts. The error is logged and re-raised.
        """
        host = self.get_host_list(scan_id)
        total_hosts = len(target_str_to_list(host))
        exc_hosts_list = self.simplify_exclude_host_list(scan_id)
        exc_hosts = len(exc_hosts_list) if exc_hosts_list else 0
        host_progresses = self.scans_table[scan_id].get('target_progress')

        try:
            t_prog = sum(host_progresses.values()) / (
                total_hosts - exc_hosts
            )  # type: float
        except ZeroDivisionError:
            LOGGER.error(
                "Zero division error in %s",
                self.calculate_target_progress.__name__,
            )
            raise

        return t_prog

    def get_start_time(self, scan_id: str) -> int:
        """Get the start time of a scan, as set by ``create_scan``."""
        return self.scans_table[scan_id]['start_time']

    def get_end_time(self, scan_id: str) -> int:
        """Get the end time of a scan; 0 if no end time was recorded."""
        return self.scans_table[scan_id]['end_time']

    # NOTE(review): annotated as Dict, but the value is handed to
    # target_str_to_list() elsewhere in this class, so it is presumably a
    # hosts string -- confirm against the callers of create_scan().
    def get_host_list(self, scan_id: str) -> Dict:
        """Get the 'hosts' entry of a scan's target."""
        return self.scans_table[scan_id]['target'].get('hosts')

    def get_ports(self, scan_id: str):
        """Get the 'ports' entry of a scan's target."""
        return self.scans_table[scan_id]['target'].get('ports')

    def get_exclude_hosts(self, scan_id: str):
        """Get the 'exclude_hosts' entry of a scan's target."""
        return self.scans_table[scan_id]['target'].get('exclude_hosts')

    def get_finished_hosts(self, scan_id: str):
        """Get the finished host list sent by the client for a given target.

        This is the 'finished_hosts' entry of the target, not the list
        maintained by ``set_host_finished``.
        """
        return self.scans_table[scan_id]['target'].get('finished_hosts')

    def get_credentials(self, scan_id: str):
        """Get the 'credentials' entry of a scan's target."""
        return self.scans_table[scan_id]['target'].get('credentials')

    def get_target_options(self, scan_id: str):
        """Get the 'options' entry of a scan's target."""
        return self.scans_table[scan_id]['target'].get('options')

    def get_vts(self, scan_id: str) -> Dict:
        """Get the vts of a scan."""
        return self.scans_table[scan_id]['vts']

    def id_exists(self, scan_id: str) -> bool:
        """Check whether a scan exists in the table."""
        return self.scans_table.get(scan_id) is not None

    def delete_scan(self, scan_id: str) -> bool:
        """Delete a scan unless it is currently running.

        Return:
            False if the scan is in RUNNING state (nothing is deleted),
            True otherwise.

        Raises:
            KeyError: If the scan does not exist.
        """
        if self.get_status(scan_id) == ScanStatus.RUNNING:
            return False

        self.scans_table.pop(scan_id)

        if not self.scans_table:
            # No scan left: drop the reference to the data manager. A new
            # one is created by the next create_scan().
            self.data_manager = None

        return True
418