Completed
Push — feature-datacookie ( 457ec2 )
by Felipe A.
42s
created

DataCookie._name_page()   B

Complexity

Conditions 4

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 26
rs 8.5806
cc 4
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import re
5
import string
6
import json
7
import base64
8
import logging
9
import zlib
10
11
from werkzeug.http import dump_header, dump_options_header, dump_cookie, \
12
                          parse_cookie
13
from werkzeug.datastructures import Headers as BaseHeaders
14
15
from .compat import range, map
16
from .exceptions import InvalidCookieSizeError
17
18
19
logger = logging.getLogger(__name__)
20
21
22
class Headers(BaseHeaders):
    '''
    :class:`werkzeug.datastructures.Headers` subclass whose constructor
    takes headers as keyword arguments, optionally with header options.

    Each keyword is a header name written with underscores; each value is
    either a :type:`str` (plain value, no options) or a pair made of a
    :type:`str` and a :type:`dict` of options.
    '''
    snake_replace = staticmethod(re.compile(r'(^|-)[a-z]').sub)

    @classmethod
    def genpair(cls, key, value):
        '''
        Build the header name and serialized header value for one keyword
        argument.

        Underscores on key are replaced by hyphens, then the lowercase
        letter found at the start and after every hyphen is uppercased.

        :param key: header name, as given in keyword form
        :type key: str
        :param value: value or value/options pair
        :type value: str or pair of (str, dict)
        :returns: tuple with header name and serialized header value
        :rtype: tuple of (str, str)
        '''
        def capitalize(match):
            # match covers the optional leading hyphen plus one letter
            return match.group(0).upper()

        header_name = cls.snake_replace(capitalize, key.replace('_', '-'))

        if isinstance(value, str):
            # plain value, serialized as a single-item header
            header_value = dump_header([value])
        else:
            # value/options pair, unpacked as positional arguments
            header_value = dump_options_header(*value)

        return header_name, header_value

    def __init__(self, **kwargs):
        '''
        :param **kwargs: header and values as keyword arguments
        :type **kwargs: str or (str, dict)
        '''
        pairs = []
        for name, content in kwargs.items():
            pairs.append(self.genpair(name, content))
        super(Headers, self).__init__(pairs)
