Completed
Pull Request — master (#2895)
by Anthony
17:08 queued 11:50
created

HTTPClient   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 101
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 101
rs 10
wmc 15

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 4 1
A get() 0 6 1
A post() 0 7 1
A patch() 0 7 1
A delete() 0 6 1
A post_raw() 0 7 1
A _response_hook() 0 12 2
A put() 0 7 1
B _get_curl_line_for_request() 0 23 4
A _get_url_without_trailing_slash() 0 11 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import functools
import json
import logging
from pipes import quote as pquote

import requests
23
24
25
LOG = logging.getLogger(__name__)
26
27
28
def add_ssl_verify_to_kwargs(func):
    """
    Decorator which sets the ``verify`` keyword argument for HTTPS requests.

    When the first positional argument is an ``HTTPClient`` instance whose ``root`` contains
    ``https``, ``verify`` is set to the client's ``cacert`` value, or to ``False`` when no
    ``cacert`` is configured. Any ``verify`` value supplied by the caller is overwritten in that
    case. In all other cases the keyword arguments are passed through untouched.

    :param func: Function to wrap.
    :type func: ``callable``

    :rtype: ``callable``
    """
    @functools.wraps(func)
    def decorate(*args, **kwargs):
        # Guard against being called without positional arguments.
        client = args[0] if args else None
        if isinstance(client, HTTPClient) and 'https' in getattr(client, 'root', ''):
            cacert = getattr(client, 'cacert', None)
            # NOTE(review): without a cacert, certificate verification is disabled entirely.
            kwargs['verify'] = cacert if cacert is not None else False
        return func(*args, **kwargs)
    return decorate
35
36
37
def add_auth_token_to_headers(func):
    """
    Decorator which moves the ``token`` and ``api_key`` keyword arguments into request headers.

    ``token`` is sent as the ``X-Auth-Token`` header and ``api_key`` as the ``St2-Api-Key``
    header. Both keyword arguments are always removed before ``func`` is called. When neither
    is provided (or both are falsy), ``headers`` is passed through exactly as supplied.

    The ``headers`` dictionary supplied by the caller is never modified; the auth headers are
    added to a copy.

    :param func: Function to wrap.
    :type func: ``callable``

    :rtype: ``callable``
    """
    @functools.wraps(func)
    def decorate(*args, **kwargs):
        token = kwargs.pop('token', None)
        api_key = kwargs.pop('api_key', None)

        if token or api_key:
            # Copy so a headers dict reused by the caller does not keep credentials from this
            # call; ``or {}`` also tolerates an explicit ``headers=None``.
            headers = dict(kwargs.get('headers') or {})

            if token:
                headers['X-Auth-Token'] = str(token)

            if api_key:
                headers['St2-Api-Key'] = str(api_key)

            kwargs['headers'] = headers

        return func(*args, **kwargs)
    return decorate
53
54
55
def add_json_content_type_to_headers(func):
    """
    Decorator which defaults the request content type to ``application/json``.

    A ``content-type`` header is added only when the caller has not supplied one already. The
    check is case-insensitive, so a caller-provided ``Content-Type`` is respected rather than
    duplicated under a second spelling.

    The ``headers`` dictionary supplied by the caller is never modified; ``func`` always
    receives a new dictionary.

    :param func: Function to wrap.
    :type func: ``callable``

    :rtype: ``callable``
    """
    @functools.wraps(func)
    def decorate(*args, **kwargs):
        # Copy so the caller's dict is left untouched; ``or {}`` tolerates ``headers=None``.
        headers = dict(kwargs.get('headers') or {})

        # HTTP header names are case-insensitive, so compare them lower-cased.
        if not any(name.lower() == 'content-type' for name in headers):
            headers['content-type'] = 'application/json'

        kwargs['headers'] = headers
        return func(*args, **kwargs)
    return decorate
63
64
65
class HTTPClient(object):
    """
    Thin wrapper around ``requests`` which issues calls relative to a root URL.

    Every public method builds the target URL by appending ``url`` to ``self.root``, forwards
    remaining keyword arguments to ``requests`` and returns the resulting response object. When
    ``debug`` is enabled, an equivalent cURL command line and the response body are printed for
    each request.
    """

    def __init__(self, root, cacert=None, debug=False):
        """
        :param root: Base URL. A single trailing slash is stripped.
        :type root: ``str``

        :param cacert: Value used for the ``verify`` argument of HTTPS requests.

        :param debug: When true, print each request (as a cURL line) and its response body.
        :type debug: ``bool``
        """
        self.root = self._get_url_without_trailing_slash(root)
        self.cacert = cacert
        self.debug = debug

    @add_ssl_verify_to_kwargs
    @add_auth_token_to_headers
    def get(self, url, **kwargs):
        """
        Send a GET request to ``self.root + url``.
        """
        response = requests.get(self.root + url, **kwargs)
        response = self._response_hook(response=response)
        return response

    @add_ssl_verify_to_kwargs
    @add_auth_token_to_headers
    @add_json_content_type_to_headers
    def post(self, url, data, **kwargs):
        """
        Send a POST request with ``data`` serialized to JSON.
        """
        response = requests.post(self.root + url, json.dumps(data), **kwargs)
        response = self._response_hook(response=response)
        return response

    @add_ssl_verify_to_kwargs
    @add_auth_token_to_headers
    @add_json_content_type_to_headers
    def post_raw(self, url, data, **kwargs):
        """
        Send a POST request with ``data`` passed to ``requests`` as-is (no JSON serialization).
        """
        response = requests.post(self.root + url, data, **kwargs)
        response = self._response_hook(response=response)
        return response

    @add_ssl_verify_to_kwargs
    @add_auth_token_to_headers
    @add_json_content_type_to_headers
    def put(self, url, data, **kwargs):
        """
        Send a PUT request with ``data`` serialized to JSON.
        """
        response = requests.put(self.root + url, json.dumps(data), **kwargs)
        response = self._response_hook(response=response)
        return response

    @add_ssl_verify_to_kwargs
    @add_auth_token_to_headers
    @add_json_content_type_to_headers
    def patch(self, url, data, **kwargs):
        """
        Send a PATCH request with ``data`` passed to ``requests`` as-is.

        NOTE(review): unlike ``post`` and ``put``, ``data`` is not JSON-serialized here even
        though the content type defaults to JSON - confirm callers pass a serialized body.
        """
        response = requests.patch(self.root + url, data, **kwargs)
        response = self._response_hook(response=response)
        return response

    @add_ssl_verify_to_kwargs
    @add_auth_token_to_headers
    def delete(self, url, **kwargs):
        """
        Send a DELETE request to ``self.root + url``.
        """
        response = requests.delete(self.root + url, **kwargs)
        response = self._response_hook(response=response)
        return response

    def _response_hook(self, response):
        """
        Print the request and response when debug mode is enabled.

        :param response: Response object whose ``request`` and ``text`` attributes are printed.

        :return: The same response object, unchanged.
        """
        if self.debug:
            # Log cURL request line
            curl_line = self._get_curl_line_for_request(request=response.request)
            print("# -------- begin %d request ----------" % id(self))
            print(curl_line)
            print("# -------- begin %d response ----------" % (id(self)))
            print(response.text)
            print("# -------- end %d response ------------" % (id(self)))
            print('')

        return response

    def _get_curl_line_for_request(self, request):
        """
        Build a shell-quoted cURL command line equivalent to the provided request.

        :param request: Object exposing ``method``, ``headers``, ``body`` and ``url``.

        :rtype: ``str``
        """
        parts = ['curl']

        # method
        method = request.method.upper()
        if method in ['HEAD']:
            parts.extend(['--head'])
        else:
            parts.extend(['-X', pquote(method)])

        # headers
        for key, value in request.headers.items():
            parts.extend(['-H', pquote('%s: %s' % (key, value))])

        # body
        body = request.body
        if body:
            # On Python 3 the quote helper only accepts text, so decode binary bodies. The
            # second check keeps Python 2 (where ``bytes`` is ``str``) on the original path.
            if isinstance(body, bytes) and not isinstance(body, str):
                body = body.decode('utf-8', 'replace')
            parts.extend(['--data-binary', pquote(body)])

        # URL
        parts.extend([pquote(request.url)])

        curl_line = ' '.join(parts)
        return curl_line

    def _get_url_without_trailing_slash(self, value):
        """
        Function which strips a trailing slash from the provided url if one is present.

        Only a single trailing slash is removed.

        :param value: URL to format.
        :type value: ``str``

        :rtype: ``str``
        """
        result = value[:-1] if value.endswith('/') else value
        return result
166