api.decorators.paginate()   B
last analyzed

Complexity

Conditions 3

Size

Total Lines 53
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 47
nop 1
dl 0
loc 53
rs 8.7345
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the body into its own well-named method.

1
import functools
2
import hashlib
3
from flask import jsonify, request, url_for, current_app, make_response, g
4
from .rate_limit import RateLimit
5
from .errors import too_many_requests, precondition_failed, not_modified
6
7
8
def json(f):
  """Decorator that converts a view's return value into a JSON response.

  The wrapped view may return a bare body, or a tuple of up to three
  items ``(body, status, headers)``. When the second item is a dict or a
  list it is taken to be the headers and no status is applied. A body
  that is not a dict is converted by calling its ``to_json()`` method
  before being handed to ``jsonify``.
  """
  @functools.wraps(f)
  def wrapped(*args, **kwargs):
    result = f(*args, **kwargs)
    status = None
    extra_headers = None
    if isinstance(result, tuple):
      # Pad short tuples with None so the three-way unpack always works.
      padded = result + (None, ) * (3 - len(result))
      result, status, extra_headers = padded
    if isinstance(status, (dict, list)):
      # The second slot actually carried headers, not a status code.
      extra_headers, status = status, None
    body = result if isinstance(result, dict) else result.to_json()
    response = jsonify(body)
    if status is not None:
      response.status_code = status
    if extra_headers is not None:
      response.headers.extend(extra_headers)
    return response

  return wrapped
28
29
30
def hit_count(f):
  """Decorator that increments a per-path counter after the view runs.

  The view is called first. Then the key ``'hit-count' + request.path``
  is incremented on a ``FakeRedis`` instance when the app config has
  ``TESTING`` set, or otherwise on a ``Redis`` client built from the
  ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_DB`` config values. The
  view's return value is passed through unchanged.
  """
  @functools.wraps(f)
  def wrapped(*args, **kwargs):
    response = f(*args, **kwargs)

    config = current_app.config
    if config['TESTING']:
      from .rate_limit import FakeRedis
      client = FakeRedis()
    else:  # pragma: no cover
      from redis import Redis
      client = Redis(
          host=config.get("REDIS_HOST", "localhost"),
          port=config.get("REDIS_PORT", 6379),
          db=config.get("REDIS_DB", 0))

    client.incr('hit-count%s' % (request.path))
    return response

  return wrapped
