Redis::__construct()   D
last analyzed

Complexity

Conditions 10
Paths 16

Size

Total Lines 31
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 31
rs 4.8196
c 0
b 0
f 0
cc 10
eloc 18
nc 16
nop 1

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

- Extract Method

1
<?php
2
/**
3
 * For the full copyright and license information, please view the LICENSE
4
 * file that was distributed with this source code.
5
 *
6
 * @author Nikita Vershinin <[email protected]>
7
 * @license MIT
8
 */
9
namespace Endeveit\Cache\Drivers;
10
11
use Endeveit\Cache\Abstracts\Common;
12
use Endeveit\Cache\Exception;
13
14
/**
 * Driver that stores data in Redis and uses \Redis extension
 * to work with it.
 *
 * When more than one server is configured, keys are distributed between
 * them with a weighted consistent hashring.
 *
 * The implementation of consistent hashring was taken from Rediska project
 *  https://github.com/Shumkov/Rediska/blob/master/library/Rediska/KeyDistributor/ConsistentHashing.php
 */
class Redis extends Common
{
    const DEFAULT_PORT = 6379;
    const DEFAULT_TIMEOUT = 0.0;
    const DEFAULT_WEIGHT = 1;

    /**
     * Connection parameters indexed by backend key: array($host, $port, $timeout).
     *
     * @var array
     */
    protected $connectionsOptions = array();

    /**
     * Lazily created \Redis clients indexed by backend key.
     *
     * @var array
     */
    protected $connections = array();

    /**
     * Weight of every backend indexed by backend key.
     *
     * @var array
     */
    protected $backendsWeights = array();

    /**
     * Number of registered backends.
     *
     * @var integer
     */
    protected $nbBackends = 0;

    /**
     * The hashring: slice index => array(position => backend key).
     *
     * @var array
     */
    protected $hashring = array();

    /**
     * Only ever reset to 0 by this class; kept for subclasses that may read it.
     *
     * @var integer
     */
    protected $nbHashrings = 0;

    /**
     * If true, doSave() additionally sets the Redis expiration time of the key.
     *
     * @var boolean
     */
    protected $nativeExpires = false;

    /**
     * Base number of points a backend of average weight gets on the hashring.
     *
     * @var integer
     */
    protected $replicas = 256;

    /**
     * Number of slices the hashring is divided into.
     *
     * @var integer
     */
    protected $slicesCount = 0;

    /**
     * Half of $slicesCount; offset that maps signed hashes to non-negative slices.
     *
     * @var integer
     */
    protected $slicesHalf = 0;

    /**
     * Width of a single slice in hash units.
     *
     * @var integer|float
     */
    protected $slicesDiv = 0;

    /**
     * Local map "key identifier => backend key" of already resolved keys.
     *
     * @var array
     */
    protected $localCache = array();

    /**
     * Number of entries added to $localCache since it was last cleaned.
     *
     * @var integer
     */
    protected $localCacheCount = 0;

    /**
     * Maximum number of entries in $localCache; 0 or less disables it.
     *
     * @var integer
     */
    protected $localCacheSize = 256;

    /**
     * False whenever the hashring must be rebuilt before the next lookup.
     *
     * @var boolean
     */
    protected $hashringIsInitialized = false;

    /**
     * {@inheritdoc}
     *
     * Additional options:
     *  "local_cache_size" => the size of local cache
     *  "native_expires"   => use or not native expiration time
     *  "servers"          => array with connections parameters
     *                        array(
     *                          array('host' => '127.0.0.1', 'port' => 6379, 'timeout' => 0.0, 'weight' => 2),
     *                          array('host' => '127.0.0.1', 'port' => 6380, 'timeout' => 0.0, 'weight' => 1),
     *                          array('host' => '127.0.0.1', 'port' => 6381, 'timeout' => 0.0, 'weight' => 1),
     *                        )
     *
     * @codeCoverageIgnore
     * @param  array                     $options
     * @throws \Endeveit\Cache\Exception
     */
    public function __construct(array $options = array())
    {
        if (array_key_exists('local_cache_size', $options)) {
            $this->localCacheSize = intval($options['local_cache_size']);
            unset($options['local_cache_size']);
        }

        if (array_key_exists('native_expires', $options)) {
            $this->nativeExpires = (bool) $options['native_expires'];
            unset($options['native_expires']);
        }

        if (!array_key_exists('servers', $options) || !is_array($options['servers'])) {
            throw new Exception('You must provide option "servers" with array of connections parameters');
        }

        parent::__construct($options);

        foreach ($this->getOption('servers') as $server) {
            $this->addConnectionFromOptions($server);
        }
    }

