GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 4129fe...98fb2a )
by Ashim
10s
created

flatten_urlinfo()   F

Complexity

Conditions 15

Size

Total Lines 45

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
cc 15
c 5
b 0
f 0
dl 0
loc 45
rs 2.7451

How to fix   Complexity   

Complexity

Complex classes like flatten_urlinfo() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import datetime
2
import hmac
3
import hashlib
4
import base64
5
import requests
6
import xmltodict
7
from bs4 import BeautifulSoup
8
try:
9
    from urllib import quote, urlencode
10
except ImportError:
11
    from urllib.parse import quote, urlencode
12
13
URLINFO_RESPONSE_GROUPS = ",".join(
14
    ["RelatedLinks", "Categories", "Rank", "ContactInfo", "RankByCountry",
15
     "UsageStats", "Speed", "Language", "OwnedDomains", "LinksInCount",
16
     "SiteData", "AdultContent"])
17
18
19
def create_timestamp():
20
    now = datetime.datetime.now()
21
    timestamp = now.isoformat()
22
    return timestamp
23
24
25
def is_string(obj):
26
    try:
27
        return isinstance(obj, basestring)  # python 2
28
    except NameError:
29
        return isinstance(obj, str)  # python 3
30
31
32
class CallAwis(object):
33
34
    def __init__(self, domainname, responsegroup, access_id, secret_access_key):
35
        self.domainname = domainname
36
        self.responsegroup = responsegroup
37
        self.access_id = access_id
38
        self.secret_access_key = secret_access_key
39
        self.SignatureVersion = "2"
40
        self.SignatureMethod = "HmacSHA256"
41
        self.ServiceHost = "awis.amazonaws.com"
42
        self.range = "31"
43
        self.PATH = "/"
44
45
    def create_uri(self, params):
46
        params = [(key, params[key])
47
                  for key in sorted(params.keys())]
48
        return urlencode(params)
49
50
    def create_signature(self):
51
        Uri = self.create_uri(self.params)
52
        msg = "\n".join(["GET", self.ServiceHost, self.PATH, Uri])
53
        try:
54
            hmac_signature = hmac.new(
55
                self.secret_access_key, msg, hashlib.sha256)
56
        except TypeError:
57
            hmac_signature = hmac.new(self.secret_access_key.encode(
58
                'utf-8'), msg.encode('utf-8'), hashlib.sha256)
59
        signature = base64.b64encode(hmac_signature.digest())
60
        return quote(signature)
61
62
    def urlinfo(self):
63
            # Query Options  # refer to AWIS API reference for full details.
64
            # Action =
65
        self.params = {
66
            'Action': "UrlInfo",
67
            'Url': self.domainname,
68
            'ResponseGroup': self.responsegroup,
69
            'SignatureVersion': self.SignatureVersion,
70
            'SignatureMethod': self.SignatureMethod,
71
            'Timestamp': create_timestamp(),
72
            'AWSAccessKeyId': self.access_id,
73
        }
74
75
        uri = self.create_uri(self.params)
76
        signature = self.create_signature()
77
78
        url = "http://%s/?%s&Signature=%s" % (self.ServiceHost, uri, signature)
79
        return self.return_output(url)
80
81
    def traffichistory(self, myrange=31, start=20070801):
82
        # Action="TrafficHistory"
83
        self.params = {
84
            'Action': "TrafficHistory",
85
            'AWSAccessKeyId': self.access_id,
86
            'SignatureMethod': self.SignatureMethod,
87
            'SignatureVersion': self.SignatureVersion,
88
            'Timestamp': create_timestamp(),
89
            'Url': self.domainname,
90
            'ResponseGroup': self.responsegroup,
91
            'Range': myrange,
92
            'Start': start,
93
        }
94
        uri = self.create_uri(self.params)
95
        signature = self.create_signature()
96
        url = "http://%s/?%s&Signature=%s" % (self.ServiceHost, uri, signature)
97
        return self.return_output(url)
98
99
    def cat_browse(self, path):
100
        # Action=''
101
        self.params = {
102
            'Action': "CategoryListings",
103
            'AWSAccessKeyId': self.access_id,
104
            'SignatureMethod': self.SignatureMethod,
105
            'SignatureVersion': self.SignatureVersion,
106
            'Timestamp': create_timestamp(),
107
            'ResponseGroup': 'Listings',
108
            'Path': quote(path),
109
        }
110
        uri = self.create_uri(self.params)
111
        signature = self.create_signature()
112
        url = "http://%s/?%s&Signature=%s" % (self.ServiceHost, uri, signature)
113
        return self.return_output(url)
114
115
    def return_output(self, url):
116
        r = requests.get(url)
117
        soup = BeautifulSoup(r.text.encode('utf-8'), 'xml')
118
        return soup
119
120
121
def flatten_urlinfo(urlinfo, shorter_keys=True):
122
    """ Takes a urlinfo object and returns a flat dictionary."""
123
    def flatten(value, prefix=""):
124
        if is_string(value):
125
            _result[prefix[1:]] = value
126
            return
127
        try:
128
            len(value)
129
        except (AttributeError, TypeError):  # a leaf
130
            _result[prefix[1:]] = value
131
            return
132
133
        try:
134
            items = value.items()
135
        except AttributeError:  # an iterable, but not a dict
136
            last_prefix = prefix.split(".")[-1]
137
            if shorter_keys:
138
                prefix = "." + last_prefix
139
140
            if last_prefix == "Country":
141
                for v in value:
142
                    country = v.pop("@Code")
143
                    flatten(v, ".".join([prefix, country]))
144
            elif last_prefix in ["RelatedLink", "CategoryData"]:
145
                for i, v in enumerate(value):
146
                    flatten(v, ".".join([prefix, str(i)]))
147
            elif value[0].get("TimeRange"):
148
                for v in value:
149
                    time_range = ".".join(tuple(v.pop("TimeRange").items())[0])
150
                    # python 3 odict_items don't support indexing
151
                    if v.get("DataUrl"):
152
                        time_range = ".".join([v.pop("DataUrl"), time_range])
153
                    flatten(v, ".".join([prefix, time_range]))
154
            else:
155
                msg = prefix + " contains a list we don't know how to flatten."
156
                raise NotImplementedError(msg)
157
        else:  # a dict, go one level deeper
158
            for k, v in items:
159
                flatten(v, ".".join([prefix, k]))
160
161
    _result = {}
162
    info = xmltodict.parse(str(urlinfo))
163
    flatten(info["aws:UrlInfoResponse"]["Response"]["UrlInfoResult"]["Alexa"])
164
    _result["OutputTimestamp"] = create_timestamp()
165
    return _result
166