50
51
52
def rate_limit(limit, per, scope_func=lambda: request.remote_addr):
  """Decorator factory that applies a ``RateLimit`` to a view.

  Args:
    limit: passed through to ``RateLimit`` as its limit argument.
    per: passed through to ``RateLimit`` as its period argument.
    scope_func: zero-argument callable whose result is used in the
      limiter key; defaults to the requesting client's remote address.

  When the app config value ``USE_RATE_LIMITS`` is falsy the view is
  called directly. Otherwise the view runs only while the limiter is not
  over its limit; when it is, the result of ``too_many_requests(...)``
  is returned instead. In both limited cases the ``X-RateLimit-*``
  values are stored on ``g.headers``; they are not attached to the
  returned value here.
  """
  def decorator(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
      if not current_app.config['USE_RATE_LIMITS']:
        return f(*args, **kwargs)

      bucket = 'rate-limit/%s/%s/' % (f.__name__, scope_func())
      limiter = RateLimit(bucket, limit, per)
      if limiter.over_limit:
        response = too_many_requests('You have exceeded your request rate')
      else:
        response = f(*args, **kwargs)

      # Read the limiter state after the view has run, then stash it on g.
      g.headers = {
          'X-RateLimit-Remaining': str(limiter.remaining),
          'X-RateLimit-Limit': str(limiter.limit),
          'X-RateLimit-Reset': str(limiter.reset)
      }
      return response

    return wrapped

  return decorator
76
77
78
def _pagination_meta(pagination, page, per_page, page_url):
  # Build the 'meta' dict of a paginated response; page_url(n) must return
  # the link for page number n.
  return {
      'page': page,
      'per_page': per_page,
      'total': pagination.total,
      'pages': pagination.pages,
      'prev': page_url(pagination.prev_num) if pagination.has_prev else None,
      'next': page_url(pagination.next_num) if pagination.has_next else None,
      'first': page_url(1),
      'last': page_url(pagination.pages),
  }


def paginate(max_per_page=10):
  """Decorator factory that paginates the query returned by a view.

  Args:
    max_per_page: default page size, and the upper bound applied to the
      ``per_page`` query-string argument.

  The wrapped view must return an object with a
  ``paginate(page, per_page)`` method. ``page`` and ``per_page`` are read
  from the request's query string as integers. The response is a JSON
  object with two keys: ``urls``, the ``get_url()`` of each item on the
  current page, and ``meta``, which holds the paging numbers plus
  external ``prev`` / ``next`` / ``first`` / ``last`` links. ``prev`` and
  ``next`` are ``None`` when there is no such page.
  """
  def decorator(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
      page = request.args.get('page', 1, type=int)
      requested = request.args.get('per_page', max_per_page, type=int)
      per_page = min(requested, max_per_page)
      pagination = f(*args, **kwargs).paginate(page, per_page)

      def page_url(number):
        # Every link targets the current endpoint with the view's own
        # keyword arguments; only the page number varies.
        return url_for(
            request.endpoint,
            page=number,
            per_page=per_page,
            _external=True,
            **kwargs)

      # Build the links before serialising the items, as the original did.
      meta = _pagination_meta(pagination, page, per_page, page_url)
      return jsonify({
          'urls': [item.get_url() for item in pagination.items],
          'meta': meta
      })

    return wrapped

  return decorator
131
132
133
def cache_control(*directives):
  """Decorator factory that sets the ``Cache-Control`` response header.

  Args:
    *directives: directive strings; they are joined with ``', '`` to
      form the header value.

  The view's return value is turned into a response with
  ``make_response`` before the header is set.
  """
  def decorator(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
      response = make_response(f(*args, **kwargs))
      response.headers['Cache-Control'] = ', '.join(directives)
      return response

    return wrapped

  return decorator
145
146
147
def no_cache(f):
  """Decorate ``f`` so its responses carry ``no-cache, no-store, max-age=0``."""
  disable_caching = cache_control('no-cache', 'no-store', 'max-age=0')
  return disable_caching(f)
149
150
151
def etag(f):
  """Decorator that adds an ``ETag`` header and handles conditional requests.

  The ETag is the quoted MD5 hex digest of the response body. If the
  request has an ``If-Match`` header that names neither this ETag nor
  ``*``, the result of ``precondition_failed()`` is returned. Otherwise,
  if it has an ``If-None-Match`` header that names this ETag or ``*``,
  the result of ``not_modified()`` is returned. In every other case the
  view's response is returned with the ``ETag`` header set.
  """
  @functools.wraps(f)
  def wrapped(*args, **kwargs):
    # only for HEAD and GET requests
    assert request.method in ['HEAD', 'GET'],\
        '@etag is only supported for GET requests'
    response = make_response(f(*args, **kwargs))
    digest = hashlib.md5(response.get_data()).hexdigest()
    etag_str = '"' + digest + '"'
    response.headers['ETag'] = etag_str

    def names_current_etag(header_value):
      # True when the comma-separated header lists this ETag or the wildcard.
      candidates = [tag.strip() for tag in header_value.split(',')]
      return etag_str in candidates or '*' in candidates

    if_match = request.headers.get('If-Match')
    if_none_match = request.headers.get('If-None-Match')
    if if_match:
      # If-Match takes precedence; If-None-Match is ignored when it is set.
      if not names_current_etag(if_match):
        return precondition_failed()
    elif if_none_match and names_current_etag(if_none_match):
      return not_modified()
    return response

  return wrapped
174