    /**
     * {@inheritdoc}
     *
     * @param  string  $id
     * @param  integer $value
     * @return integer
     */
    public function increment($id, $value = 1)
    {
        $id = $this->getPrefixedIdentifier($id);

        return $this->getConnection($id)->incrBy($id, $value);
    }

    /**
     * {@inheritdoc}
     *
     * @param  string  $id
     * @param  integer $value
     * @return integer
     */
    public function decrement($id, $value = 1)
    {
        $id = $this->getPrefixedIdentifier($id);

        return $this->getConnection($id)->decrBy($id, $value);
    }

    /**
     * Adds new connection to connections pool.
     *
     * The connection itself is opened lazily, on first use. Adding a backend
     * invalidates the hashring so it is rebuilt on the next key lookup.
     *
     * @param  string                    $host
     * @param  integer                   $port
     * @param  float                     $timeout
     * @param  integer                   $weight
     * @throws \Endeveit\Cache\Exception If a backend with the same host and port is already registered
     */
    protected function addConnection($host, $port, $timeout, $weight)
    {
        // The backend key is derived from host and port only.
        $key = crc32(json_encode(array($host, $port)));
        if (isset($this->backendsWeights[$key])) {
            throw new Exception('Connection with the same parameters already exists.');
        }

        $this->backendsWeights[$key] = $weight;
        $this->connectionsOptions[$key] = array($host, $port, $timeout);

        $this->nbBackends++;

        $this->hashringIsInitialized = false;
    }

    /**
     * {@inheritdoc}
     *
     * @param  string      $id
     * @return mixed|false
     */
    protected function doLoad($id)
    {
        $source = $this->getConnection($id)->get($id);

        if (false === $source) {
            return false;
        }

        if ($this->isSerializedPayload($source)) {
            $source = $this->getSerializer()->unserialize($source);
        }

        return $this->getProcessedLoadedValue($source);
    }

    /**
     * {@inheritdoc}
     *
     * Every backend is asked for the whole list of identifiers; values found
     * on any of them are merged into the result.
     *
     * @param  array $identifiers
     * @return array
     */
    protected function doLoadMany(array $identifiers)
    {
        $result = array();
        $now    = time();

        // mGet() answers with a zero-based list, so address identifiers the same way.
        $keys = array_values($identifiers);

        foreach (array_keys($this->connectionsOptions) as $key) {
            $mGetResult = $this->getRedisObject($key)->mGet($keys);

            if (!is_array($mGetResult)) {
                continue;
            }

            foreach ($mGetResult as $i => $row) {
                // Skip missing values, but keep "0": it is a valid stored scalar.
                if (false === $row || null === $row || '' === $row || !array_key_exists($i, $keys)) {
                    continue;
                }

                if ($this->isSerializedPayload($row)) {
                    $source = $this->getSerializer()->unserialize($row);

                    // A payload that can not be decoded to an array is treated as not found.
                    if (!is_array($source)) {
                        continue;
                    }
                } else {
                    $source = array(
                        'data' => $row
                    );
                }

                $id = $this->getIdentifierWithoutPrefix($keys[$i]);

                if (array_key_exists('expiresAt', $source) && ($source['expiresAt'] < $now)) {
                    $result[$id] = false;
                } else {
                    $result[$id] = $source['data'];
                }
            }
        }

        $this->fillNotFoundKeys($result, $identifiers);

        return $result;
    }

