Completed
Push — master ( d5111b...7b397f )
by Tomasz
02:47
created

ConsulConfig   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 218
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Test Coverage

Coverage 96.39%

Importance

Changes 4
Bugs 0 Features 1
Metric Value
wmc 29
c 4
b 0
f 1
lcom 1
cbo 1
dl 0
loc 218
ccs 80
cts 83
cp 0.9639
rs 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 1
A __destruct() 0 4 1
A getMachine() 0 7 2
A lockKey() 0 10 3
A releaseKey() 0 4 1
A createSession() 0 15 2
A destroySession() 0 6 2
C heartbeat() 0 26 7
B acquireMachineId() 0 22 4
B computePossibleMachineId() 0 20 6
1
<?php
2
3
/**
4
 * To change this license header, choose License Headers in Project Properties.
5
 * To change this template file, choose Tools | Templates
6
 * and open the template in the editor.
7
 */
8
9
namespace Gendoria\CruftFlake\Config;
10
11
use RuntimeException;
12
13
/**
14
 * CruftFlake configuration that obtains its machine ID through a Consul session and the Consul KV store.
15
 *
16
 * @author Tomasz Struczyński <[email protected]>
17
 */
18
class ConsulConfig implements ConfigInterface
{
    /**
     * Default KV prefix under which machine ID entries (and the lock key itself) are stored.
     */
    const DEFAULT_KV_PREFIX = 'service/CruftFlake/machines/';

    /**
     * CURL requestor.
     *
     * @var ConsulCurl
     */
    private $curl;

    /**
     * KV prefix used for this instance; doubles as the key locked during ID allocation.
     *
     * @var string
     */
    private $kvPrefix = self::DEFAULT_KV_PREFIX;

    /**
     * Consul session ID. Empty string until a session has been created.
     *
     * @var string
     */
    private $sessionId = "";

    /**
     * Session TTL in seconds.
     *
     * @var integer
     */
    private $sessionTTL;

    /**
     * Unix timestamp of the last successful session creation or renewal.
     *
     * @var integer|null
     */
    private $lastSuccessfullCheck = null;

    /**
     * Machine ID, or null when it has not been acquired yet (or was invalidated by heartbeat()).
     *
     * @var integer|null
     */
    private $machineId;

    /**
     * Class constructor.
     *
     * Creates the Consul session immediately, so construction fails if the session cannot be created.
     *
     * @param ConsulCurl $curl       Requestor used for all Consul calls.
     * @param integer    $sessionTTL Session TTL in seconds.
     * @param string     $kvPrefix   KV prefix for machine ID entries.
     *
     * @throws RuntimeException When the session cannot be created.
     */
    public function __construct(ConsulCurl $curl, $sessionTTL = 600, $kvPrefix = self::DEFAULT_KV_PREFIX)
    {
        $this->curl = $curl;
        $this->kvPrefix = $kvPrefix;
        $this->sessionTTL = (int)$sessionTTL;
        //If we cannot connect to Consul on start, we have a problem.
        $this->createSession();
        $this->lastSuccessfullCheck = time();
    }

    /**
     * On object destruction, we have to destroy session.
     */
    public function __destruct()
    {
        $this->destroySession();
    }

    /**
     * {@inheritdoc}
     */
    public function getMachine()
    {
        //Compare against null explicitly: 0 is a valid machine ID and must be cached as well,
        //otherwise every call would query Consul again.
        if ($this->machineId === null) {
            $this->machineId = $this->acquireMachineId();
        }

        return $this->machineId;
    }

    /**
     * Configuration heartbeat.
     *
     * Heartbeat connects periodically to Consul to renew session and check its validity.
     * When the session had to be recreated, the cached machine ID is dropped, so the next
     * getMachine() call acquires it again.
     *
     * @return bool True, if configuration data had been changed during heartbeat.
     *
     * @throws RuntimeException Thrown, when we could not create new session and it was needed.
     */
    public function heartbeat()
    {
        //If the last successful check is younger than half of the TTL, we don't have to do anything.
        if ($this->lastSuccessfullCheck !== null && time() - $this->lastSuccessfullCheck < $this->sessionTTL / 2) {
            return false;
        }

        //If session renewal succeeds, update last successful check.
        if ($this->curl->performPutRequest("/session/renew/".$this->sessionId)) {
            $this->lastSuccessfullCheck = time();
            return false;
        }

        //Ok, we don't have a valid session. We have to create new one and signal update.
        try {
            $this->createSession();
        } catch (RuntimeException $e) {
            //We could not create new session. We can work for some time in 'detached' mode,
            //but if our TTL time runs out, we have to throw an exception.
            if ($this->lastSuccessfullCheck === null || time() - $this->lastSuccessfullCheck >= $this->sessionTTL) {
                throw $e;
            }
            return false;
        }

        $this->lastSuccessfullCheck = time();
        $this->machineId = null;

        return true;
    }

