<?php
/**
 * Protection of the ordering of events seen by a client, despite database replication.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Database
 */

/**
 * Class for ensuring a consistent ordering of events as seen by the user, despite replication.
 * Kind of like Hawking's [[Chronology Protection Agency]].
 *
 * The master positions recorded through shutdownLB() are written to the store by shutdown(),
 * under a key derived from the client's IP and user-agent. A later instance for the same
 * client loads them in initLB() and hands them to LoadBalancer::waitFor().
 */
class ChronologyProtector {
	/** @var BagOStuff */
	protected $store;

	/** @var string Storage key name */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
	protected $waitForPosTime;
	/** @var int Max seconds to wait on positions to appear */
	protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
	/** @var bool Whether protection is active; if false, initLB()/shutdownLB()/shutdown() no-op */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
	protected $wait = true;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $startupPositions = [];
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $shutdownPositions = [];
	/** @var int[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

	/** @var int Seconds to store positions */
	const POSITION_TTL = 60;
	/** @var int Max time to wait for positions to appear */
	const POS_WAIT_TIMEOUT = 5;

	/**
	 * @param BagOStuff $store
	 * @param array $client Map of (ip: <IP>, agent: <user-agent>)
	 * @param float|null $posTime UNIX timestamp
	 * @since 1.27
	 */
	public function __construct( BagOStuff $store, array $client, $posTime = null ) {
		// A missing field contributes an empty string to the hash, exactly as an
		// undefined index would have, but without raising a notice.
		$ip = isset( $client['ip'] ) ? $client['ip'] : '';
		$agent = isset( $client['agent'] ) ? $client['agent'] : '';

		$this->store = $store;
		$this->clientId = md5( $ip . "\n" . $agent );
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId );
		$this->waitForPosTime = $posTime;
	}

	/**
	 * @param bool $enabled Whether protection is active; passing false turns
	 *  initLB(), shutdownLB() and shutdown() into no-ops
	 * @since 1.27
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether to check and wait on positions
	 * @since 1.27
	 */
	public function setWaitEnabled( $enabled ) {
		$this->wait = $enabled;
	}

	/**
	 * Initialise a LoadBalancer to give it appropriate chronology protection.
	 *
	 * If the stash has a previous master position recorded, this will try to
	 * make sure that the next query to a replica DB of that master will see changes up
	 * to that position by delaying execution. The delay may timeout and allow stale
	 * data if no non-lagged replica DBs are available.
	 *
	 * @param LoadBalancer $lb
	 * @return void
	 */
	public function initLB( LoadBalancer $lb ) {
		if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
			return; // non-replicated setup or disabled
		}

		$this->initPositions();

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( !empty( $this->startupPositions[$masterName] ) ) {
			$pos = $this->startupPositions[$masterName];
			wfDebugLog( 'replication', __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
			$lb->waitFor( $pos );
		}
	}

	/**
	 * Notify the ChronologyProtector that the LoadBalancer is about to shut
	 * down. Saves replication positions.
	 *
	 * The position is only remembered in this object; shutdown() is what writes it
	 * to the store.
	 *
	 * @param LoadBalancer $lb
	 * @return void
	 */
	public function shutdownLB( LoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // not enabled
		} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
			// Only save the position if writes have been done on the connection
			return;
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( $lb->getServerCount() > 1 ) {
			$pos = $lb->getMasterPos();
			wfDebugLog( 'replication', __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
			$this->shutdownPositions[$masterName] = $pos;
		} else {
			wfDebugLog( 'replication', __METHOD__ . ": DB '$masterName' touched\n" );
		}
		// The "touched" marker is recorded whether or not the setup is replicated
		$this->shutdownTouchDBs[$masterName] = 1;
	}

	/**
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 * May commit chronology data to persistent storage.
	 *
	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
	 * @param string $mode One of (sync, async); whether to wait on remote datacenters
	 * @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
	 */
	public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
		if ( !$this->enabled ) {
			return [];
		}

		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
			$store->set(
				$this->getTouchedKey( $store, $dbName ),
				microtime( true ),
				$store::TTL_DAY
			);
		}

		if ( !count( $this->shutdownPositions ) ) {
			return []; // nothing to save
		}

		wfDebugLog( 'replication',
			__METHOD__ . ": saving master pos for " .
			implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
		);

		// CP-protected writes should overwhelmingly go to the master datacenter, so get DC-local
		// lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
		// makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
		if ( $store->lock( $this->key, 3 ) ) {
			if ( $workCallback ) {
				// Let the store run the work before blocking on a replication sync barrier. By the
				// time it's done with the work, the barrier should be fast if replication caught up.
				$store->addBusyCallback( $workCallback );
			}
			$ok = $store->set(
				$this->key,
				self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
				self::POSITION_TTL,
				( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
			);
			$store->unlock( $this->key );
		} else {
			$ok = false;
		}

		if ( !$ok ) {
			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			wfDebugLog( 'replication',
				__METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
			);
		} elseif ( $mode === 'sync' &&
			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
		) {
			// Positions may not be in all datacenters, force LBFactory to play it safe
			wfDebugLog( 'replication',
				__METHOD__ . ": store does not report ability to sync replicas. " );
			$bouncedPositions = $this->shutdownPositions;
		} else {
			$bouncedPositions = [];
		}

		return $bouncedPositions;
	}

	/**
	 * @param string $dbName DB master name (e.g. "db1052")
	 * @return float|bool UNIX timestamp when client last touched the DB; false if not on record
	 * @since 1.28
	 */
	public function getTouched( $dbName ) {
		return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
	}

	/**
	 * Build the store key under which the "last touched" timestamp of a DB is kept
	 * for this client.
	 *
	 * @param BagOStuff $store
	 * @param string $dbName
	 * @return string
	 */
	private function getTouchedKey( BagOStuff $store, $dbName ) {
		return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
	}

	/**
	 * Load in previous master positions for the client
	 *
	 * Runs at most once per instance. When waiting is disabled the store is not read
	 * and the startup positions are left empty.
	 */
	protected function initPositions() {
		if ( $this->initialized ) {
			return;
		}
		$this->initialized = true;

		if ( !$this->wait ) {
			$this->startupPositions = [];
			wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (unread)\n" );

			return;
		}

		// If there is an expectation to see master positions with a certain min
		// timestamp, then block until they appear, or until a timeout is reached.
		if ( $this->waitForPosTime ) {
			$data = null;
			$loop = new WaitConditionLoop(
				function () use ( &$data ) {
					$data = $this->store->get( $this->key );

					return ( self::minPosTime( $data ) >= $this->waitForPosTime )
						? WaitConditionLoop::CONDITION_REACHED
						: WaitConditionLoop::CONDITION_CONTINUE;
				},
				$this->waitForPosTimeout
			);
			$result = $loop->invoke();
			$waitedMs = $loop->getLastWaitTime() * 1e3;

			if ( $result == $loop::CONDITION_REACHED ) {
				$msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
			} else {
				$msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
			}
			wfDebugLog( 'replication', $msg );
		} else {
			$data = $this->store->get( $this->key );
		}

		// Whatever was fetched last is used, even if the wait above timed out
		$this->startupPositions = self::getPositionsFromData( $data );
		wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
	}

	/**
	 * Extract the position map from a value read from the store.
	 *
	 * @param mixed $data Stored value; anything without a 'positions' array counts as empty
	 * @return DBMasterPos[] Map of (DB master name => position); empty if there is none
	 */
	private static function getPositionsFromData( $data ) {
		if ( is_array( $data ) && isset( $data['positions'] ) && is_array( $data['positions'] ) ) {
			return $data['positions'];
		}

		return [];
	}

	/**
	 * Get the oldest asOfTime() among the positions of a stored value.
	 *
	 * @param array|bool $data
	 * @return float|null Null if there are no positions
	 */
	private static function minPosTime( $data ) {
		$min = null;
		foreach ( self::getPositionsFromData( $data ) as $pos ) {
			/** @var DBMasterPos $pos */
			$time = $pos->asOfTime();
			// Compare against null explicitly so that a timestamp of 0 is a valid minimum
			if ( $min === null || $time < $min ) {
				$min = $time;
			}
		}

		return $min;
	}

	/**
	 * Merge the positions of this request into the value currently in the store.
	 *
	 * @param array|bool $curValue Current stored value; false or malformed means "none"
	 * @param DBMasterPos[] $shutdownPositions
	 * @return array Map with a single 'positions' key holding (DB master name => position)
	 */
	private static function mergePositions( $curValue, array $shutdownPositions ) {
		/** @var $curPositions DBMasterPos[] */
		$curPositions = self::getPositionsFromData( $curValue );
		// Use the newest positions for each DB master
		foreach ( $shutdownPositions as $db => $pos ) {
			if ( !isset( $curPositions[$db] )
				|| $pos->asOfTime() > $curPositions[$db]->asOfTime()
			) {
				$curPositions[$db] = $pos;
			}
		}

		return [ 'positions' => $curPositions ];
	}
}