Completed
Push — wip-ergoithz ( 0a393e...4cf754 )
by Felipe A.
29s
created

DataCookie.__init__()   B

Complexity

Conditions 1

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 37
rs 8.8571
cc 1
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import re
5
import json
6
import base64
7
import logging
8
import zlib
9
10
from werkzeug.http import dump_header, dump_options_header, dump_cookie, \
11
                          parse_cookie, parse_date
12
from werkzeug.datastructures import Headers as BaseHeaders
13
14
from .compat import range
15
from .exceptions import InvalidCookieSizeError
16
17
18
logger = logging.getLogger(__name__)
19
20
21
class Headers(BaseHeaders):
    '''
    Thin :class:`werkzeug.datastructures.Headers` subclass which can be
    initialized with headers (and their options) as keyword arguments.

    Every keyword is a header name written in snake_case, and every value
    is either a plain :type:`str` (header without options) or a tuple of
    :type:`str` and :type:`dict` (header value and its options).
    '''
    snake_replace = staticmethod(re.compile(r'(^|-)[a-z]').sub)

    @classmethod
    def genpair(cls, key, value):
        '''
        Build the header name and serialized header value for given
        keyword argument.

        The key is converted from snake_case to Http-Header-Case, and the
        value is dumped either as a plain header or as a header with
        options.

        :param key: header name as keyword (snake_case)
        :type key: str
        :param value: value or value/options pair
        :type value: str or pair of (str, dict)
        :returns: tuple with header name and header value
        :rtype: tuple of (str, str)
        '''
        def capitalize(match):
            # matched text is either the first letter or a dash plus letter
            return match.group(0).upper()

        header_name = cls.snake_replace(capitalize, key.replace('_', '-'))
        if isinstance(value, str):
            header_value = dump_header([value])
        else:
            header_value = dump_options_header(*value)
        return header_name, header_value

    def __init__(self, **kwargs):
        '''
        :param **kwargs: header and values as keyword arguments
        :type **kwargs: str or (str, dict)
        '''
        pairs = [self.genpair(*item) for item in kwargs.items()]
        super(Headers, self).__init__(pairs)