66
67
68
class DataCookie(object):
    '''
    Compressed base64 paginated cookie manager.

    Values are JSON-serialized, compressed, base64-encoded and then split
    across several cookies ("pages") so each one stays below
    :attr:`max_length`. The first page uses the bare cookie name, further
    pages append a short page identifier to it.

    Usage
    -----

    > from flask import Flask, request, make_response
    >
    > app = Flask(__name__)
    > cookie = DataCookie('my-cookie')
    >
    > @app.route('/get')
    > def get():
    >     value = cookie.load_headers(request.headers)
    >     return 'Cookie value is %s' % value
    >
    > @app.route('/set')
    > def set():
    >     response = make_response('Cookie set')
    >     headers = cookie.dump_headers('setted', request.headers)
    >     response.headers.extend(headers)
    >     return response

    '''
    NOT_FOUND = object()
    # path attribute used for every cookie emitted by this manager
    cookie_path = '/'
    # alphabet used to encode page numbers into cookie name suffixes
    page_digits = string.digits + string.ascii_letters
    # default maximum number of cookies a single value can be split into
    max_pages = 5
    # size budget per cookie, shared by its name, path and value
    max_length = 3990
    # exception class raised when data does not fit in max_pages cookies
    size_error = InvalidCookieSizeError
    compress_fnc = staticmethod(zlib.compress)
    decompress_fnc = staticmethod(zlib.decompress)
    headers_class = BaseHeaders

    def __init__(self, cookie_name, max_pages=None):
        '''
        :param cookie_name: base name for all cookie pages
        :type cookie_name: str
        :param max_pages: maximum number of pages, class default if falsy
        :type max_pages: int or None
        '''
        self.cookie_name = cookie_name
        self.request_cache_field = '_browsepy.cache.cookie.%s' % cookie_name
        self.max_pages = max_pages or self.max_pages

    @classmethod
    def _name_page(cls, page):
        '''
        Converts page integer to string, using fewer characters as possible.
        If string is given, it is returned as is.

        Pages lower than 1 yield an empty string, page 1 yields the first
        character of :attr:`page_digits`, page 2 the second, and so on,
        adding more characters once the alphabet is exhausted.

        :param page: page number
        :type page: int or str
        :return: page id
        :rtype: str
        '''
        if isinstance(page, str):
            return page

        digits = []

        # page 0 has no id; every page from 1 on must get a distinct one,
        # otherwise pages 0 and 1 would share the same cookie name
        if page > 0:
            base = len(cls.page_digits)
            remaining = page - 1
            while remaining >= base:
                remaining, modulus = divmod(remaining, base)
                digits.append(modulus)
            digits.append(remaining)
            # digits were collected least-significant first
            digits.reverse()

        return ''.join(cls.page_digits[digit] for digit in digits)

    def _name_cookie_page(self, page):
        '''
        Get name of cookie corresponding to given page.

        By design (see :method:`_name_page`), pages lower than 1 results on
        cookie names without a page name.

        :param page: page number or name
        :type page: int or str
        :returns: cookie name
        :rtype: str
        '''
        return '{}{}'.format(self.cookie_name, self._name_page(page))

    def _available_cookie_size(self, name):
        '''
        Get available cookie size for value.

        :param name: cookie name
        :type name: str
        :returns: :attr:`max_length` minus name and path lengths
        :rtype: int
        '''
        return self.max_length - len(name + self.cookie_path)

    def _extract_cookies(self, headers):
        '''
        Extract relevant cookies from headers.

        A cookie is relevant when its name is the cookie name alone (first
        page) or the cookie name followed by a page id.

        :param headers: request headers
        :type headers: werkzeug.http.Headers
        :returns: mapping of cookie names to cookie values
        :rtype: dict
        '''
        # longest page id in use; at least one character is always
        # accepted so single-character pages are recognized regardless
        # of the current max_pages value
        longest = len(self._name_page(max(self.max_pages - 1, 0)))
        regex = re.compile('^%s[%s]{0,%d}$' % (
            re.escape(self.cookie_name),
            re.escape(self.page_digits),
            max(longest, 1),
            ))
        return {
            key: value
            for header in headers.get_all('cookie')
            for key, value in parse_cookie(header).items()
            if regex.match(key)
            }

    def load_headers(self, headers):
        '''
        Parse data from relevant paginated cookie data on request headers.

        :param headers: request headers
        :type headers: werkzeug.http.Headers
        :returns: deserialized value, None if missing or invalid
        :rtype: browsepy.abc.JSONSerializable
        '''
        cookies = self._extract_cookies(headers)
        chunks = []
        for i in range(self.max_pages):
            name = self._name_cookie_page(i)
            chunk = cookies.get(name, '')
            chunks.append(chunk)
            # a page which is not completely filled is the last one
            if len(chunk) < self._available_cookie_size(name):
                break
        try:
            # cookies are client-controlled: encoding, base64, compression
            # and JSON errors all mean there is no usable value
            data = base64.b64decode(''.join(chunks).encode('ascii'))
            serialized = self.decompress_fnc(data)
            return json.loads(serialized.decode('utf-8'))
        except (ValueError, TypeError, zlib.error):
            # zlib.error is not a ValueError, and it is what zlib raises
            # for empty or corrupted input
            return None

    def dump_headers(self, data, headers=None):
        '''
        Serialize given object into a :class:`werkzeug.datastructures.Headers`
        instance.

        :param data: any json-serializable value
        :type data: browsepy.abc.JSONSerializable
        :param headers: optional request headers, used to truncate old pages
        :type headers: werkzeug.http.Headers
        :return: response headers
        :rtype: werkzeug.http.Headers
        :raises InvalidCookieSizeError: if data does not fit in max_pages
        '''
        result = self.headers_class()
        serialized = self.compress_fnc(json.dumps(data).encode('utf-8'))
        encoded = base64.b64encode(serialized).decode('ascii')
        start = 0
        total = len(encoded)
        for i in range(self.max_pages):
            name = self._name_cookie_page(i)
            end = start + self._available_cookie_size(name)
            result.add(
                'Set-Cookie',
                dump_cookie(name, encoded[start:end], path=self.cookie_path),
                )
            start = end
            if start > total:
                # incidentally, an empty page will be added after
                # start == total, which is what tells load_headers that
                # the previous (completely filled) page was the last one
                break
        else:
            # pages exhausted, limit reached
            raise self.size_error(max_cookies=self.max_pages)

        if headers:
            # expire pages left over from a previous, longer value
            result.extend(self.truncate_headers(headers, i + 1))

        return result

    def truncate_headers(self, headers, start=0):
        '''
        Evict relevant cookies found on request headers, optionally starting
        from a given page number.

        :param headers: request headers, required to truncate old pages
        :type headers: werkzeug.http.Headers
        :param start: paginated cookie start, defaults to 0
        :type start: int
        :return: response headers
        :rtype: werkzeug.http.Headers
        '''
        stale = set(self._extract_cookies(headers))
        # pages below start are still in use and must be kept
        stale.difference_update(
            self._name_cookie_page(i) for i in range(start)
            )

        result = self.headers_class()
        for name in sorted(stale):
            # path must match the one used on creation for the browser
            # to drop the cookie
            result.add(
                'Set-Cookie',
                dump_cookie(name, expires=0, path=self.cookie_path),
                )
        return result
245