Completed
Push — wip-ergoithz ( cc842c...6140e3 )
by Felipe A.
39s
created

parse_set_cookie_option()   C

Complexity

Conditions 7

Size

Total Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 7
c 1
b 0
f 1
dl 0
loc 28
rs 5.5
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import re
5
import json
6
import base64
7
import logging
8
import zlib
9
10
from werkzeug.http import dump_header, dump_options_header, dump_cookie, \
11
                          parse_cookie, parse_date
12
from werkzeug.datastructures import Headers as BaseHeaders
13
14
from .compat import range
15
from .exceptions import InvalidCookieSizeError
16
17
18
logger = logging.getLogger(__name__)
19
20
21
class Headers(BaseHeaders):
22
    '''
23
    A wrapper around :class:`werkzeug.datastructures.Headers`, allowing
24
    to specify headers with options on initialization.
25
26
    Headers are provided as keyword arguments while values can be either
27
    :type:`str` (no options) or tuple of :type:`str` and :type:`dict`.
28
    '''
29
    snake_replace = staticmethod(re.compile(r'(^|-)[a-z]').sub)
30
31
    @classmethod
32
    def genpair(cls, key, value):
33
        '''
34
        Extract value and options from values dict based on given key and
35
        options-key.
36
37
        :param key: value key
38
        :type key: str
39
        :param value: value or value/options pair
40
        :type value: str or pair of (str, dict)
41
        :returns: tuple with key and value
42
        :rtype: tuple of (str, str)
43
        '''
44
        rkey = cls.snake_replace(
45
            lambda x: x.group(0).upper(),
46
            key.replace('_', '-')
47
            )
48
        rvalue = (
49
            dump_header([value])
50
            if isinstance(value, str) else
51
            dump_options_header(*value)
52
            )
53
        return rkey, rvalue
54
55
    def __init__(self, **kwargs):
56
        '''
57
        :param **kwargs: header and values as keyword arguments
58
        :type **kwargs: str or (str, dict)
59
        '''
60
        items = [
61
            self.genpair(key, value)
62
            for key, value in kwargs.items()
63
            ]
64
        return super(Headers, self).__init__(items)
65
66
67
class DataCookie(object):
68
    '''
69
    Compressed base64 paginated cookie manager.
70
71
    Usage
72
    -----
73
74
    > from flask import Flask, request, make_response
75
    >
76
    > app = Flask(__name__)
77
    > cookie = DataCookie('my-cookie')
78
    >
79
    > @app.route('/get')
80
    > def get():
81
    >     return 'Cookie value is %s' % cookie.load(request.cookies)
82
    >
83
    > @app.route('/set')
84
    > def set():
85
    >     response = make_response('Cookie set')
86
    >     cookie.dump('setted', response, request.cookies)
87
    >     return response
88
89
    '''
90
    page_length = 4000
91
    size_error = InvalidCookieSizeError
92
    headers_class = BaseHeaders
93
94
    @staticmethod
95
    def _serialize(data):
96
        '''
97
        :type data: json-serializable
98
        :rtype: bytes
99
        '''
100
        serialized = zlib.compress(json.dumps(data).encode('utf-8'))
101
        return base64.b64encode(serialized).decode('ascii')
102
103
    @staticmethod
104
    def _deserialize(data):
105
        '''
106
        :type data: bytes
107
        :rtype: json-serializable
108
        '''
109
        decoded = base64.b64decode(data)
110
        serialized = zlib.decompress(decoded)
111
        return json.loads(serialized)
112
113
    def __init__(self, cookie_name, max_pages=1, max_age=None, path='/'):
114
        '''
115
        :param cookie_name: first cookie name and prefix for the following
116
        :type cookie_name: str
117
        :param max_pages: maximum allowed cookie parts, defaults to 1
118
        :type max_pages: int
119
        :param max_age: cookie lifetime in seconds or None (session, default)
120
        :type max_age: int, datetime.timedelta or None
121
        :param path: cookie path, defaults to /
122
        :type path: str
123
        '''
124
        self.cookie_name = cookie_name
125
        self.max_pages = max_pages
126
        self.max_age = max_age
127
        self.path = path
128
129
    def _name_cookie_page(self, page):
130
        '''
131
        Get name of cookie corresponding to given page.
132
133
        By design (see :method:`_name_page`), pages lower than 1 results on
134
        cookie names without a page name.
135
136
        :param page: page number or name
137
        :type page: int or str
138
        :returns: cookie name
139
        :rtype: str
140
        '''
141
        return '{}{}'.format(
142
            self.cookie_name,
143
            page if isinstance(page, str) else
144
            '-{:x}'.format(page - 1) if page else
145
            ''
146
            )
147
148
    def _available_cookie_size(self, name):
149
        '''
150
        Get available cookie size for value.
151
152
        :param name: cookie name
153
        :type name: str
154
        :return: available bytes for cookie value
155
        :rtype: int
156
        '''
157
        empty = 'Set-Cookie: %s' % dump_cookie(
158
            name,
159
            value=' ',  # force quotes
160
            max_age=self.max_age,
161
            path=self.path
162
            )
163
        return self.page_length - len(empty)
164
165
    def _extract_cookies(self, headers):
166
        '''
167
        Extract relevant cookies from headers.
168
        '''
