GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

flatten_urlinfo()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 45

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
cc 15
c 5
b 0
f 0
dl 0
loc 45
rs 2.9998

How to fix   Complexity   

Complexity

Complex functions like flatten_urlinfo() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import datetime
2
import hashlib
3
import hmac
4
5
import requests  # pip install requests
6
import xmltodict
7
from bs4 import BeautifulSoup
8
9
try:
10
    from urllib import quote, urlencode
11
except ImportError:
12
    from urllib.parse import quote, urlencode
13
14
# Response groups accepted by the various AWIS actions; see the AWIS API
# reference for the meaning of each group.
URLINFO_RESPONSE_GROUPS = ",".join(
    ["RelatedLinks", "Categories", "Rank", "ContactInfo", "RankByCountry",
     "UsageStats", "Speed", "Language", "OwnedDomains", "LinksInCount",
     "SiteData", "AdultContent"])

TRAFFICINFO_RESPONSE_GROUPS = "History"
CATEGORYBROWSE_RESPONSE_GROUPS = ",".join(
    ["Categories", "RelatedCategories", "LanguageCategories", "LetterBars"])
SITESLINKINGIN_RESPONSE_GROUP = "SitesLinkingIn"
# BUG FIX: CallAwis.siteslinkingin uses the plural name as its default
# argument, which would otherwise raise NameError when the class body is
# evaluated. Keep the original singular name for backward compatibility.
SITESLINKINGIN_RESPONSE_GROUPS = SITESLINKINGIN_RESPONSE_GROUP
23
def is_string(obj):
    """Return True if *obj* is a text string on either Python 2 or Python 3."""
    try:
        text_types = basestring  # Python 2: covers str and unicode
    except NameError:
        text_types = str  # Python 3
    return isinstance(obj, text_types)
29
class CallAwis(object):
    """Client for the Alexa Web Information Service (AWIS).

    Each public method builds the request parameters for one AWIS action,
    signs the request with AWS Signature Version 4 and returns the XML
    response parsed into a BeautifulSoup object.
    """

    def __init__(self, access_id, secret_access_key):
        # AWS credentials used to sign every request.
        self.access_id = access_id
        self.secret_access_key = secret_access_key

    def create_v4_signature(self, request_params):
        '''
        Create URI and signature headers based on AWS V4 signing process.
        Refer to https://docs.aws.amazon.com/AlexaWebInfoService/latest/ApiReferenceArticle.html for request params.
        :param request_params: dictionary of request parameters
        :return: URL and header to be passed to requests.get
        '''
        method = 'GET'
        service = 'awis'
        host = 'awis.us-west-1.amazonaws.com'
        region = 'us-west-1'
        endpoint = 'https://awis.amazonaws.com/api'
        # NOTE(review): the Host header signed below ('awis.us-west-1.
        # amazonaws.com') differs from the host in the request URL
        # ('awis.amazonaws.com'); SigV4 normally requires these to match —
        # confirm against the AWIS endpoint documentation.
        # Parameters must be sorted by key to form a canonical query string.
        request_parameters = urlencode(
            [(key, request_params[key]) for key in sorted(request_params)])

        # Key derivation functions. See:
        # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
        def sign(key, msg):
            return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

        def get_signature_key(key, date_stamp, region_name, service_name):
            k_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
            k_region = sign(k_date, region_name)
            k_service = sign(k_region, service_name)
            return sign(k_service, 'aws4_request')

        # Create a date for headers and the credential string.
        t = datetime.datetime.utcnow()
        amzdate = t.strftime('%Y%m%dT%H%M%SZ')
        datestamp = t.strftime('%Y%m%d')  # Date w/o time, used in credential scope

        # Step 1: canonical request. canonical_headers already ends with a
        # newline, so the join reproduces SigV4's required blank line before
        # the signed-headers list.
        canonical_uri = '/api'
        canonical_querystring = request_parameters
        canonical_headers = 'host:' + host + '\n' + 'x-amz-date:' + amzdate + '\n'
        signed_headers = 'host;x-amz-date'
        # GET requests carry no body, so the payload hash is of the empty string.
        payload_hash = hashlib.sha256(''.encode('utf8')).hexdigest()
        canonical_request = '\n'.join([
            method, canonical_uri, canonical_querystring,
            canonical_headers, signed_headers, payload_hash])

        # Step 2: string to sign.
        algorithm = 'AWS4-HMAC-SHA256'
        credential_scope = '/'.join([datestamp, region, service, 'aws4_request'])
        string_to_sign = '\n'.join([
            algorithm, amzdate, credential_scope,
            hashlib.sha256(canonical_request.encode('utf8')).hexdigest()])

        # Step 3: derive the signing key and sign.
        signing_key = get_signature_key(self.secret_access_key, datestamp,
                                        region, service)
        signature = hmac.new(signing_key, string_to_sign.encode('utf-8'),
                             hashlib.sha256).hexdigest()

        # Step 4: authorization header and final request URL.
        authorization_header = (
            algorithm + ' ' + 'Credential=' + self.access_id + '/'
            + credential_scope + ', ' + 'SignedHeaders=' + signed_headers
            + ', ' + 'Signature=' + signature)
        headers = {'X-Amz-Date': amzdate,
                   'Authorization': authorization_header,
                   'Content-Type': 'application/xml',
                   'Accept': 'application/xml'}
        request_url = endpoint + '?' + canonical_querystring
        return request_url, headers

    def urlinfo(self, domain, response_group=URLINFO_RESPONSE_GROUPS):
        '''
        Provide information about supplied domain as specified by the response group
        :param domain: Any valid URL
        :param response_group: Any valid urlinfo response group
        :return: Traffic and/or content data of the domain in XML format
        '''
        params = {
            'Action': "UrlInfo",
            'Url': domain,
            'ResponseGroup': response_group,
        }
        url, headers = self.create_v4_signature(params)
        return self.return_output(url, headers)

    def traffichistory(self, domain, response_group=TRAFFICINFO_RESPONSE_GROUPS,
                       myrange=31, start=20070801):
        '''
        Provide traffic history of supplied domain
        :param domain: Any valid URL
        :param response_group: Any valid traffic history response group
        :param myrange: number of days to return (AWIS 'Range' parameter)
        :param start: start date as YYYYMMDD (AWIS 'Start' parameter)
        :return: Traffic and/or content data of the domain in XML format
        '''
        params = {
            'Action': "TrafficHistory",
            'Url': domain,
            'ResponseGroup': response_group,
            'Range': myrange,
            'Start': start,
        }
        url, headers = self.create_v4_signature(params)
        return self.return_output(url, headers)

    def siteslinkingin(self, domain, response_group=SITESLINKINGIN_RESPONSE_GROUP):
        '''
        List sites linking in to the supplied domain.
        :param domain: Any valid URL
        :param response_group: Any valid SitesLinkingIn response group
        :return: Sites linking in to the domain in XML format
        '''
        # BUG FIX: the default previously referenced the undefined plural name
        # SITESLINKINGIN_RESPONSE_GROUPS, raising NameError when the class
        # body was evaluated; the module constant is singular.
        params = {
            'Action': "SitesLinkingIn",
            'Url': domain,
            'ResponseGroup': response_group,
        }
        url, headers = self.create_v4_signature(params)
        return self.return_output(url, headers)

    def cat_browse(self, domain, path, response_group=CATEGORYBROWSE_RESPONSE_GROUPS,
                   descriptions='True'):
        '''
        Provide category browse information of specified domain
        :param domain: Any valid URL
        :param path: Valid category path
        :param response_group: Any valid traffic history response group
        :param descriptions: 'True' to include listing descriptions
        :return: Traffic and/or content data of the domain in XML format

        NOTE(review): 'domain' and 'response_group' are accepted but not used
        by the request built below ('Action' is CategoryListings with a fixed
        'Listings' response group) — confirm whether that is intended.
        '''
        params = {
            'Action': "CategoryListings",
            'ResponseGroup': 'Listings',
            'Path': quote(path),
            'Descriptions': descriptions,
        }
        url, headers = self.create_v4_signature(params)
        return self.return_output(url, headers)

    def return_output(self, url, headers):
        # Perform the signed GET request and parse the XML payload.
        r = requests.get(url, headers=headers)
        soup = BeautifulSoup(r.text.encode('utf-8'), 'xml')
        return soup