65
66
67
class DataCookie(object):
    '''
    Compressed base64 paginated cookie manager.

    Data is serialized as JSON, compressed with zlib and encoded as base64,
    then split across up to `max_pages` cookies (pages), each one fitting
    into :attr:`header_max_size` bytes once dumped as Set-Cookie header.

    Usage
    -----

    > from flask import Flask, request, make_response
    >
    > app = Flask(__name__)
    > cookie = DataCookie('my-cookie')
    >
    > @app.route('/get')
    > def get():
    >     return 'Cookie value is %s' % cookie.load_cookies(request.cookies)
    >
    > @app.route('/set')
    > def set():
    >     response = make_response('Cookie set')
    >     response.headers.extend(
    >         cookie.dump_headers('setted', request.headers)
    >         )
    >     return response

    '''
    size_error = InvalidCookieSizeError
    headers_class = BaseHeaders
    header_max_size = 4096
    header_initial_size = len('Set-Cookie: ')

    @staticmethod
    def _serialize(data):
        '''
        Serialize data as base64-encoded zlib-compressed JSON.

        :param data: value to serialize
        :type data: json-serializable
        :returns: ascii-only serialized text
        :rtype: str
        '''
        serialized = zlib.compress(json.dumps(data).encode('utf-8'))
        return base64.b64encode(serialized).decode('ascii')

    @staticmethod
    def _deserialize(data):
        '''
        Deserialize data generated by :meth:`_serialize`.

        :param data: base64-encoded zlib-compressed JSON
        :type data: bytes
        :returns: deserialized value
        :rtype: json-serializable
        '''
        decoded = base64.b64decode(data)
        serialized = zlib.decompress(decoded)
        # decode explicitly, json.loads only accepts bytes since Python 3.6
        return json.loads(serialized.decode('utf-8'))

    def __init__(self, cookie_name, max_pages=1,
                 max_age=None, expires=None, path='/',
                 domain=None, secure=False, httponly=False,
                 charset='utf-8', sync_expires=True):
        '''
        :param cookie_name: first cookie name and prefix for the following
        :type cookie_name: str
        :param max_pages: maximum allowed cookie parts, defaults to 1
        :type max_pages: int
        :param max_age: cookie lifetime
        :type max_age: int or datetime.timedelta
        :param expires: date (or timestamp) of cookie expiration
        :type expires: datetime.datetime or int
        :param path: cookie domain path, defaults to '/'
        :type path: str
        :param domain: cookie domain, defaults to current
        :type domain: str
        :param secure: either cookie will only be available via HTTPS or not
        :type secure: bool
        :param httponly: either cookie can be accessed via Javascript or not
        :type httponly: bool
        :param charset: the encoding for unicode values, defaults to utf-8
        :type charset: str
        :param sync_expires: set expires based on max_age as default
        :type sync_expires: bool
        '''
        self.max_pages = max_pages
        self.cookie_name = cookie_name
        # keyword arguments forwarded to werkzeug.http.dump_cookie
        self.cookie_options = {
            'max_age': max_age,
            'expires': expires,
            'path': path,
            'domain': domain,
            'secure': secure,
            'httponly': httponly,
            'charset': charset,
            'sync_expires': sync_expires,
            }

    def _name_cookie_page(self, page):
        '''
        Get name of cookie corresponding to given page.

        Page 0 results on the bare cookie name, any other page number gets
        a dash and the hexadecimal value of page + 1 appended, while string
        pages are appended as they are.

        :param page: page number or name
        :type page: int or str
        :returns: cookie name
        :rtype: str
        '''
        return '{}{}'.format(
            self.cookie_name,
            page if isinstance(page, str) else
            '-{:x}'.format(page + 1) if page else
            ''
            )

    def _dump_cookie(self, name, value):
        '''
        Dump cookie with configured options.

        :param name: cookie name
        :type name: str
        :param value: cookie value
        :type value: str
        :returns: Set-Cookie header value
        :rtype: str
        '''
        return dump_cookie(name, value, **self.cookie_options)

    def _available_cookie_size(self, name):
        '''
        Get available cookie size for value.

        :param name: cookie name
        :type name: str
        :return: available bytes for cookie value
        :rtype: int
        '''
        empty = self._dump_cookie(name, ' ')  # forces quotes
        return self.header_max_size - self.header_initial_size - len(empty)

    def _extract_cookies(self, headers):
        '''
        Extract relevant cookies from headers.

        :param headers: request headers
        :type headers: werkzeug.datastructures.Headers
        :returns: cookies
        :rtype: dict
        '''
        # cookie name is escaped so it is always matched literally
        regex = re.compile(
            '^%s(-[0-9a-f]+)?$' % re.escape(self.cookie_name)
            )
        return {
            key: value
            for header in headers.get_all('cookie')
            for key, value in parse_cookie(header).items()
            if regex.match(key)
            }

    def load_cookies(self, cookies, default=None):
        '''
        Parse data from relevant paginated cookie data given as mapping.

        :param cookies: request cookies
        :type cookies: collections.abc.Mapping
        :param default: value returned on missing or invalid cookie data
        :returns: deserialized value, or default
        :rtype: browsepy.abc.JSONSerializable
        '''
        chunks = []
        for i in range(self.max_pages):
            name = self._name_cookie_page(i)
            try:
                cookie = cookies.get(name, '').encode('ascii')
            except UnicodeError:
                # serialized data is ascii-only, anything else is corrupt
                return default
            chunks.append(cookie)
            # a page which is not full is always the last one
            if len(cookie) < self._available_cookie_size(name):
                break
        data = b''.join(chunks)
        if data:
            try:
                return self._deserialize(data)
            except Exception:
                # best effort: invalid data is handled as missing data,
                # while KeyboardInterrupt and SystemExit still propagate
                pass
        return default

    def load_headers(self, headers, default=None):
        '''
        Parse data from relevant paginated cookie data on request headers.

        :param headers: request headers
        :type headers: werkzeug.http.Headers
        :param default: value returned on missing or invalid cookie data
        :returns: deserialized value, or default
        :rtype: browsepy.abc.JSONSerializable
        '''
        cookies = self._extract_cookies(headers)
        return self.load_cookies(cookies, default)

    def dump_headers(self, data, headers=None):
        '''
        Serialize given object into a :class:`werkzeug.datastructures.Headers`
        instance.

        :param data: any json-serializable value
        :type data: browsepy.abc.JSONSerializable
        :param headers: optional request headers, used to truncate old pages
        :type headers: werkzeug.http.Headers
        :return: response headers
        :rtype: werkzeug.http.Headers
        :raises InvalidCookieSizeError: if data does not fit on max_pages
        '''
        result = self.headers_class()
        data = self._serialize(data)
        start = 0
        size = len(data)
        for i in range(self.max_pages):
            name = self._name_cookie_page(i)
            end = start + self._available_cookie_size(name)
            result.add('Set-Cookie', self._dump_cookie(name, data[start:end]))
            if end > size:
                # incidentally, an empty page will be added after end == size
                if headers:
                    result.extend(self.truncate_headers(headers, i + 1))
                return result
            start = end
        # pages exhausted, limit reached
        raise self.size_error(max_cookies=self.max_pages)

    def truncate_headers(self, headers, start=0):
        '''
        Evict relevant cookies found on request headers, optionally starting
        from a given page number.

        :param headers: request headers, required to truncate old pages
        :type headers: werkzeug.http.Headers
        :param start: paginated cookie start, defaults to 0
        :type start: int
        :return: response headers
        :rtype: werkzeug.http.Headers
        '''
        name_cookie = self._name_cookie_page
        cookie_names = set(self._extract_cookies(headers))
        cookie_names.difference_update(name_cookie(i) for i in range(start))

        # user agents only replace cookies with the same name, domain and
        # path, so eviction must carry the options used to set them
        path = self.cookie_options['path']
        domain = self.cookie_options['domain']

        result = self.headers_class()
        for name in cookie_names:
            result.add(
                'Set-Cookie',
                dump_cookie(
                    name, max_age=0, expires=0, path=path, domain=domain
                    )
                )
        return result
