Completed
Push — master ( 3562b3...42cf04 )
by Bertrand
01:12
created

_patch_cursor_execute()   C

Complexity

Conditions 8

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 8
c 1
b 1
f 0
dl 0
loc 15
rs 6.6666
1
# -*- coding: utf-8 -*-
2
3
from __future__ import unicode_literals
4
from collections import Iterable
5
from functools import wraps
6
from time import time
7
8
from django.db.backends.utils import CursorWrapper
9
from django.db.models.query import EmptyResultSet
10
from django.db.models.signals import post_migrate
11
from django.db.models.sql.compiler import (
12
    SQLCompiler, SQLInsertCompiler, SQLUpdateCompiler, SQLDeleteCompiler)
13
from django.db.transaction import Atomic, get_connection
14
15
from .api import invalidate
16
from .cache import cachalot_caches
17
from .settings import cachalot_settings
18
from .utils import (
19
    _get_query_cache_key, _get_table_cache_keys, _get_tables_from_sql,
20
    _invalidate_table, UncachableQuery, TUPLE_OR_LIST)
21
22
WRITE_COMPILERS = (SQLInsertCompiler, SQLUpdateCompiler, SQLDeleteCompiler)
23
24
25
def _unset_raw_connection(original):
26
    def inner(compiler, *args, **kwargs):
27
        compiler.connection.raw = False
28
        out = original(compiler, *args, **kwargs)
29
        compiler.connection.raw = True
30
        return out
31
    return inner
32
33
34
def _get_result_or_execute_query(execute_query_func, cache,
35
                                 cache_key, table_cache_keys):
36
    data = cache.get_many(table_cache_keys + [cache_key])
37
38
    new_table_cache_keys = set(table_cache_keys)
39
    new_table_cache_keys.difference_update(data)
40
41
    if new_table_cache_keys:
42
        now = time()
43
        cache.set_many({k: now for k in new_table_cache_keys}, None)
44
    elif cache_key in data:
45
        timestamp, result = data.pop(cache_key)
46
        table_times = data.values()
47
        if table_times and timestamp > max(table_times):
48
            return result
49
50
    result = execute_query_func()
51
    if isinstance(result, Iterable) and result.__class__ not in TUPLE_OR_LIST:
52
        result = list(result)
53
54
    cache.set(cache_key, (time(), result), None)
55
56
    return result
57
58
59
def _patch_compiler(original):
60
    @wraps(original)
61
    @_unset_raw_connection
62
    def inner(compiler, *args, **kwargs):
63
        execute_query_func = lambda: original(compiler, *args, **kwargs)
64
        if not cachalot_settings.CACHALOT_ENABLED \
65
                or isinstance(compiler, WRITE_COMPILERS):
66
            return execute_query_func()
67
68
        try:
69
            cache_key = _get_query_cache_key(compiler)
70
            table_cache_keys = _get_table_cache_keys(compiler)
71
        except (EmptyResultSet, UncachableQuery):
72
            return execute_query_func()
73
74
        return _get_result_or_execute_query(
75
            execute_query_func,
76
            cachalot_caches.get_cache(db_alias=compiler.using),
77
            cache_key, table_cache_keys)
78
79
    return inner
80
81
82
def _patch_write_compiler(original):
83
    @wraps(original)
84
    @_unset_raw_connection
85
    def inner(write_compiler, *args, **kwargs):
86
        db_alias = write_compiler.using
87
        table = write_compiler.query.get_meta().db_table
88
        _invalidate_table(cachalot_caches.get_cache(db_alias=db_alias),
89
                          db_alias, table)
90
        return original(write_compiler, *args, **kwargs)
91
92
    return inner
93
94
95
def _patch_orm():
96
    SQLCompiler.execute_sql = _patch_compiler(SQLCompiler.execute_sql)
97
    for compiler in WRITE_COMPILERS:
98
        compiler.execute_sql = _patch_write_compiler(compiler.execute_sql)
99
100
101
def _patch_cursor():
102
    def _patch_cursor_execute(original):
103
        @wraps(original)
104
        def inner(cursor, sql, *args, **kwargs):
105
            out = original(cursor, sql, *args, **kwargs)
106
            if getattr(cursor.db, 'raw', True) \
107
                    and cachalot_settings.CACHALOT_INVALIDATE_RAW:
108
                sql = sql.lower()
109
                if not isinstance(sql, unicode):
110
                    sql = unicode(sql, 'utf-8')
111
                if unicode('update') in sql or unicode('insert') in sql or unicode('delete') in sql:
112
                    tables = _get_tables_from_sql(cursor.db, sql)
113
                    invalidate(*tables, db_alias=cursor.db.alias)
114
            return out
115
116
        return inner
117
118
    CursorWrapper.execute = _patch_cursor_execute(CursorWrapper.execute)
119
    CursorWrapper.executemany = _patch_cursor_execute(CursorWrapper.executemany)
120
121
122
def _patch_atomic():
123
    def patch_enter(original):
124
        @wraps(original)
125
        def inner(self):
126
            cachalot_caches.enter_atomic(self.using)
127
            original(self)
128
129
        return inner
130
131
    def patch_exit(original):
132
        @wraps(original)
133
        def inner(self, exc_type, exc_value, traceback):
134
            needs_rollback = get_connection(self.using).needs_rollback
135
            original(self, exc_type, exc_value, traceback)
136
            cachalot_caches.exit_atomic(
137
                self.using, exc_type is None and not needs_rollback)
138
139
        return inner
140
141
    Atomic.__enter__ = patch_enter(Atomic.__enter__)
142
    Atomic.__exit__ = patch_exit(Atomic.__exit__)
143
144
145
def _invalidate_on_migration(sender, **kwargs):
146
    invalidate(*sender.get_models(), db_alias=kwargs['using'])
147
148
149
def patch():
150
    post_migrate.connect(_invalidate_on_migration)
151
152
    _patch_cursor()
153
    _patch_atomic()
154
    _patch_orm()
155