162
163
def flatten_urlinfo(urlinfo, shorter_keys=True):
    """Flatten a UrlInfo XML response into a single-level dict.

    :param urlinfo: XML response object as returned by ``CallAwis.urlinfo``
        (anything whose ``str()`` is the raw XML document)
    :param shorter_keys: when True, keys built for list entries keep only
        the last dotted path component instead of the full path
    :return: dict mapping dotted key paths to leaf values, plus an
        "OutputTimestamp" entry (UTC, ``%Y%m%dT%H%M%SZ``)
    :raises NotImplementedError: on a list shape the flattener does not know
    """
    _result = {}

    def flatten_list(value, prefix):
        # Lists are keyed differently depending on what they contain; the
        # last component of the dotted path tells us which AWIS element
        # produced the list.
        last_prefix = prefix.split(".")[-1]
        if shorter_keys:
            prefix = "." + last_prefix

        if last_prefix == "Country":
            # Per-country entries: key by ISO country code ('@Code').
            for entry in value:
                country = entry.pop("@Code")
                flatten(entry, ".".join([prefix, country]))
        elif last_prefix in ("RelatedLink", "CategoryData"):
            # Positional lists: key entries by their index.
            for index, entry in enumerate(value):
                flatten(entry, ".".join([prefix, str(index)]))
        elif value[0].get("TimeRange"):
            # Usage-statistics entries: key by the time range, prefixed with
            # DataUrl when one is present.
            for entry in value:
                # python 3 odict_items don't support indexing
                time_range = ".".join(tuple(entry.pop("TimeRange").items())[0])
                if entry.get("DataUrl"):
                    time_range = ".".join([entry.pop("DataUrl"), time_range])
                flatten(entry, ".".join([prefix, time_range]))
        else:
            msg = prefix + " contains a list we don't know how to flatten."
            raise NotImplementedError(msg)

    def flatten(value, prefix=""):
        # prefix always starts with a leading "."; strip it when storing.
        if is_string(value):
            _result[prefix[1:]] = value
            return
        try:
            len(value)
        except (AttributeError, TypeError):  # a leaf (int, None, ...)
            _result[prefix[1:]] = value
            return

        try:
            items = value.items()
        except AttributeError:  # an iterable, but not a dict
            flatten_list(value, prefix)
        else:  # a dict: recurse one level deeper
            for key, child in items:
                flatten(child, ".".join([prefix, key]))

    info = xmltodict.parse(str(urlinfo))
    flatten(info["aws:UrlInfoResponse"]["Response"]["UrlInfoResult"]["Alexa"])
    _result["OutputTimestamp"] = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
    return _result