    /**
     * Return machine ID from consul queries.
     *
     * Reuses the value already stored for the current session when there is one; otherwise
     * locks the prefix key, picks a free ID, stores it under the session's own key and
     * releases the lock again.
     *
     * @return integer
     *
     * @throws RuntimeException When no free ID exists or the ID cannot be registered.
     */
    private function acquireMachineId()
    {
        $sessionKeyUrl = '/kv/'.$this->kvPrefix.$this->sessionId;

        //Check, if we don't have existing value for the session
        $currentValue = $this->curl->performGetRequest($sessionKeyUrl);
        if (!empty($currentValue['Value'])) {
            return (int)base64_decode($currentValue['Value']);
        }

        //Lock main key to block concurrent checks
        $this->lockKey();

        try {
            //Get currently locked machine IDs to check, if we can get a new one. If yes, save it.
            $currentValues = $this->curl->performGetRequest('/kv/'.$this->kvPrefix.'?recurse');
            if (!is_array($currentValues)) {
                $currentValues = array();
            }
            $machineId = $this->computePossibleMachineId($currentValues);
            if (!$this->curl->performPutRequest($sessionKeyUrl.'?acquire='.$this->sessionId, $machineId)) {
                throw new RuntimeException("Could not register machine ID on consul.");
            }
        } catch (\Exception $e) {
            //Do not leave the main key locked when allocation fails; rethrow afterwards.
            //(catch + rethrow instead of 'finally' to stay compatible with old PHP versions.)
            $this->releaseKey();
            throw $e;
        }

        //Release the lock on the main key and return machine ID.
        $this->releaseKey();

        return (int)$machineId;
    }

    /**
     * Try to fetch machine ID.
     *
     * Scans the given KV entries (each expected to carry 'Key' and a base64-encoded 'Value')
     * and returns either the ID already stored for this session or the lowest ID in
     * the range 0..1023 that no other entry uses.
     *
     * @param array $currentValues Entries returned by the recursive KV query.
     *
     * @return integer
     *
     * @throws RuntimeException When all 1024 machine IDs are taken.
     */
    private function computePossibleMachineId(array $currentValues)
    {
        $prefixKey = (string)$this->kvPrefix;
        //This class stores its own entry under prefix + session ID (see acquireMachineId()),
        //so that is the key to look for. The bare session ID is still accepted for
        //backward compatibility with the previous comparison.
        $ownKey = $prefixKey.$this->sessionId;
        $bareSessionKey = (string)$this->sessionId;

        $usedIds = array();
        foreach ($currentValues as $currentValue) {
            $key = (string)$currentValue['Key'];
            if ($key === $prefixKey) {
                //The prefix key itself is only the lock holder, not a machine ID entry.
                continue;
            }
            if ($key === $ownKey || $key === $bareSessionKey) {
                return (int)base64_decode($currentValue['Value']);
            }
            $usedIds[(int)base64_decode($currentValue['Value'])] = true;
        }

        for ($candidate = 0; $candidate < 1024; $candidate++) {
            if (!isset($usedIds[$candidate])) {
                return $candidate;
            }
        }

        throw new RuntimeException("Cannot acquire machine ID - all machine IDs are used up");
    }

    /**
     * Lock master key.
     *
     * Retries once per second until the acquire request reports success; there is no upper
     * bound on the number of attempts.
     */
    private function lockKey()
    {
        $lockUrl = '/kv/'.$this->kvPrefix.'?acquire='.$this->sessionId;

        //try to acquire the lock on prefix during whole operation.
        while (!$this->curl->performPutRequest($lockUrl, $this->sessionId)) {
            sleep(1);
        }
    }

    /**
     * Release master key.
     */
    private function releaseKey()
    {
        $releaseUrl = '/kv/'.$this->kvPrefix.'?release='.$this->sessionId;
        $this->curl->performPutRequest($releaseUrl, $this->sessionId);
    }

    /**
     * Create new session.
     *
     * On success the returned session ID replaces the stored one; on failure the stored
     * session ID is left untouched.
     *
     * @throws RuntimeException When the response does not carry a session ID.
     */
    private function createSession()
    {
        //We create new session with given TTL and with lock delay equal to half of TTL.
        $payload = array(
            'TTL' => $this->sessionTTL.'s',
            "Behavior" => "delete",
            'LockDelay' => floor($this->sessionTTL / 2).'s',
        );

        $returnData = $this->curl->performPutRequest('/session/create', json_encode($payload));
        if (empty($returnData['ID'])) {
            throw new RuntimeException("Cannot create session");
        }

        $this->sessionId = $returnData['ID'];
    }

    /**
     * Destroy session.
     *
     * Does nothing when no session ID is stored (e.g. session creation never succeeded).
     */
    private function destroySession()
    {
        if (!$this->sessionId) {
            return;
        }

        $this->curl->performPutRequest("/session/destroy/".$this->sessionId);
    }
}
236