Completed
Branch master (8d5465)
by
unknown
31:25
created

ChronologyProtector::minPosTime()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 7
nc 4
nop 1
dl 0
loc 13
rs 9.2
c 0
b 0
f 0
1
<?php
/**
 * Helper for ensuring a consistent ordering of events as seen by the user, despite
 * database replication.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Database
 */

/**
 * Class for ensuring a consistent ordering of events as seen by the user, despite replication.
 * Kind of like Hawking's [[Chronology Protection Agency]].
 *
 * Life cycle as implemented below: initLB() makes a LoadBalancer wait for the master
 * position previously stored for this client, shutdownLB() collects the current master
 * position of a LoadBalancer that had writes, and shutdown() persists the collected
 * positions (and "touched" timestamps) to the BagOStuff store.
 */
class ChronologyProtector {
	/** @var BagOStuff */
	protected $store;

	/** @var string Storage key name */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
	protected $waitForPosTime;
	/** @var int Max seconds to wait on positions to appear */
	protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
	/** @var bool Whether to no-op all method calls */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
	protected $wait = true;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $startupPositions = [];
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $shutdownPositions = [];
	/** @var int[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

	/** @var int Seconds to store positions */
	const POSITION_TTL = 60;
	/** @var int Max time to wait for positions to appear */
	const POS_WAIT_TIMEOUT = 5;

