StampedeProtector   A
last analyzed

Complexity

Total Complexity 26

Size/Duplication

Total Lines 251
Duplicated Lines 0 %

Importance

Changes 5
Bugs 0 Features 1
Metric Value
eloc 51
c 5
b 0
f 1
dl 0
loc 251
rs 10
wmc 26

18 Methods

Rating   Name   Duplication   Size   Complexity  
A getCollection() 0 5 1
A add() 0 3 1
A delete() 0 3 1
A replace() 0 3 1
A get() 0 6 3
A stampedeKey() 0 9 2
A decrement() 0 3 1
A __construct() 0 4 1
A touch() 0 3 1
A sleep() 0 6 1
A cas() 0 3 1
A deleteMulti() 0 3 1
A setMulti() 0 3 1
A protect() 0 18 3
A set() 0 3 1
A increment() 0 3 1
A flush() 0 3 1
A getMulti() 0 43 4
1
<?php
2
3
namespace MatthiasMullie\Scrapbook\Scale;
4
5
use MatthiasMullie\Scrapbook\Exception\InvalidKey;
6
use MatthiasMullie\Scrapbook\KeyValueStore;
7
8
/**
9
 * Cache is usually used to reduce performing a complex operation. In case of a
10
 * cache miss, that operation is executed & the result is stored.
11
 *
12
 * A cache stampede happens when there are a lot of requests for data that is
13
 * not currently in cache. Examples could be:
14
 * * cache expires for something that is often under very heavy load
15
 * * sudden unexpected high load on something that is likely to not be in cache
16
 * In those cases, this huge amount of requests for data that is not at that
17
 * time in cache, causes that expensive operation to be executed a lot of times,
18
 * all at once.
19
 *
20
 * This class is designed to counteract that: if a value can't be found in cache
21
 * we'll write something else to cache for a short period of time, to indicate
22
 * that another process has already requested this same key (and is probably
23
 * already performing that complex operation that will result in the key being
24
 * filled)
25
 *
26
 * All of the follow-up requests (that find that the "stampede indicator" has
27
 * already been set) will just wait (usleep): instead of crippling the servers
28
 * by all having to execute the same operation, these processes will just idle
29
 * to give the first process the chance to fill in the cache. Periodically,
30
 * these processes will poll the cache to see if the value has already been
31
 * stored in the meantime.
32
 *
33
 * The stampede protection will only be temporary, for $sla milliseconds. We
34
 * need to limit it because the first process (tasked with filling the cache
35
 * after executing the expensive operation) may fail/crash/... If the expensive
36
 * operation fails to conclude in < $sla milliseconds.
37
 * This class guarantees that the stampede will hold off for $sla amount of time
38
 * but after that, all follow-up requests will go through without cached values
39
 * and cause a stampede after all, if the initial process fails to complete
40
 * within that time.
41
 *
42
 * @author Matthias Mullie <[email protected]>
43
 * @copyright Copyright (c) 2014, Matthias Mullie. All rights reserved
44
 * @license LICENSE MIT
45
 */