    /**
     * {@inheritdoc}
     *
     * @param  string      $id
     * @return mixed|false
     */
    protected function doLoadRaw($id)
    {
        $result = $this->getConnection($id)->get($id);

        // Do not use empty() here: a stored "0" must be returned, not reported as a miss.
        if (false === $result || null === $result || '' === $result) {
            return false;
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     *
     * @param  mixed   $data
     * @param  string  $id
     * @param  array   $tags
     * @return boolean
     */
    protected function doSave($data, $id, array $tags = array())
    {
        $conn   = $this->getConnection($id);
        $result = $conn->set($id, $this->getSerializer()->serialize($data));

        if ($this->nativeExpires
            && is_array($data)
            && array_key_exists('expiresAt', $data)
            && is_int($data['expiresAt'])
        ) {
            $conn->expireAt($id, $data['expiresAt']);
        }

        // NOTE(review): tags are used as given; presumably the caller has already prefixed them - confirm.
        foreach (array_unique($tags) as $tag) {
            $this->getConnection($tag)->sAdd($tag, $id);
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     *
     * @param  mixed           $data
     * @param  string          $id
     * @param  integer|boolean $lifetime
     * @return boolean
     */
    protected function doSaveScalar($data, $id, $lifetime = false)
    {
        $con    = $this->getConnection($id);
        $result = $con->set($id, $data);

        if (false !== $lifetime) {
            $con->expire($id, $lifetime);
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     *
     * @param  array   $tags
     * @return boolean
     */
    protected function doRemoveByTags(array $tags)
    {
        foreach (array_unique($tags) as $tag) {
            $tag  = $this->getPrefixedTag($tag);
            $con  = $this->getConnection($tag);
            $keys = $con->sMembers($tag);

            if (!empty($keys)) {
                foreach ($keys as $key) {
                    $this->remove($this->getIdentifierWithoutPrefix($key));
                }
            }

            $con->del($tag);
        }

        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Flushes the selected database on every configured backend.
     *
     * @return boolean
     */
    protected function doFlush()
    {
        foreach (array_keys($this->connectionsOptions) as $key) {
            $this->getRedisObject($key)->flushDB();
        }

        return true;
    }

    /**
     * Validates one entry of the "servers" option and registers it as a backend.
     *
     * Missing (or null) "port", "timeout" and "weight" fall back to the class defaults.
     *
     * @param  mixed                     $server
     * @throws \Endeveit\Cache\Exception If the entry is not an array or has no "host"
     */
    private function addConnectionFromOptions($server)
    {
        if (!is_array($server) || !array_key_exists('host', $server)) {
            throw new Exception('You must provide host in connection parameters');
        }

        $this->addConnection(
            $server['host'],
            isset($server['port']) ? intval($server['port']) : self::DEFAULT_PORT,
            isset($server['timeout']) ? floatval($server['timeout']) : self::DEFAULT_TIMEOUT,
            isset($server['weight']) ? intval($server['weight']) : self::DEFAULT_WEIGHT
        );
    }

    /**
     * Tells whether a raw value read from Redis has to be passed to the serializer.
     *
     * Numeric strings are stored as plain scalars and are left untouched.
     *
     * @param  mixed   $raw
     * @return boolean
     */
    private function isSerializedPayload($raw)
    {
        return is_string($raw) && !is_numeric($raw);
    }

    /**
     * Returns connection by key name.
     *
     * @param  string                    $id
     * @return \Redis
     * @throws \RuntimeException         If no backend could be determined for the key
     * @throws \Endeveit\Cache\Exception If there are no backends or the connection can not be opened
     */
    private function getConnection($id)
    {
        if (0 == $this->nbBackends) {
            throw new Exception('You must add at least one connection.');
        }

        if (1 == $this->nbBackends) {
            // If we have only one backend there is nothing to distribute.
            reset($this->backendsWeights);
            $backend = key($this->backendsWeights);
        } else {
            $backend = $this->getBackendFromHashring($id);
        }

        if (null === $backend || !array_key_exists($backend, $this->connectionsOptions)) {
            throw new \RuntimeException('Unable to determine connection or it\'s options.');
        }

        return $this->getRedisObject($backend);
    }

    /**
     * Resolves the backend key responsible for the given identifier,
     * using the local cache of already resolved identifiers when enabled.
     *
     * @param  string       $id
     * @return integer|null Backend key or null if the hashring has no suitable point
     */
    private function getBackendFromHashring($id)
    {
        if (!$this->hashringIsInitialized) {
            $this->initializeHashring();
            $this->hashringIsInitialized = true;
        }

        // If the key has already been mapped, return the cached entry.
        if ($this->localCacheSize > 0 && isset($this->localCache[$id])) {
            return $this->localCache[$id];
        }

        $backend = $this->findBackendForHash($this->getSignedCrc32($id));

        // Cache the result for quick retrieval in the future.
        if (null !== $backend && $this->localCacheSize > 0) {
            $this->localCache[$id] = $backend;
            $this->localCacheCount++;

            // If the cache is getting too big, clear it.
            if ($this->localCacheCount > $this->localCacheSize) {
                $this->cleanBackendsCache();
            }
        }

        return $backend;
    }

    /**
     * Walks the hashring clockwise starting at the slice of $hash and returns
     * the backend of the first point whose position is not less than $hash.
     *
     * @param  integer      $hash Signed 32-bit hash of the key
     * @return integer|null
     */
    private function findBackendForHash($hash)
    {
        $slice  = $this->getSliceIndex($hash);
        $looped = false;

        while (true) {
            // Points inside a slice are sorted by position.
            foreach ($this->hashring[$slice] as $position => $backend) {
                if ($position >= $hash) {
                    return $backend;
                }
            }

            // Continue to the next slice.
            $slice++;

            if ($slice >= $this->slicesCount) {
                // If already looped once, the hashring has no points at all.
                if ($looped) {
                    return null;
                }

                // Otherwise wrap around: any point found from now on is suitable.
                $hash   = -2147483648;
                $slice  = 0;
                $looped = true;
            }
        }
    }

    /**
     * Calculates crc32 of the value as a signed 32-bit integer.
     *
     * On 64-bit builds crc32() returns values in 0..4294967295 while the
     * hashring arithmetic expects -2147483648..2147483647, so results above
     * the signed maximum are folded into the negative range.
     *
     * @param  string  $value
     * @return integer
     */
    private function getSignedCrc32($value)
    {
        $hash = crc32($value);

        if ($hash > 2147483647) {
            $hash -= 4294967296;
        }

        return $hash;
    }

    /**
     * Returns index of the hashring slice the signed hash belongs to.
     *
     * @param  integer $hash
     * @return integer Always within 0 .. $slicesCount - 1
     */
    private function getSliceIndex($hash)
    {
        $slice = (int) floor($hash / $this->slicesDiv) + $this->slicesHalf;

        // Guard against floating point rounding at the borders of the ring.
        return max(0, min($this->slicesCount - 1, $slice));
    }

    /**
     * Returns \Redis object by key value.
     *
     * The connection is opened on first request and reused afterwards.
     * A client is only remembered after a successful connect.
     *
     * @param  integer                   $key
     * @return \Redis
     * @throws \Endeveit\Cache\Exception If connection to the server can not be established
     */
    private function getRedisObject($key)
    {
        if (!array_key_exists($key, $this->connections)) {
            list($host, $port, $timeout) = $this->connectionsOptions[$key];

            $redis = new \Redis();
            if (false === $redis->connect($host, $port, $timeout)) {
                throw new Exception(sprintf('Unable to connect to Redis server %s:%d', $host, $port));
            }

            $this->connections[$key] = $redis;
        }

        return $this->connections[$key];
    }

    /**
     * Initialization of hashring.
     *
     * Every backend gets a number of points on the ring proportional to its
     * weight relative to the average weight. The local cache is dropped
     * because previously resolved keys may now map to another backend.
     */
    private function initializeHashring()
    {
        if ($this->nbBackends < 2) {
            $this->hashring = array();
            $this->nbHashrings = 0;

            $this->slicesCount = 0;
            $this->slicesHalf = 0;
            $this->slicesDiv = 0;
        } else {
            // Keep the number of slices a positive even integer (replicas * backends / 8 by default).
            $this->slicesHalf = max(1, (int) floor(($this->replicas * $this->nbBackends) / 16));
            $this->slicesCount = $this->slicesHalf * 2;
            $this->slicesDiv = (2147483648 / $this->slicesHalf);

            // Initialize the hashring.
            $this->hashring = array_fill(0, $this->slicesCount, array());

            // Calculate the average weight.
            $avg = round(array_sum($this->backendsWeights) / $this->nbBackends, 2);

            // Iterate over the backends.
            foreach ($this->backendsWeights as $backend => $weight) {
                // Adjust the weight; with a non-positive average all backends get an equal share.
                $points = ($avg > 0) ? round(($weight / $avg) * $this->replicas) : $this->replicas;

                // Create as many replicas as $points.
                for ($i = 0; $i < $points; $i++) {
                    $position = $this->getSignedCrc32($backend . ':' . $i);
                    $this->hashring[$this->getSliceIndex($position)][$position] = $backend;
                }
            }

            // Sort each slice of the hashring.
            for ($i = 0; $i < $this->slicesCount; $i++) {
                ksort($this->hashring[$i], SORT_NUMERIC);
            }
        }

        $this->cleanBackendsCache();
    }

    /**
     * Cleans up the local cache.
     */
    private function cleanBackendsCache()
    {
        $this->localCache = array();
        $this->localCacheCount = 0;
    }
}
477