	/**
	 * @param BagOStuff $store Store used to persist positions between requests
	 * @param array $client Map of (ip: <IP>, agent: <user-agent>)
	 * @param float|null $posTime UNIX timestamp; when set, initPositions() waits for
	 *  stored positions that are at least this recent
	 * @since 1.27
	 */
	public function __construct( BagOStuff $store, array $client, $posTime = null ) {
		$this->store = $store;
		// The client is identified by a hash of its IP and user-agent
		$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId );
		$this->waitForPosTime = $posTime;
	}

	/**
	 * @param bool $enabled Whether to no-op all method calls
	 * @since 1.27
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether to check and wait on positions
	 * @since 1.27
	 */
	public function setWaitEnabled( $enabled ) {
		$this->wait = $enabled;
	}

	/**
	 * Initialise a LoadBalancer to give it appropriate chronology protection.
	 *
	 * If the stash has a previous master position recorded, this will try to
	 * make sure that the next query to a replica DB of that master will see changes up
	 * to that position by delaying execution. The delay may timeout and allow stale
	 * data if no non-lagged replica DBs are available.
	 *
	 * @param LoadBalancer $lb
	 * @return void
	 */
	public function initLB( LoadBalancer $lb ) {
		if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
			return; // non-replicated setup or disabled
		}

		$this->initPositions();

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( !empty( $this->startupPositions[$masterName] ) ) {
			$pos = $this->startupPositions[$masterName];
			wfDebugLog( 'replication', __METHOD__ . ": LB for '$masterName' set to pos $pos\n" );
			$lb->waitFor( $pos );
		}
	}

	/**
	 * Notify the ChronologyProtector that the LoadBalancer is about to shut
	 * down. Saves replication positions.
	 *
	 * @param LoadBalancer $lb
	 * @return void
	 */
	public function shutdownLB( LoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // not enabled
		} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
			// Only save the position if writes have been done on the connection
			return;
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( $lb->getServerCount() > 1 ) {
			$pos = $lb->getMasterPos();
			// Only record usable positions: mergePositions() and minPosTime() call
			// asOfTime() on every stored entry, so a non-object here would be fatal later.
			// NOTE(review): assumes getMasterPos() may return a falsy value - confirm.
			if ( $pos ) {
				wfDebugLog( 'replication', __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
				$this->shutdownPositions[$masterName] = $pos;
			}
		} else {
			wfDebugLog( 'replication', __METHOD__ . ": DB '$masterName' touched\n" );
		}
		// Remember that the DB was written to, whether or not a position was recorded
		$this->shutdownTouchDBs[$masterName] = 1;
	}

	/**
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 * May commit chronology data to persistent storage.
	 *
	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
	 * @param string $mode One of (sync, async); whether to wait on remote datacenters
	 * @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
	 */
	public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
		if ( !$this->enabled ) {
			return [];
		}

		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
			$store->set(
				$this->getTouchedKey( $store, $dbName ),
				microtime( true ),
				$store::TTL_DAY
			);
		}

		if ( !count( $this->shutdownPositions ) ) {
			return []; // nothing to save
		}

		wfDebugLog( 'replication',
			__METHOD__ . ": saving master pos for " .
			implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
		);

		// CP-protected writes should overwhelmingly go to the master datacenter, so get DC-local
		// lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
		// makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
		if ( $store->lock( $this->key, 3 ) ) {
			if ( $workCallback ) {
				// Let the store run the work before blocking on a replication sync barrier. By the
				// time it's done with the work, the barrier should be fast if replication caught up.
				$store->addBusyCallback( $workCallback );
			}
			$ok = $store->set(
				$this->key,
				self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
				self::POSITION_TTL,
				( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
			);
			$store->unlock( $this->key );
		} else {
			$ok = false;
		}

		if ( !$ok ) {
			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			wfDebugLog( 'replication',
				__METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
			);
		} elseif ( $mode === 'sync' &&
			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
		) {
			// Positions may not be in all datacenters, force LBFactory to play it safe
			wfDebugLog( 'replication',
				__METHOD__ . ": store does not report ability to sync replicas. " );
			$bouncedPositions = $this->shutdownPositions;
		} else {
			$bouncedPositions = [];
		}

		return $bouncedPositions;
	}

	/**
	 * @param string $dbName DB master name (e.g. "db1052")
	 * @return float|bool UNIX timestamp when client last touched the DB; false if not on record
	 * @since 1.28
	 */
	public function getTouched( $dbName ) {
		return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
	}

	/**
	 * Build the store key under which the "last touched" time of a DB is kept for this client.
	 *
	 * @param BagOStuff $store
	 * @param string $dbName
	 * @return string
	 */
	private function getTouchedKey( BagOStuff $store, $dbName ) {
		return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
	}

	/**
	 * Load in previous master positions for the client
	 *
	 * Runs at most once per instance. When waiting is disabled, no positions are read
	 * and the startup positions stay empty.
	 */
	protected function initPositions() {
		if ( $this->initialized ) {
			return;
		}

		$this->initialized = true;
		if ( $this->wait ) {
			// If there is an expectation to see master positions with a certain min
			// timestamp, then block until they appear, or until a timeout is reached.
			if ( $this->waitForPosTime ) {
				$data = null;
				$loop = new WaitConditionLoop(
					function () use ( &$data ) {
						$data = $this->store->get( $this->key );

						return ( self::minPosTime( $data ) >= $this->waitForPosTime )
							? WaitConditionLoop::CONDITION_REACHED
							: WaitConditionLoop::CONDITION_CONTINUE;
					},
					$this->waitForPosTimeout
				);
				$result = $loop->invoke();
				$waitedMs = $loop->getLastWaitTime() * 1e3;

				if ( $result == $loop::CONDITION_REACHED ) {
					$msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
				} else {
					$msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
				}
				wfDebugLog( 'replication', $msg );
			} else {
				$data = $this->store->get( $this->key );
			}

			// Tolerate a missing or malformed stored value instead of reading an undefined index
			$this->startupPositions = isset( $data['positions'] ) ? $data['positions'] : [];
			wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
		} else {
			$this->startupPositions = [];
			wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (unread)\n" );
		}
	}

	/**
	 * Get the oldest asOfTime() among the positions of a stored value.
	 *
	 * @param array|bool $data Value read from the store; false if nothing was stored
	 * @return float|null Smallest position timestamp; null if there are no positions
	 */
	private static function minPosTime( $data ) {
		if ( !isset( $data['positions'] ) ) {
			return null;
		}

		$min = null;
		foreach ( $data['positions'] as $pos ) {
			/** @var DBMasterPos $pos */
			$time = $pos->asOfTime();
			// Compare against null explicitly so that a timestamp of 0 is not mistaken for "unset"
			if ( $min === null || $time < $min ) {
				$min = $time;
			}
		}

		return $min;
	}

	/**
	 * Merge the positions gathered during this request into the stored value.
	 *
	 * @param array|bool $curValue Value currently in the store; false if nothing was stored
	 * @param DBMasterPos[] $shutdownPositions Map of (DB master name => position)
	 * @return array Map with a single 'positions' key holding (DB master name => position)
	 */
	private static function mergePositions( $curValue, array $shutdownPositions ) {
		/** @var $curPositions DBMasterPos[] */
		// A missing (false) or malformed stored value simply contributes no positions
		$curPositions = isset( $curValue['positions'] ) ? $curValue['positions'] : [];
		// Use the newest positions for each DB master
		foreach ( $shutdownPositions as $db => $pos ) {
			if ( !isset( $curPositions[$db] )
				|| $pos->asOfTime() > $curPositions[$db]->asOfTime()
			) {
				$curPositions[$db] = $pos;
			}
		}

		return [ 'positions' => $curPositions ];
	}
}
319