Passed
Push — master ( aeb165...d9ae97 )
by Dean
03:04
created

CacheManager._start()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185
Metric Value
cc 2
dl 0
loc 9
ccs 5
cts 6
cp 0.8333
crap 2.0185
rs 9.6666
1 1
from plugin.core.database import Database
2
3 1
from stash import Stash, ApswArchive
4 1
from threading import Lock, Thread
5 1
import logging
6 1
import time
7
8 1
# Serializer spec used by CacheManager and Cache when the caller does not pass one.
DEFAULT_SERIALIZER = 'msgpack:///'
9
10 1
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
11
12
13 1
class CacheManager(object):
    """Class-level registry of open `Cache` objects.

    Also owns a daemon thread that periodically flushes caches whose
    queued flush time has passed. All state lives on the class; the
    methods are classmethods and no instance is required.
    """

    # Open caches, keyed by the `key` they were opened with
    active = {}

    # Guards access to `active`
    _lock = Lock()

    # Seconds the processing thread sleeps between passes
    _process_interval = 10
    # Loop condition of the processing thread (nothing in this class clears it)
    _process_running = True
    # Thread created by `_start()`, or None while it has not been started
    _process_thread = None

    @classmethod
    def get(cls, key, serializer=DEFAULT_SERIALIZER):
        """Return the cache registered under `key`, opening it if missing.

        :param key: cache key (see `Cache` for the expected format)
        :param serializer: serializer spec, only used when a cache is opened
        :return: the existing or newly opened `Cache`
        """
        with cls._lock:
            if key in cls.active:
                return cls.active[key]

            # `_lock` is already held and is not re-entrant, so the cache
            # must be opened without trying to acquire it again
            return cls.open(
                key,
                serializer=serializer,
                block=False
            )

    @classmethod
    def open(cls, key, serializer=DEFAULT_SERIALIZER, block=True):
        """Construct a cache for `key`, register it and return it.

        Any cache already registered under `key` is replaced.

        :param key: cache key (see `Cache` for the expected format)
        :param serializer: serializer spec passed to `Cache`
        :param block: acquire `_lock` while opening; pass False only when
                      the caller already holds the lock
        :return: the newly constructed `Cache`
        """
        if block:
            # Open cache in lock
            with cls._lock:
                return cls.open(
                    key,
                    serializer=serializer,
                    block=False
                )

        # Construct cache
        cls.active[key] = Cache(
            key,
            serializer=serializer
        )

        # Ensure process thread has started
        cls._start()

        # Return cache
        log.debug('Opened "%s" cache (serializer: %r)', key, serializer)
        return cls.active[key]

    @classmethod
    def _start(cls):
        """Start the daemon processing thread, unless one was already created."""
        if cls._process_thread is not None:
            return

        cls._process_thread = Thread(name='CacheManager._process', target=cls._process)
        cls._process_thread.daemon = True

        cls._process_thread.start()

    @classmethod
    def _process(cls):
        """Thread target: run the processing loop, logging any exception that escapes it."""
        try:
            cls._process_run()
        except Exception as ex:
            log.error('Exception raised in CacheManager: %s', ex, exc_info=True)

    @classmethod
    def _process_run(cls):
        """Flush every active cache whose queued flush time has passed.

        Repeats every `_process_interval` seconds while `_process_running`
        is true.
        """
        while cls._process_running:
            # Retrieve current time
            now = time.time()

            # Retrieve active caches; copy them while the lock is held so a
            # concurrent `open()` cannot change the collection being iterated
            with cls._lock:
                caches = list(cls.active.values())

            # Sync caches that have been queued
            for cache in caches:
                # Read once, the value can be changed by other threads
                flush_at = cache.flush_at

                if flush_at is None or flush_at > now:
                    continue

                cache.flush()

            time.sleep(cls._process_interval)
93
94
95 1
class Cache(object):
    """Wrapper around a `Stash` store that supports delayed flushing.

    :param key: cache key in the form "<database>.<table>"
    :param serializer: serializer spec passed to `Stash`
    :raises ValueError: if `key` does not have exactly two "."-separated parts
    """

    def __init__(self, key, serializer=DEFAULT_SERIALIZER):
        self.key = key

        self.stash = self._construct(key, serializer=serializer)

        # Timestamp (as returned by `time.time()`) at which a flush is due,
        # or None when no flush is queued
        self._flush_at = None
        # Serializes calls to `flush()`
        self._flush_lock = Lock()

    @property
    def flush_at(self):
        """Time at which the queued flush is due, or None if nothing is queued."""
        return self._flush_at

    @staticmethod
    def _construct(key, serializer=DEFAULT_SERIALIZER):
        """Build the `Stash` backing this cache.

        :param key: cache key in the form "<database>.<table>"
        :param serializer: serializer spec passed to `Stash`
        :raises ValueError: if `key` does not have exactly two "."-separated parts
        """
        # Parse `key`
        fragments = key.split('.')

        if len(fragments) != 2:
            raise ValueError('Invalid "key" format')

        database, table = fragments

        # Construct cache
        return Stash(
            ApswArchive(Database.cache(database), table),
            'lru:///?capacity=500&compact_threshold=1500',
            serializer=serializer,
            key_transform=(lambda k: str(k), lambda k: k)
        )

    def get(self, key, default=None):
        """Return the value stored under `key`, delegating to `stash.get`."""
        return self.stash.get(key, default)

    def prime(self, keys=None, force=False):
        """Delegate to `stash.prime` with the given `keys` and `force`."""
        return self.stash.prime(keys, force)

    def __getitem__(self, key):
        return self.stash[key]

    def __setitem__(self, key, value):
        self.stash[key] = value

    def flush(self, force=False):
        """Flush the stash if a flush is queued (or unconditionally with `force`).

        Calls are serialized through `_flush_lock`.
        """
        with self._flush_lock:
            self._flush(force=force)

    def _flush(self, force=False):
        """Perform the flush; errors are logged rather than raised.

        The queued flush is cleared whether or not the flush succeeded.
        """
        if not force and self._flush_at is None:
            return

        try:
            self.stash.flush()

            log.debug('Flushed "%s" cache', self.key)
        except Exception as ex:
            log.error('Unable to flush "%s" cache: %s', self.key, ex, exc_info=True)
        finally:
            self.flush_clear()

    def flush_queue(self, delay=120):
        """Queue a flush to become due `delay` seconds from now.

        :param delay: seconds to wait before the flush is due
        """
        log.debug('Queued flush for "%s" cache in %ss', self.key, delay)

        # Honour the requested delay (previously a fixed 120 seconds was used)
        self._flush_at = time.time() + delay

    def flush_clear(self):
        """Cancel the queued flush, if there is one."""
        if self._flush_at is None:
            return

        log.debug('Cleared flush for "%s" cache', self.key)

        self._flush_at = None
167