GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( a0d81a...878c84 )
by Ashim
11s
created

CallAwis.getSignatureKey()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 6
rs 9.4285
1
import datetime
2
import hashlib
3
import hmac
4
5
import requests  # pip install requests
6
import xmltodict
7
from bs4 import BeautifulSoup
8
9
try:
10
    from urllib import quote, urlencode
11
except ImportError:
12
    from urllib.parse import quote, urlencode
13
14
URLINFO_RESPONSE_GROUPS = ",".join(
15
    ["RelatedLinks", "Categories", "Rank", "ContactInfo", "RankByCountry",
16
     "UsageStats", "Speed", "Language", "OwnedDomains", "LinksInCount",
17
     "SiteData", "AdultContent"])
18
19
TRAFFICINFO_RESPONSE_GROUPS = "History"
20
CATEGORYBROWSE_RESPONSE_GROUPS = ",".join(["Categories", "RelatedCategories", "LanguageCategories", "LetterBars"])
21
22
23
def is_string(obj):
24
    try:
25
        return isinstance(obj, basestring)  # python 2
26
    except NameError:
27
        return isinstance(obj, str)  # python 3
28
29
class CallAwis(object):
30
    def __init__(self, access_id, secret_access_key):
31
        self.access_id = access_id
32
        self.secret_access_key = secret_access_key
33
34
    def create_v4_signature(self, request_params):
35
        '''
36
        Create URI and signature headers based on AWS V4 signing process.
37
        Refer to https://docs.aws.amazon.com/AlexaWebInfoService/latest/ApiReferenceArticle.html for request params.
38
        :param request_params: dictionary of request parameters
39
        :return: URL and header to be passed to requests.get
40
        '''
41
42
        method = 'GET'
43
        service = 'awis'
44
        host = 'awis.us-west-1.amazonaws.com'
45
        region = 'us-west-1'
46
        endpoint = 'https://awis.amazonaws.com/api'
47
        request_parameters = urlencode([(key, request_params[key]) for key in sorted(request_params.keys())])
48
49
        # Key derivation functions. See:
50
        # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
51
        def sign(key, msg):
52
            return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
53
54
        def getSignatureKey(key, dateStamp, regionName, serviceName):
55
            kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
56
            kRegion = sign(kDate, regionName)
57
            kService = sign(kRegion, serviceName)
58
            kSigning = sign(kService, 'aws4_request')
59
            return kSigning
60
61
        # Create a date for headers and the credential string
62
        t = datetime.datetime.utcnow()
63
        amzdate = t.strftime('%Y%m%dT%H%M%SZ')
64
        datestamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope
65
66
        # Create canonical request
67
        canonical_uri = '/api'
68
        canonical_querystring = request_parameters
69
        canonical_headers = 'host:' + host + '\n' + 'x-amz-date:' + amzdate + '\n'
70
        signed_headers = 'host;x-amz-date'
71
        payload_hash = hashlib.sha256(''.encode('utf8')).hexdigest()
72
        canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash
73
74
        # Create string to sign
75
        algorithm = 'AWS4-HMAC-SHA256'
76
        credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request'
77
        string_to_sign = algorithm + '\n' +  amzdate + '\n' +  credential_scope + '\n' +  hashlib.sha256(canonical_request.encode('utf8')).hexdigest()
78
79
        # Calculate signature
80
        signing_key = getSignatureKey(self.secret_access_key, datestamp, region, service)
81
82
        # Sign the string_to_sign using the signing_key
83
        signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()
84
85
        # Add signing information to the request
86
        authorization_header = algorithm + ' ' + 'Credential=' + self.access_id + '/' + credential_scope + ', ' +  'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
87
        headers = {'X-Amz-Date':amzdate, 'Authorization':authorization_header, 'Content-Type': 'application/xml', 'Accept': 'application/xml'}
88
89
        # Create request url
90
        request_url = endpoint + '?' + canonical_querystring
91
92
        return request_url, headers