169
        regex = re.compile('^%s$' % self._name_cookie_page('(-[0-9a-f])?'))
170
        return {
171
            key: value
172
            for header in headers.get_all('cookie')
173
            for key, value in parse_cookie(header).items()
174
            if regex.match(key)
175
            }
176
177
    def load_cookies(self, cookies, default=None):
178
        '''
179
        Parse data from relevant paginated cookie data given as mapping.
180
181
        :param cookies: request cookies
182
        :type cookies: collections.abc.Mapping
183
        :returns: deserialized value
184
        :rtype: browsepy.abc.JSONSerializable
185
        '''
186
        chunks = []
187
        for i in range(self.max_pages):
188
            name = self._name_cookie_page(i)
189
            cookie = cookies.get(name, '').encode('ascii')
190
            chunks.append(cookie)
191
            if len(cookie) < self._available_cookie_size(name):
192
                break
193
        data = b''.join(chunks)
194
        if data:
195
            try:
196
                return self._deserialize(data)
197
            except BaseException:
198
                pass
199
        return default
200
201
    def load_headers(self, headers, default=None):
202
        '''
203
        Parse data from relevant paginated cookie data on request headers.
204
205
        :param headers: request headers
206
        :type headers: werkzeug.http.Headers
207
        :returns: deserialized value
208
        :rtype: browsepy.abc.JSONSerializable
209
        '''
210
        cookies = self._extract_cookies(headers)
211
        return self.load_cookies(cookies)
212
213
    def dump_headers(self, data, headers=None):
214
        '''
215
        Serialize given object into a :class:`werkzeug.datastructures.Headers`
216
        instance.
217
218
        :param data: any json-serializable value
219
        :type data: browsepy.abc.JSONSerializable
220
        :param headers: optional request headers, used to truncate old pages
221
        :type headers: werkzeug.http.Headers
222
        :return: response headers
223
        :rtype: werkzeug.http.Headers
224
        '''
225
        result = self.headers_class()
226
        data = self._serialize(data)
227
        start = 0
228
        size = len(data)
229
        for i in range(self.max_pages):
230
            name = self._name_cookie_page(i)
231
            end = start + self._available_cookie_size(name)
232
            result.set(
233
                'Set-Cookie',
234
                dump_cookie(
235
                    name,
236
                    data[start:end],
237
                    max_age=self.max_age,
238
                    path=self.path,
239
                    )
240
                )
241
            if end > size:
242
                # incidentally, an empty page will be added after end == size
243
                if headers:
244
                    result.extend(self.truncate_headers(headers, i + 1))
245
                return result
246
            start = end
247
        # pages exhausted, limit reached
248
        raise self.size_error(max_cookies=self.max_pages)
249
250
    def truncate_headers(self, headers, start=0):
251
        '''
252
        Evict relevant cookies found on request headers, optionally starting
253
        from a given page number.
254
255
        :param headers: request headers, required to truncate old pages
256
        :type headers: werkzeug.http.Headers
257
        :param start: paginated cookie start, defaults to 0
258
        :type start: int
259
        :return: response headers
260
        :rtype: werkzeug.http.Headers
261
        '''
262
        name_cookie = self._name_cookie_page
263
        cookie_names = set(self._extract_cookies(headers))
264
        cookie_names.difference_update(name_cookie(i) for i in range(start))
265
266
        result = self.headers_class()
267
        for name in cookie_names:
268
            result.add('Set-Cookie', dump_cookie(name, expires=0))
269
        return result
270
271
272
def parse_set_cookie_option(name, value):
273
    '''
274
    Parse Set-Cookie header option (acepting option 'value' as cookie value),
275
    both name and value.
276
277
    Resulting names are compatible as :func:`werkzeug.http.dump_cookie`
278
    keyword arguments.
279
280
    :param name: option name
281
    :type name: str
282
    :param value: option value
283
    :type value: str
284
    :returns: tuple of parsed name and option, or None if name is unknown
285
    :rtype: tuple of str or None
286
    '''
287
    try:
288
        if name == 'Max-Age':
289
            return 'max_age', int(value)
290
        if name == 'Expires':
291
            return 'expires', parse_date(value)
292
        if name in ('value', 'Path', 'Domain', 'SameSite'):
293
            return name.lower(), value
294
        if name in ('Secure', 'HttpOnly'):
295
            return name.lower(), True
296
    except (AttributeError, ValueError, TypeError):
297
        pass
298
    except BaseException as e:
299
        logger.exception(e)
300
301
302
re_parse_set_cookie = re.compile(r'([^=;]+)(?:=([^;]*))?(?:$|;\s*)')
303
304
305
def parse_set_cookie(header, option_parse_fnc=parse_set_cookie_option):
306
    '''
307
    Parse the content of a Set-Type HTTP header.
308
309
    Result options are compatible as :func:`werkzeug.http.dump_cookie`
310
    keyword arguments.
311
312
    :param header: Set-Cookie header value
313
    :type header: str
314
    :returns: tuple with cookie name and its options
315
    :rtype: tuple of str and dict
316
    '''
317
    pairs = re_parse_set_cookie.findall(header)
318
    name, value = pairs[0]
319
    pairs[0] = ('value', parse_cookie('v=%s' % value).get('v', None))
320
    return name, dict(filter(None, (option_parse_fnc(*p) for p in pairs)))
321