299
300
301
# Tokenizer for Set-Cookie header values: every match yields a (name, value)
# pair, value being an empty string for valueless attributes (as `Secure`).
re_parse_set_cookie = re.compile(
    r'([^=;]+)(?:=([^;]*))?(?:$|;\s*)'
    )
302
303
304
def parse_set_cookie(header):
    '''
    Parse the content of a Set-Cookie HTTP header.

    Result options are compatible as :func:`werkzeug.http.dump_cookie`
    keyword arguments.

    Attribute names are matched case-insensitively, and both unknown
    attributes and attributes with invalid values are ignored.

    :param header: Set-Cookie header value
    :type header: str
    :returns: tuple with cookie name and its options
    :rtype: tuple of str and dict
    :raises IndexError: if header contains no cookie pair at all
    '''

    def parse_option(pair):
        # translate an attribute pair into a dump_cookie keyword argument,
        # returning (None, None) for anything unknown or invalid
        name, value = pair
        key = name.strip().lower()
        try:
            if key == 'max-age':
                return 'max_age', int(value)
            if key == 'expires':
                return 'expires', parse_date(value)
            if key in ('path', 'domain', 'samesite'):
                return key, value
            if key in ('secure', 'httponly'):
                return key, True
        except (AttributeError, ValueError, TypeError):
            pass
        except Exception as e:
            # unexpected parsing errors are logged, but never propagated
            logger.exception(e)
        return None, None

    pairs = re_parse_set_cookie.findall(header)
    # first pair is always the cookie itself, the rest are its attributes
    name, value = pairs.pop(0)
    options = {k: v for k, v in map(parse_option, pairs) if k}
    # cookie value is unquoted by parsing it as a regular Cookie header
    options['value'] = parse_cookie('v=%s' % value).get('v', None)
    return name, options
339