93
94
    def urlinfo(self, domain, response_group = URLINFO_RESPONSE_GROUPS):
95
        '''
96
        Provide information about supplied domain as specified by the response group
97
        :param domain: Any valid URL
98
        :param response_group: Any valid urlinfo response group
99
        :return: Traffic and/or content data of the domain in XML format
100
        '''
101
        params = {
102
            'Action': "UrlInfo",
103
            'Url': domain,
104
            'ResponseGroup': response_group
105
        }
106
107
        url, headers = self.create_v4_signature(params)
108
        return self.return_output(url, headers)
109
110
    def traffichistory(self, domain, response_group=TRAFFICINFO_RESPONSE_GROUPS, myrange=31, start=20070801):
111
        '''
112
        Provide traffic history of supplied domain
113
        :param domain: Any valid URL
114
        :param response_group: Any valid traffic history response group
115
        :return: Traffic and/or content data of the domain in XML format
116
        '''
117
        params = {
118
            'Action': "TrafficHistory",
119
            'Url': domain,
120
            'ResponseGroup': response_group,
121
            'Range': myrange,
122
            'Start': start,
123
        }
124
125
        url, headers = self.create_v4_signature(params)
126
        return self.return_output(url, headers)
127
128
    def cat_browse(self, domain, path, response_group=CATEGORYBROWSE_RESPONSE_GROUPS, descriptions='True'):
129
        '''
130
        Provide category browse information of specified domain
131
        :param domain: Any valid URL
132
        :param path: Valid category path
133
        :param response_group: Any valid traffic history response group
134
        :return: Traffic and/or content data of the domain in XML format
135
        '''
136
        params = {
137
            'Action': "CategoryListings",
138
            'ResponseGroup': 'Listings',
139
            'Path': quote(path),
140
            'Descriptions': descriptions
141
        }
142
143
        url, headers = self.create_v4_signature(params)
144
        return self.return_output(url, headers)
145
146
    def return_output(self, url, headers):
147
        r = requests.get(url, headers=headers)
148
        soup = BeautifulSoup(r.text.encode('utf-8'), 'xml')
149
        return soup
150
151
152
def flatten_urlinfo(urlinfo, shorter_keys=True):
153
    """ Takes a urlinfo object and returns a flat dictionary."""
154
    def flatten(value, prefix=""):
155
        if is_string(value):
156
            _result[prefix[1:]] = value
157
            return
158
        try:
159
            len(value)
160
        except (AttributeError, TypeError):  # a leaf
161
            _result[prefix[1:]] = value
162
            return
163
164
        try:
165
            items = value.items()
166
        except AttributeError:  # an iterable, but not a dict
167
            last_prefix = prefix.split(".")[-1]
168
            if shorter_keys:
169
                prefix = "." + last_prefix
170
171
            if last_prefix == "Country":
172
                for v in value:
173
                    country = v.pop("@Code")
174
                    flatten(v, ".".join([prefix, country]))
175
            elif last_prefix in ["RelatedLink", "CategoryData"]:
176
                for i, v in enumerate(value):
177
                    flatten(v, ".".join([prefix, str(i)]))
178
            elif value[0].get("TimeRange"):
179
                for v in value:
180
                    time_range = ".".join(tuple(v.pop("TimeRange").items())[0])
181
                    # python 3 odict_items don't support indexing
182
                    if v.get("DataUrl"):
183
                        time_range = ".".join([v.pop("DataUrl"), time_range])
184
                    flatten(v, ".".join([prefix, time_range]))
185
            else:
186
                msg = prefix + " contains a list we don't know how to flatten."
187
                raise NotImplementedError(msg)
188
        else:  # a dict, go one level deeper
189
            for k, v in items:
190
                flatten(v, ".".join([prefix, k]))
191
192
    _result = {}
193
    info = xmltodict.parse(str(urlinfo))
194
    flatten(info["aws:UrlInfoResponse"]["Response"]["UrlInfoResult"]["Alexa"])
195
    _result["OutputTimestamp"] = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
196
    return _result
197