46
class StampedeProtector implements KeyValueStore
47
{
48
    /**
49
     * @var KeyValueStore
50
     */
51
    protected $cache = array();
52
53
    /**
54
     * Amount of time, in milliseconds, this class guarantees protection.
55
     *
56
     * @var int
57
     */
58
    protected $sla = 1000;
59
60
    /**
61
     * Amount of times every process will poll within $sla time.
62
     *
63
     * @var int
64
     */
65
    protected $attempts = 10;
66
67
    /**
68
     * @param KeyValueStore $cache The real cache we'll buffer for
69
     * @param int           $sla   Stampede protection time, in milliseconds
70
     */
71
    public function __construct(KeyValueStore $cache, $sla = 1000)
72
    {
73
        $this->cache = $cache;
74
        $this->sla = $sla;
75
    }
76
77
    /**
78
     * {@inheritdoc}
79
     */
80
    public function get($key, &$token = null)
81
    {
82
        $values = $this->getMulti(array($key), $tokens);
83
        $token = isset($tokens[$key]) ? $tokens[$key] : null;
84
85
        return isset($values[$key]) ? $values[$key] : false;
86
    }
87
88
    /**
89
     * {@inheritdoc}
90
     */
91
    public function getMulti(array $keys, array &$tokens = null)
92
    {
93
        // fetch both requested keys + stampede protection indicators at once
94
        $stampedeKeys = array_combine($keys, array_map(array($this, 'stampedeKey'), $keys));
95
        $values = $this->cache->getMulti(array_merge($keys, $stampedeKeys), $tokens);
96
97
        // figure out which of the requested keys are protected, and which need
98
        // protection (=currently empty & not yet protected)
99
        $protected = array_keys(array_intersect($stampedeKeys, array_keys($values)));
100
        $protect = array_diff($keys, array_keys($values), $protected);
101
102
        // protect keys that we couldn't find, and remove them from the list of
103
        // keys we want results from, because we'll keep fetching empty keys
104
        // (that are currently protected)
105
        $done = $this->protect($protect);
106
        $keys = array_diff($keys, $done);
107
108
        // we may have failed to protect some keys after all (race condition
109
        // with another process), in which case we also have to keep polling
110
        // those keys (which the other process is likely working on already)
111
        $protected += array_diff($protect, $done);
112
113
        // we over-fetched (to include stampede indicators), now limit the
114
        // results to only the keys we requested
115
        $results = array_intersect_key($values, array_flip($keys));
116
        $tokens = array_intersect_key($tokens, $results);
117
118
        // we may not have been able to retrieve all keys yet: some may have
119
        // been "protected" (and are being regenerated in another process) in
120
        // which case we'll retry a couple of times, hoping the other process
121
        // stores the new value in the meantime
122
        $attempts = $this->attempts;
123
        while (--$attempts > 0 && !empty($protected) && $this->sleep()) {
124
            $values = $this->cache->getMulti($protected, $tokens2);
125
126
            $results += array_intersect_key($values, array_flip($keys));
127
            $tokens += array_intersect_key($tokens2, array_flip($keys));
128
129
            // don't keep polling for values we just fetched...
130
            $protected = array_diff($protected, array_keys($values));
131
        }
132
133
        return $results;
134
    }
135
136
    /**
137
     * {@inheritdoc}
138
     */
139
    public function set($key, $value, $expire = 0)
140
    {
141
        return $this->cache->set($key, $value, $expire);
142
    }
143
144
    /**
145
     * {@inheritdoc}
146
     */
147
    public function setMulti(array $items, $expire = 0)
148
    {
149
        return $this->cache->setMulti($items, $expire);
150
    }
151
152
    /**
153
     * {@inheritdoc}
154
     */
155
    public function delete($key)
156
    {
157
        return $this->cache->delete($key);
158
    }
159
160
    /**
161
     * {@inheritdoc}
162
     */
163
    public function deleteMulti(array $keys)
164
    {
165
        return $this->cache->deleteMulti($keys);
166
    }
167
168
    /**
169
     * {@inheritdoc}
170
     */
171
    public function add($key, $value, $expire = 0)
172
    {
173
        return $this->cache->add($key, $value, $expire);
174
    }
175
176
    /**
177
     * {@inheritdoc}
178
     */
179
    public function replace($key, $value, $expire = 0)
180
    {
181
        return $this->cache->replace($key, $value, $expire);
182
    }
183
184
    /**
185
     * {@inheritdoc}
186
     */
187
    public function cas($token, $key, $value, $expire = 0)
188
    {
189
        return $this->cache->cas($token, $key, $value, $expire);
190
    }
191
192
    /**
193
     * {@inheritdoc}
194
     */
195
    public function increment($key, $offset = 1, $initial = 0, $expire = 0)
196
    {
197
        return $this->cache->increment($key, $offset, $initial, $expire);
198
    }
199
200
    /**
201
     * {@inheritdoc}
202
     */
203
    public function decrement($key, $offset = 1, $initial = 0, $expire = 0)
204
    {
205
        return $this->cache->decrement($key, $offset, $initial, $expire);
206
    }
207
208
    /**
209
     * {@inheritdoc}
210
     */
211
    public function touch($key, $expire)
212
    {
213
        return $this->cache->touch($key, $expire);
214
    }
215
216
    /**
217
     * {@inheritdoc}
218
     */
219
    public function flush()
220
    {
221
        return $this->cache->flush();
222
    }
223
224
    /**
225
     * {@inheritdoc}
226
     */
227
    public function getCollection($name)
228
    {
229
        $collection = $this->cache->getCollection($name);
230
231
        return new static($collection);
232
    }
233
234
    /**
235
     * As soon as a key turns up empty (doesn't yet exist in cache), we'll
236
     * "protect" it for some time. This will be done by writing to a key similar
237
     * to the original key name. If this key is present (which it will only be
238
     * for a short amount of time) we'll know it's protected.
239
     *
240
     * @return string[] Array of keys that were successfully protected
241
     */
242
    protected function protect(array $keys)
243
    {
244
        if (empty($keys)) {
245
            return array();
246
        }
247
248
        $success = array();
249
        foreach ($keys as $key) {
250
            /*
251
             * Key is add()ed because there may be multiple concurrent processes
252
             * that are both in the process of protecting - first one to add()
253
             * wins (and those are returned by the function, so those that are
254
             * failed to protect can be considered protected)
255
             */
256
            $success[$key] = $this->cache->add($this->stampedeKey($key), '', $this->sla);
257
        }
258
259
        return array_keys(array_filter($success));
260
    }
261
262
    /**
263
     * When waiting for stampede-protected keys, we'll just sleep, not using
264
     * much resources.
265
     *
266
     * @return bool
267
     */
268
    protected function sleep()
269
    {
270
        $break = $this->sla / $this->attempts;
271
        usleep(1000 * $break);
272
273
        return true;
274
    }
275
276
    /**
277
     * To figure out if something has recently been requested already (and is
278
     * likely in the process of being recalculated), we'll temporarily write to
279
     * another key, so follow-up requests know another process is likely already
280
     * re-processing the value.
281
     *
282
     * @param string $key
283
     *
284
     * @return string
285
     *
286
     * @throws InvalidKey
287
     */
288
    protected function stampedeKey($key)
289
    {
290
        $suffix = '.stampede';
291
292
        if (substr($key, -strlen($suffix)) === $suffix) {
293
            throw new InvalidKey("Invalid key: $key. Keys with suffix '$suffix' are reserved.");
294
        }
295
296
        return $key.$suffix;
297
    }
298
}
299