CachingPool   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 109
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 109
wmc 8
lcom 1
cbo 6
rs 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 2
A connect() 0 23 3
A getCacheTtl() 0 4 1
A setCacheTtl() 0 4 1
A updateNodes() 0 14 1
1
<?php
2
3
namespace Phloppy\Stream;
4
5
use Phloppy\Cache\CacheInterface;
6
use Phloppy\Cache\MemoryCache;
7
use Phloppy\Client\Node;
8
use Phloppy\Exception\ConnectException;
9
use Phloppy\NodeInfo;
10
use Psr\Log\LoggerInterface;
11
12
/**
13
 * Node Connection Pool with the ability to cache cluster nodes.
14
 *
15
 * After establishing the connection with one of the provided node addresses a HELLO command will be
16
 * issued to retrieve all available nodes. The information will be stored in the provided CacheInterface implementation
17
 * and retrieved the next time when a connect() is issued.
18
 */
19
class CachingPool extends Pool
20
{
21
22
    /**
23
     * @var CacheInterface
24
     */
25
    private $cache;
26
27
    /**
28
     * Cache TTL in seconds.
29
     *
30
     * @var int Default is 10 minutes
31
     */
32
    private $ttl = 600;
33
34
35
    /**
36
     * @param array                $nodeUrls
37
     * @param CacheInterface|null  $cache
38
     * @param LoggerInterface|null $log
39
     *
40
     * @throws ConnectException
41
     */
42
    public function __construct(array $nodeUrls = array(), CacheInterface $cache = null, LoggerInterface $log = null)
43
    {
44
        if (!$cache) {
45
            $cache = new MemoryCache();
46
        }
47
48
        $this->cache = $cache;
49
        parent::__construct($nodeUrls, $log);
50
    }
51
52
53
    /**
54
     * Connect to a random node in the (cached) node list.
55
     *
56
     * @return DefaultStream Stream to a connected node.
57
     *
58
     * @throws ConnectException
59
     */
60
    public function connect()
61
    {
62
        $key = array_reduce($this->nodeUrls, function ($current, $prev) {
63
            return md5($current.$prev);
64
        }, '');
65
66
        // prefer cached results
67
        $hit = $this->cache->get($key);
68
69
        if (!empty($hit)) {
70
            $this->log->notice('nodelist retrieved from cache', ['nodes' => $hit, 'key' => $key]);
71
            $this->nodeUrls = $hit;
72
        }
73
74
        $stream = parent::connect();
75
76
        // update cache
77
        if (empty($hit)) {
78
            $this->updateNodes($key, $stream);
79
        }
80
81
        return $stream;
82
    }
83
84
85
    /**
86
     * Return the internal node cache TTL.
87
     *
88
     * @return int
89
     */
90
    public function getCacheTtl()
91
    {
92
        return $this->ttl;
93
    }
94
95
96
    /**
97
     * Set the internal node cache TTL.
98
     *
99
     * @param int $ttl
100
     */
101
    public function setCacheTtl($ttl)
102
    {
103
        $this->ttl = (int)$ttl;
104
    }
105
106
107
    /**
108
     * Update the node list using the HELLO command.
109
     *
110
     * @param string          $key
111
     * @param StreamInterface $stream
112
     */
113
    private function updateNodes($key, StreamInterface $stream)
114
    {
115
        $this->nodeUrls = array_map(function (NodeInfo $element) {
116
            return $element->getServer();
117
        }, (new Node($stream, $this->log))->hello());
118
119
        $this->log->notice('caching connection info from HELLO', [
120
            'key' => $key,
121
            'nodes' => $this->nodeUrls,
122
            'ttl' => $this->getCacheTtl(),
123
        ]);
124
125
        $this->cache->set($key, $this->nodeUrls, $this->getCacheTtl());
126
    }
127
}
128