Completed
Pull Request — master (#423)
by
unknown
04:18 queued 01:59
created

GetLatestScan.run()   F

Complexity

Conditions 14

Size

Total Lines 141

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 141
rs 2
cc 14

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like GetLatestScan.run() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
3
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
4
# contributor license agreements.  See the NOTICE file distributed with
5
# this work for additional information regarding copyright ownership.
6
# The ASF licenses this file to You under the Apache License, Version 2.0
7
# (the "License"); you may not use this file except in compliance with
8
# the License.  You may obtain a copy of the License at
9
#
10
#     http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
18
import sys
19
import requests
20
import json
21
import os
22
import yaml
23
import datetime
24
25
from getpass import getpass
26
from st2actions.runners.pythonrunner import Action
27
28
from scan_get_results import GetScanResults
29
30
class GetLatestScan(Action):
31
    def run(self, customer_id=None, scan_title=None):
32
        """
33
        The template class for 
34
35
        Returns: An blank Dict.
36
37
        Raises:
38
           ValueError: On lack of key in config.
39
        """
40
41
        # Set up the results
42
        results = {}
43
44
        url = "https://{}/api/scan/v1/scans".format(self.config['api_host'])
45
        payload = None
46
        headers = { "Accept": "application/json" }
47
48
        if customer_id is not None:
49
            payload = {}
50
            payload['customer_id'] = customer_id
51
52
        try:
53
            r = requests.get(url,
54
                             headers=headers,
55
                             auth=(self.config['api_key'], ''),
56
                             params=payload)
57
            r.raise_for_status()
58
        except:
59
            raise ValueError("HTTP error: %s" % r.status_code)
60
61
        try:
62
            data = r.json()
63
        except:
64
            raise ValueError("Invalid JSON")
65
        else:
66
            r.close()
67
68
        # get the scan ID...
69
        scans = {}
70
        for item in data:
71
            scans[item['title']] = {"active": item["active"], 
72
                                   "id": item["id"],
73
                                   "type": item["type" ] }
74
75
            
76
        url = "https://{}/api/scan/v1/scans/{}".format(self.config['api_host'],
77
                                                       scans[scan_title]['id'])
78
        payload = None
79
        headers = { "Accept": "application/json" }
80
                
81
        try:
82
            r = requests.get(url,
83
                             headers=headers,
84
                             auth=(self.config['api_key'], ''),
85
                             params=payload)
86
            r.raise_for_status()
87
        except:
88
            raise ValueError("HTTP error: %s" % r.status_code)
89
90
        try:
91
            data = r.json()
92
        except:
93
            raise ValueError("Invalid JSON")
94
        else:
95
            r.close()
96
97
        completed_scans = {}
98
        for item in data:
99
            if item['active'] is False:
100
                create_date = datetime.datetime.fromtimestamp(item['create_date']).strftime('%Y-%m-%d %H:%M:%S')
101
                finish_date = datetime.datetime.fromtimestamp(item['finish_date']).strftime('%Y-%m-%d %H:%M:%S')
102
103
                completed_scans[item['id']] = { "active":      item['active'],
104
                                                "create_date": create_date,
105
                                                "finish_date": finish_date
106
                                                }
107
108
        latest_scan_id = max(completed_scans.keys())
109
110
        #latest_scan = { 'scan_id': latest_scan_id,
111
        #                'create_date': completed_scans[latest_scan_id]["create_date"],
112
        #                'finish_date': results[latest_scan_id]["finish_date"]
113
        #            }
114
        #results[latest_scan_id] = completed_scans[latest_scan_id]
115
116
        ScanResults = GetScanResults(config)
117
        scan_results_raw = ScanResults.run(scan_exec_id=latest_scan_id, new_vulns=False, new_ports=False)
118
119
        ### Do the like Stuffs...
120
121
        scan_results = {}
122
123
        scan_results['details'] = {}
124
        scan_results['details']["customer_id"] = scan_results_raw["customer_id"]
125
        scan_results['details']["errors"] = scan_results_raw["errors"]
126
127
        scan_results['details']['scan'] = {}
128
        scan_results['details']['scan']['started'] = completed_scans[latest_scan_id]['create_date']
129
        scan_results['details']['scan']['finished'] = completed_scans[latest_scan_id]['finish_date']
130
        scan_results['details']['scan']['fast'] = scan_results_raw["fast_scan"]
131
        scan_results['details']['scan']['summary'] = scan_results_raw['scan_summary']
132
133
        scan_results['details']['totals'] = {}
134
        scan_results['details']['totals']['hosts'] = scan_results_raw["total_hosts"]
135
        scan_results['details']['totals']['vulns'] = scan_results_raw["total_vulns"]
136
        scan_results['details']['totals']['vulns_summary'] = scan_results_raw["vulns_summary"]
137
138
        scan_results['details']['policy'] = {}
139
        scan_results['details']['policy']['name'] = scan_results_raw["policy_name"]
140
        scan_results['details']['policy']['id'] = scan_results_raw["policy_id"]
141
        scan_results['details']['policy']['type'] = scan_results_raw["policy_type"]
142
143
        scan_results['hosts'] = {}
144
        for scanned_host in scan_results_raw['scanned_hosts']:
145
            scan_results['hosts'][ scanned_host['ip_address'] ] = {
146
                "host_name": scanned_host['host_name'],
147
                "tasks": scanned_host['tasks'],
148
                "ui_link": scanned_host['ui_link'],
149
                "vulnerable": None,
150
                "vulns_count": 0,
151
                "vulns": [],
152
            }
153
154
        for vh in scan_results_raw['vulnerable_hosts']:
155
            scan_results['hosts'][ vh['ip_address'] ]['vulnerable']  = True
156
            scan_results['hosts'][ vh['ip_address'] ]['vulns_count']  = len(vh['vulns'])
157
158
            scan_results['hosts'][ vh['ip_address'] ]['vulns'] = {  "Critical": [],
159
                                                                    "High": [],
160
                                                                    "Low": [],
161
                                                                    "Medium": [],
162
                                                                    "Unclassified": [],
163
                                                                    "Urgent": []
164
                                                                }
165
            ### This needs to be better...
166
            for vln in vh["vulns"]:
167
                scan_results['hosts'][ vh['ip_address'] ]['vulns'][ vln["risk_level"] ].append(vln)
168
169
            #scan_results['hosts'][ vh['ip_address'] ]['vulns'] = vh["vulns"]
170
171
        return scan_results
172