Completed
Branch master (fc4983)
by
unknown
30:21
created

LoadBalancer::laggedReplicaUsed()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
/**
3
 * Database load balancing.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup Database
22
 */
23
24
/**
25
 * Database load balancing object
26
 *
27
 * @todo document
28
 * @ingroup Database
29
 */
30
class LoadBalancer {
31
	/** @var array[] Map of (server index => server config array) */
32
	private $mServers;
33
	/** @var array[] Map of (local/foreignUsed/foreignFree => server index => DatabaseBase array) */
34
	private $mConns;
35
	/** @var array Map of (server index => weight) */
36
	private $mLoads;
37
	/** @var array[] Map of (group => server index => weight) */
38
	private $mGroupLoads;
39
	/** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
40
	private $mAllowLagged;
41
	/** @var integer Seconds to spend waiting on replica DB lag to resolve */
42
	private $mWaitTimeout;
43
	/** @var array LBFactory information */
44
	private $mParentInfo;
45
46
	/** @var string The LoadMonitor subclass name */
47
	private $mLoadMonitorClass;
48
	/** @var LoadMonitor */
49
	private $mLoadMonitor;
50
	/** @var BagOStuff */
51
	private $srvCache;
52
	/** @var WANObjectCache */
53
	private $wanCache;
54
	/** @var TransactionProfiler */
55
	protected $trxProfiler;
56
57
	/** @var bool|DatabaseBase Database connection that caused a problem */
58
	private $mErrorConnection;
59
	/** @var integer The generic (not query grouped) replica DB index (of $mServers) */
60
	private $mReadIndex;
61
	/** @var bool|DBMasterPos False if not set */
62
	private $mWaitForPos;
63
	/** @var bool Whether the generic reader fell back to a lagged replica DB */
64
	private $laggedReplicaMode = false;
65
	/** @var bool Whether the generic reader fell back to a lagged replica DB */
66
	private $allReplicasDownMode = false;
67
	/** @var string The last DB selection or connection error */
68
	private $mLastError = 'Unknown error';
69
	/** @var string|bool Reason the LB is read-only or false if not */
70
	private $readOnlyReason = false;
71
	/** @var integer Total connections opened */
72
	private $connsOpened = 0;
73
	/** @var string|bool String if a requested DBO_TRX transaction round is active */
74
	private $trxRoundId = false;
75
	/** @var array[] Map of (name => callable) */
76
	private $trxRecurringCallbacks = [];
77
78
	/** @var integer Warn when this many connection are held */
79
	const CONN_HELD_WARN_THRESHOLD = 10;
80
	/** @var integer Default 'max lag' when unspecified */
81
	const MAX_LAG_DEFAULT = 10;
82
	/** @var integer Max time to wait for a replica DB to catch up (e.g. ChronologyProtector) */
83
	const POS_WAIT_TIMEOUT = 10;
84
	/** @var integer Seconds to cache master server read-only status */
85
	const TTL_CACHE_READONLY = 5;
86
87
	/**
88
	 * @var boolean
89
	 */
90
	private $disabled = false;
91
92
	/**
93
	 * @param array $params Array with keys:
94
	 *  - servers : Required. Array of server info structures.
95
	 *  - loadMonitor : Name of a class used to fetch server lag and load.
96
	 *  - readOnlyReason : Reason the master DB is read-only if so [optional]
97
	 *  - waitTimeout : Maximum time to wait for replicas for consistency [optional]
98
	 *  - srvCache : BagOStuff object [optional]
99
	 *  - wanCache : WANObjectCache object [optional]
100
	 * @throws MWException
101
	 */
102
	public function __construct( array $params ) {
103
		if ( !isset( $params['servers'] ) ) {
104
			throw new MWException( __CLASS__ . ': missing servers parameter' );
105
		}
106
		$this->mServers = $params['servers'];
107
		$this->mWaitTimeout = isset( $params['waitTimeout'] )
108
			? $params['waitTimeout']
109
			: self::POS_WAIT_TIMEOUT;
110
111
		$this->mReadIndex = -1;
112
		$this->mWriteIndex = -1;
0 ignored issues
show
Bug introduced by
The property mWriteIndex does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
113
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignFree' => array()) of type array<string,array,{"loc..."foreignFree":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
114
			'local' => [],
115
			'foreignUsed' => [],
116
			'foreignFree' => [] ];
117
		$this->mLoads = [];
118
		$this->mWaitForPos = false;
119
		$this->mErrorConnection = false;
120
		$this->mAllowLagged = false;
121
122 View Code Duplication
		if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
123
			$this->readOnlyReason = $params['readOnlyReason'];
124
		}
125
126
		if ( isset( $params['loadMonitor'] ) ) {
127
			$this->mLoadMonitorClass = $params['loadMonitor'];
128
		} else {
129
			$master = reset( $params['servers'] );
130
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
131
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
132
			} else {
133
				$this->mLoadMonitorClass = 'LoadMonitorNull';
134
			}
135
		}
136
137
		foreach ( $params['servers'] as $i => $server ) {
138
			$this->mLoads[$i] = $server['load'];
139
			if ( isset( $server['groupLoads'] ) ) {
140
				foreach ( $server['groupLoads'] as $group => $ratio ) {
141
					if ( !isset( $this->mGroupLoads[$group] ) ) {
142
						$this->mGroupLoads[$group] = [];
143
					}
144
					$this->mGroupLoads[$group][$i] = $ratio;
145
				}
146
			}
147
		}
148
149
		if ( isset( $params['srvCache'] ) ) {
150
			$this->srvCache = $params['srvCache'];
151
		} else {
152
			$this->srvCache = new EmptyBagOStuff();
153
		}
154
		if ( isset( $params['wanCache'] ) ) {
155
			$this->wanCache = $params['wanCache'];
156
		} else {
157
			$this->wanCache = WANObjectCache::newEmpty();
158
		}
159
		if ( isset( $params['trxProfiler'] ) ) {
160
			$this->trxProfiler = $params['trxProfiler'];
161
		} else {
162
			$this->trxProfiler = new TransactionProfiler();
163
		}
164
	}
165
166
	/**
167
	 * Get a LoadMonitor instance
168
	 *
169
	 * @return LoadMonitor
170
	 */
171
	private function getLoadMonitor() {
172
		if ( !isset( $this->mLoadMonitor ) ) {
173
			$class = $this->mLoadMonitorClass;
174
			$this->mLoadMonitor = new $class( $this );
175
		}
176
177
		return $this->mLoadMonitor;
178
	}
179
180
	/**
181
	 * Get or set arbitrary data used by the parent object, usually an LBFactory
182
	 * @param mixed $x
183
	 * @return mixed
184
	 */
185
	public function parentInfo( $x = null ) {
186
		return wfSetVar( $this->mParentInfo, $x );
187
	}
188
189
	/**
190
	 * @param array $loads
191
	 * @param bool|string $wiki Wiki to get non-lagged for
192
	 * @param int $maxLag Restrict the maximum allowed lag to this many seconds
193
	 * @return bool|int|string
194
	 */
195
	private function getRandomNonLagged( array $loads, $wiki = false, $maxLag = INF ) {
196
		$lags = $this->getLagTimes( $wiki );
197
198
		# Unset excessively lagged servers
199
		foreach ( $lags as $i => $lag ) {
200
			if ( $i != 0 ) {
201
				# How much lag this server nominally is allowed to have
202
				$maxServerLag = isset( $this->mServers[$i]['max lag'] )
203
					? $this->mServers[$i]['max lag']
204
					: self::MAX_LAG_DEFAULT; // default
205
				# Constrain that futher by $maxLag argument
206
				$maxServerLag = min( $maxServerLag, $maxLag );
207
208
				$host = $this->getServerName( $i );
209
				if ( $lag === false && !is_infinite( $maxServerLag ) ) {
210
					wfDebugLog( 'replication', "Server $host (#$i) is not replicating?" );
211
					unset( $loads[$i] );
212
				} elseif ( $lag > $maxServerLag ) {
213
					wfDebugLog( 'replication', "Server $host (#$i) has >= $lag seconds of lag" );
214
					unset( $loads[$i] );
215
				}
216
			}
217
		}
218
219
		# Find out if all the replica DBs with non-zero load are lagged
220
		$sum = 0;
221
		foreach ( $loads as $load ) {
222
			$sum += $load;
223
		}
224
		if ( $sum == 0 ) {
225
			# No appropriate DB servers except maybe the master and some replica DBs with zero load
226
			# Do NOT use the master
227
			# Instead, this function will return false, triggering read-only mode,
228
			# and a lagged replica DB will be used instead.
229
			return false;
230
		}
231
232
		if ( count( $loads ) == 0 ) {
233
			return false;
234
		}
235
236
		# Return a random representative of the remainder
237
		return ArrayUtils::pickRandom( $loads );
238
	}
239
240
	/**
241
	 * Get the index of the reader connection, which may be a replica DB
242
	 * This takes into account load ratios and lag times. It should
243
	 * always return a consistent index during a given invocation
244
	 *
245
	 * Side effect: opens connections to databases
246
	 * @param string|bool $group Query group, or false for the generic reader
247
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
248
	 * @throws MWException
249
	 * @return bool|int|string
250
	 */
251
	public function getReaderIndex( $group = false, $wiki = false ) {
252
		global $wgDBtype;
253
254
		# @todo FIXME: For now, only go through all this for mysql databases
255
		if ( $wgDBtype != 'mysql' ) {
256
			return $this->getWriterIndex();
257
		}
258
259
		if ( count( $this->mServers ) == 1 ) {
260
			# Skip the load balancing if there's only one server
261
			return 0;
262
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
263
			# Shortcut if generic reader exists already
264
			return $this->mReadIndex;
265
		}
266
267
		# Find the relevant load array
268
		if ( $group !== false ) {
269
			if ( isset( $this->mGroupLoads[$group] ) ) {
270
				$nonErrorLoads = $this->mGroupLoads[$group];
271
			} else {
272
				# No loads for this group, return false and the caller can use some other group
273
				wfDebugLog( 'connect', __METHOD__ . ": no loads for group $group\n" );
274
275
				return false;
276
			}
277
		} else {
278
			$nonErrorLoads = $this->mLoads;
279
		}
280
281
		if ( !count( $nonErrorLoads ) ) {
282
			throw new MWException( "Empty server array given to LoadBalancer" );
283
		}
284
285
		# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
286
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
287
288
		$laggedReplicaMode = false;
289
290
		# No server found yet
291
		$i = false;
292
		# First try quickly looking through the available servers for a server that
293
		# meets our criteria
294
		$currentLoads = $nonErrorLoads;
295
		while ( count( $currentLoads ) ) {
296
			if ( $this->mAllowLagged || $laggedReplicaMode ) {
297
				$i = ArrayUtils::pickRandom( $currentLoads );
298
			} else {
299
				$i = false;
300
				if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
301
					# ChronologyProtecter causes mWaitForPos to be set via sessions.
302
					# This triggers doWait() after connect, so it's especially good to
303
					# avoid lagged servers so as to avoid just blocking in that method.
304
					$ago = microtime( true ) - $this->mWaitForPos->asOfTime();
305
					# Aim for <= 1 second of waiting (being too picky can backfire)
306
					$i = $this->getRandomNonLagged( $currentLoads, $wiki, $ago + 1 );
307
				}
308
				if ( $i === false ) {
309
					# Any server with less lag than it's 'max lag' param is preferable
310
					$i = $this->getRandomNonLagged( $currentLoads, $wiki );
311
				}
312
				if ( $i === false && count( $currentLoads ) != 0 ) {
313
					# All replica DBs lagged. Switch to read-only mode
314
					wfDebugLog( 'replication', "All replica DBs lagged. Switch to read-only mode" );
315
					$i = ArrayUtils::pickRandom( $currentLoads );
316
					$laggedReplicaMode = true;
317
				}
318
			}
319
320
			if ( $i === false ) {
321
				# pickRandom() returned false
322
				# This is permanent and means the configuration or the load monitor
323
				# wants us to return false.
324
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
325
326
				return false;
327
			}
328
329
			$serverName = $this->getServerName( $i );
330
			wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: $serverName..." );
331
332
			$conn = $this->openConnection( $i, $wiki );
333
			if ( !$conn ) {
334
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
335
				unset( $nonErrorLoads[$i] );
336
				unset( $currentLoads[$i] );
337
				$i = false;
338
				continue;
339
			}
340
341
			// Decrement reference counter, we are finished with this connection.
342
			// It will be incremented for the caller later.
343
			if ( $wiki !== false ) {
344
				$this->reuseConnection( $conn );
345
			}
346
347
			# Return this server
348
			break;
349
		}
350
351
		# If all servers were down, quit now
352
		if ( !count( $nonErrorLoads ) ) {
353
			wfDebugLog( 'connect', "All servers down" );
354
		}
355
356
		if ( $i !== false ) {
357
			# Replica DB connection successful.
358
			# Wait for the session master pos for a short time.
359
			if ( $this->mWaitForPos && $i > 0 ) {
360
				$this->doWait( $i );
361
			}
362
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
363
				$this->mReadIndex = $i;
0 ignored issues
show
Documentation Bug introduced by
It seems like $i can also be of type string. However, the property $mReadIndex is declared as type integer. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
364
				# Record if the generic reader index is in "lagged replica DB" mode
365
				if ( $laggedReplicaMode ) {
366
					$this->laggedReplicaMode = true;
367
				}
368
			}
369
			$serverName = $this->getServerName( $i );
370
			wfDebugLog( 'connect', __METHOD__ .
371
				": using server $serverName for group '$group'\n" );
372
		}
373
374
		return $i;
375
	}
376
377
	/**
378
	 * Set the master wait position
379
	 * If a DB_REPLICA connection has been opened already, waits
380
	 * Otherwise sets a variable telling it to wait if such a connection is opened
381
	 * @param DBMasterPos $pos
382
	 */
383
	public function waitFor( $pos ) {
384
		$this->mWaitForPos = $pos;
385
		$i = $this->mReadIndex;
386
387
		if ( $i > 0 ) {
388
			if ( !$this->doWait( $i ) ) {
389
				$this->laggedReplicaMode = true;
390
			}
391
		}
392
	}
393
394
	/**
395
	 * Set the master wait position and wait for a "generic" replica DB to catch up to it
396
	 *
397
	 * This can be used a faster proxy for waitForAll()
398
	 *
399
	 * @param DBMasterPos $pos
400
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
401
	 * @return bool Success (able to connect and no timeouts reached)
402
	 * @since 1.26
403
	 */
404
	public function waitForOne( $pos, $timeout = null ) {
405
		$this->mWaitForPos = $pos;
406
407
		$i = $this->mReadIndex;
408
		if ( $i <= 0 ) {
409
			// Pick a generic replica DB if there isn't one yet
410
			$readLoads = $this->mLoads;
411
			unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
412
			$readLoads = array_filter( $readLoads ); // with non-zero load
413
			$i = ArrayUtils::pickRandom( $readLoads );
414
		}
415
416 View Code Duplication
		if ( $i > 0 ) {
417
			$ok = $this->doWait( $i, true, $timeout );
418
		} else {
419
			$ok = true; // no applicable loads
420
		}
421
422
		return $ok;
423
	}
424
425
	/**
426
	 * Set the master wait position and wait for ALL replica DBs to catch up to it
427
	 * @param DBMasterPos $pos
428
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
429
	 * @return bool Success (able to connect and no timeouts reached)
430
	 */
431
	public function waitForAll( $pos, $timeout = null ) {
432
		$this->mWaitForPos = $pos;
433
		$serverCount = count( $this->mServers );
434
435
		$ok = true;
436
		for ( $i = 1; $i < $serverCount; $i++ ) {
437 View Code Duplication
			if ( $this->mLoads[$i] > 0 ) {
438
				$ok = $this->doWait( $i, true, $timeout ) && $ok;
439
			}
440
		}
441
442
		return $ok;
443
	}
444
445
	/**
446
	 * Get any open connection to a given server index, local or foreign
447
	 * Returns false if there is no connection open
448
	 *
449
	 * @param int $i
450
	 * @return DatabaseBase|bool False on failure
451
	 */
452
	public function getAnyOpenConnection( $i ) {
453
		foreach ( $this->mConns as $conns ) {
454
			if ( !empty( $conns[$i] ) ) {
455
				return reset( $conns[$i] );
456
			}
457
		}
458
459
		return false;
460
	}
461
462
	/**
463
	 * Wait for a given replica DB to catch up to the master pos stored in $this
464
	 * @param int $index Server index
465
	 * @param bool $open Check the server even if a new connection has to be made
466
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
467
	 * @return bool
468
	 */
469
	protected function doWait( $index, $open = false, $timeout = null ) {
470
		$close = false; // close the connection afterwards
471
472
		// Check if we already know that the DB has reached this point
473
		$server = $this->getServerName( $index );
474
		$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
475
		/** @var DBMasterPos $knownReachedPos */
476
		$knownReachedPos = $this->srvCache->get( $key );
477
		if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DBMasterPos::hasReached() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
478
			wfDebugLog( 'replication', __METHOD__ .
479
				": replica DB $server known to be caught up (pos >= $knownReachedPos).\n" );
480
			return true;
481
		}
482
483
		// Find a connection to wait on, creating one if needed and allowed
484
		$conn = $this->getAnyOpenConnection( $index );
485
		if ( !$conn ) {
486
			if ( !$open ) {
487
				wfDebugLog( 'replication', __METHOD__ . ": no connection open for $server\n" );
488
489
				return false;
490
			} else {
491
				$conn = $this->openConnection( $index, '' );
492
				if ( !$conn ) {
493
					wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
494
495
					return false;
496
				}
497
				// Avoid connection spam in waitForAll() when connections
498
				// are made just for the sake of doing this lag check.
499
				$close = true;
500
			}
501
		}
502
503
		wfDebugLog( 'replication', __METHOD__ . ": Waiting for replica DB $server to catch up...\n" );
504
		$timeout = $timeout ?: $this->mWaitTimeout;
505
		$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DatabaseBase::masterPosWait() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
506
507
		if ( $result == -1 || is_null( $result ) ) {
508
			// Timed out waiting for replica DB, use master instead
509
			$msg = __METHOD__ . ": Timed out waiting on $server pos {$this->mWaitForPos}";
510
			wfDebugLog( 'replication', "$msg\n" );
511
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
512
			$ok = false;
513
		} else {
514
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
515
			$ok = true;
516
			// Remember that the DB reached this point
517
			$this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
518
		}
519
520
		if ( $close ) {
521
			$this->closeConnection( $conn );
0 ignored issues
show
Bug introduced by
It seems like $conn can also be of type boolean; however, LoadBalancer::closeConnection() does only seem to accept object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
522
		}
523
524
		return $ok;
525
	}
526
527
	/**
528
	 * Get a connection by index
529
	 * This is the main entry point for this class.
530
	 *
531
	 * @param int $i Server index
532
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
533
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
534
	 *
535
	 * @throws MWException
536
	 * @return DatabaseBase
537
	 */
538
	public function getConnection( $i, $groups = [], $wiki = false ) {
539
		if ( $i === null || $i === false ) {
540
			throw new MWException( 'Attempt to call ' . __METHOD__ .
541
				' with invalid server index' );
542
		}
543
544
		if ( $wiki === wfWikiID() ) {
545
			$wiki = false;
546
		}
547
548
		$groups = ( $groups === false || $groups === [] )
549
			? [ false ] // check one "group": the generic pool
550
			: (array)$groups;
551
552
		$masterOnly = ( $i == DB_MASTER || $i == $this->getWriterIndex() );
553
		$oldConnsOpened = $this->connsOpened; // connections open now
554
555
		if ( $i == DB_MASTER ) {
556
			$i = $this->getWriterIndex();
557
		} else {
558
			# Try to find an available server in any the query groups (in order)
559
			foreach ( $groups as $group ) {
560
				$groupIndex = $this->getReaderIndex( $group, $wiki );
561
				if ( $groupIndex !== false ) {
562
					$i = $groupIndex;
563
					break;
564
				}
565
			}
566
		}
567
568
		# Operation-based index
569
		if ( $i == DB_REPLICA ) {
570
			$this->mLastError = 'Unknown error'; // reset error string
571
			# Try the general server pool if $groups are unavailable.
572
			$i = in_array( false, $groups, true )
573
				? false // don't bother with this if that is what was tried above
574
				: $this->getReaderIndex( false, $wiki );
575
			# Couldn't find a working server in getReaderIndex()?
576
			if ( $i === false ) {
577
				$this->mLastError = 'No working replica DB server: ' . $this->mLastError;
578
579
				return $this->reportConnectionError();
580
			}
581
		}
582
583
		# Now we have an explicit index into the servers array
584
		$conn = $this->openConnection( $i, $wiki );
585
		if ( !$conn ) {
586
			return $this->reportConnectionError();
587
		}
588
589
		# Profile any new connections that happen
590
		if ( $this->connsOpened > $oldConnsOpened ) {
591
			$host = $conn->getServer();
592
			$dbname = $conn->getDBname();
593
			$trxProf = Profiler::instance()->getTransactionProfiler();
594
			$trxProf->recordConnection( $host, $dbname, $masterOnly );
595
		}
596
597
		if ( $masterOnly ) {
598
			# Make master-requested DB handles inherit any read-only mode setting
599
			$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $wiki, $conn ) );
0 ignored issues
show
Bug introduced by
It seems like $conn defined by $this->openConnection($i, $wiki) on line 584 can also be of type boolean; however, LoadBalancer::getReadOnlyReason() does only seem to accept null|object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
600
		}
601
602
		return $conn;
603
	}
604
605
	/**
606
	 * Mark a foreign connection as being available for reuse under a different
607
	 * DB name or prefix. This mechanism is reference-counted, and must be called
608
	 * the same number of times as getConnection() to work.
609
	 *
610
	 * @param DatabaseBase $conn
611
	 * @throws MWException
612
	 */
613
	public function reuseConnection( $conn ) {
614
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
615
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
616
		if ( $serverIndex === null || $refCount === null ) {
617
			/**
618
			 * This can happen in code like:
619
			 *   foreach ( $dbs as $db ) {
620
			 *     $conn = $lb->getConnection( DB_REPLICA, [], $db );
621
			 *     ...
622
			 *     $lb->reuseConnection( $conn );
623
			 *   }
624
			 * When a connection to the local DB is opened in this way, reuseConnection()
625
			 * should be ignored
626
			 */
627
			return;
628
		}
629
630
		$dbName = $conn->getDBname();
631
		$prefix = $conn->tablePrefix();
632
		if ( strval( $prefix ) !== '' ) {
633
			$wiki = "$dbName-$prefix";
634
		} else {
635
			$wiki = $dbName;
636
		}
637
		if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
638
			throw new MWException( __METHOD__ . ": connection not found, has " .
639
				"the connection been freed already?" );
640
		}
641
		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
642
		if ( $refCount <= 0 ) {
643
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
644
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
645
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
646
		} else {
647
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
648
		}
649
	}
650
651
	/**
652
	 * Get a database connection handle reference
653
	 *
654
	 * The handle's methods wrap simply wrap those of a DatabaseBase handle
655
	 *
656
	 * @see LoadBalancer::getConnection() for parameter information
657
	 *
658
	 * @param int $db
659
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
660
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
661
	 * @return DBConnRef
662
	 */
663
	public function getConnectionRef( $db, $groups = [], $wiki = false ) {
664
		return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
665
	}
666
667
	/**
668
	 * Get a database connection handle reference without connecting yet
669
	 *
670
	 * The handle's methods wrap simply wrap those of a DatabaseBase handle
671
	 *
672
	 * @see LoadBalancer::getConnection() for parameter information
673
	 *
674
	 * @param int $db
675
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
676
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
677
	 * @return DBConnRef
678
	 */
679
	public function getLazyConnectionRef( $db, $groups = [], $wiki = false ) {
680
		return new DBConnRef( $this, [ $db, $groups, $wiki ] );
681
	}
682
683
	/**
684
	 * Open a connection to the server given by the specified index
685
	 * Index must be an actual index into the array.
686
	 * If the server is already open, returns it.
687
	 *
688
	 * On error, returns false, and the connection which caused the
689
	 * error will be available via $this->mErrorConnection.
690
	 *
691
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
692
	 *
693
	 * @param int $i Server index
694
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
695
	 * @return DatabaseBase|bool Returns false on errors
696
	 */
697
	public function openConnection( $i, $wiki = false ) {
698
		if ( $wiki !== false ) {
699
			$conn = $this->openForeignConnection( $i, $wiki );
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 697 can also be of type boolean; however, LoadBalancer::openForeignConnection() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
700
		} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
701
			$conn = $this->mConns['local'][$i][0];
702
		} else {
703
			$server = $this->mServers[$i];
704
			$server['serverIndex'] = $i;
705
			$conn = $this->reallyOpenConnection( $server, false );
706
			$serverName = $this->getServerName( $i );
707
			if ( $conn->isOpen() ) {
708
				wfDebugLog( 'connect', "Connected to database $i at $serverName\n" );
709
				$this->mConns['local'][$i][0] = $conn;
710
			} else {
711
				wfDebugLog( 'connect', "Failed to connect to database $i at $serverName\n" );
712
				$this->mErrorConnection = $conn;
713
				$conn = false;
714
			}
715
		}
716
717
		if ( $conn && !$conn->isOpen() ) {
718
			// Connection was made but later unrecoverably lost for some reason.
719
			// Do not return a handle that will just throw exceptions on use,
720
			// but let the calling code (e.g. getReaderIndex) try another server.
721
			// See DatabaseMyslBase::ping() for how this can happen.
722
			$this->mErrorConnection = $conn;
723
			$conn = false;
724
		}
725
726
		return $conn;
727
	}
728
729
	/**
730
	 * Open a connection to a foreign DB, or return one if it is already open.
731
	 *
732
	 * Increments a reference count on the returned connection which locks the
733
	 * connection to the requested wiki. This reference count can be
734
	 * decremented by calling reuseConnection().
735
	 *
736
	 * If a connection is open to the appropriate server already, but with the wrong
737
	 * database, it will be switched to the right database and returned, as long as
738
	 * it has been freed first with reuseConnection().
739
	 *
740
	 * On error, returns false, and the connection which caused the
741
	 * error will be available via $this->mErrorConnection.
742
	 *
743
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
744
	 *
745
	 * @param int $i Server index
746
	 * @param string $wiki Wiki ID to open
747
	 * @return DatabaseBase
748
	 */
749
	private function openForeignConnection( $i, $wiki ) {
750
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
751
		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
752
			// Reuse an already-used connection
753
			$conn = $this->mConns['foreignUsed'][$i][$wiki];
754
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
755
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
756
			// Reuse a free connection for the same wiki
757
			$conn = $this->mConns['foreignFree'][$i][$wiki];
758
			unset( $this->mConns['foreignFree'][$i][$wiki] );
759
			$this->mConns['foreignUsed'][$i][$wiki] = $conn;
760
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
761
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
762
			// Reuse a connection from another wiki
763
			$conn = reset( $this->mConns['foreignFree'][$i] );
764
			$oldWiki = key( $this->mConns['foreignFree'][$i] );
765
766
			// The empty string as a DB name means "don't care".
767
			// DatabaseMysqlBase::open() already handle this on connection.
768
			if ( $dbName !== '' && !$conn->selectDB( $dbName ) ) {
769
				$this->mLastError = "Error selecting database $dbName on server " .
770
					$conn->getServer() . " from client host " . wfHostname() . "\n";
771
				$this->mErrorConnection = $conn;
772
				$conn = false;
773
			} else {
774
				$conn->tablePrefix( $prefix );
775
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
776
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
777
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
778
			}
779
		} else {
780
			// Open a new connection
781
			$server = $this->mServers[$i];
782
			$server['serverIndex'] = $i;
783
			$server['foreignPoolRefCount'] = 0;
784
			$server['foreign'] = true;
785
			$conn = $this->reallyOpenConnection( $server, $dbName );
786
			if ( !$conn->isOpen() ) {
787
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
788
				$this->mErrorConnection = $conn;
789
				$conn = false;
790
			} else {
791
				$conn->tablePrefix( $prefix );
792
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
793
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
794
			}
795
		}
796
797
		// Increment reference count
798
		if ( $conn ) {
799
			$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
800
			$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
801
		}
802
803
		return $conn;
804
	}
805
806
	/**
807
	 * Test if the specified index represents an open connection
808
	 *
809
	 * @param int $index Server index
810
	 * @access private
811
	 * @return bool
812
	 */
813
	private function isOpen( $index ) {
814
		if ( !is_integer( $index ) ) {
815
			return false;
816
		}
817
818
		return (bool)$this->getAnyOpenConnection( $index );
819
	}
820
821
	/**
822
	 * Really opens a connection. Uncached.
823
	 * Returns a Database object whether or not the connection was successful.
824
	 * @access private
825
	 *
826
	 * @param array $server
827
	 * @param bool $dbNameOverride
828
	 * @throws MWException
829
	 * @return DatabaseBase
830
	 */
831
	protected function reallyOpenConnection( $server, $dbNameOverride = false ) {
832
		if ( $this->disabled ) {
833
			throw new DBAccessError();
834
		}
835
836
		if ( !is_array( $server ) ) {
837
			throw new MWException( 'You must update your load-balancing configuration. ' .
838
				'See DefaultSettings.php entry for $wgDBservers.' );
839
		}
840
841
		if ( $dbNameOverride !== false ) {
842
			$server['dbname'] = $dbNameOverride;
843
		}
844
845
		// Let the handle know what the cluster master is (e.g. "db1052")
846
		$masterName = $this->getServerName( 0 );
847
		$server['clusterMasterHost'] = $masterName;
848
849
		// Log when many connection are made on requests
850
		if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
851
			wfDebugLog( 'DBPerformance', __METHOD__ . ": " .
852
				"{$this->connsOpened}+ connections made (master=$masterName)\n" .
853
				wfBacktrace( true ) );
854
		}
855
856
		# Create object
857
		try {
858
			$db = DatabaseBase::factory( $server['type'], $server );
859
		} catch ( DBConnectionError $e ) {
860
			// FIXME: This is probably the ugliest thing I have ever done to
861
			// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
862
			$db = $e->db;
863
		}
864
865
		$db->setLBInfo( $server );
866
		$db->setLazyMasterHandle(
867
			$this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
868
		);
869
		$db->setTransactionProfiler( $this->trxProfiler );
870
871
		if ( $server['serverIndex'] === $this->getWriterIndex() ) {
872
			if ( $this->trxRoundId !== false ) {
873
				$this->applyTransactionRoundFlags( $db );
874
			}
875
			foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
876
				$db->setTransactionListener( $name, $callback );
877
			}
878
		}
879
880
		return $db;
881
	}
882
883
	/**
884
	 * @throws DBConnectionError
885
	 * @return bool
886
	 */
887
	private function reportConnectionError() {
888
		$conn = $this->mErrorConnection; // The connection which caused the error
889
		$context = [
890
			'method' => __METHOD__,
891
			'last_error' => $this->mLastError,
892
		];
893
894
		if ( !is_object( $conn ) ) {
895
			// No last connection, probably due to all servers being too busy
896
			wfLogDBError(
897
				"LB failure with no last connection. Connection error: {last_error}",
898
				$context
899
			);
900
901
			// If all servers were busy, mLastError will contain something sensible
902
			throw new DBConnectionError( null, $this->mLastError );
903
		} else {
904
			$context['db_server'] = $conn->getProperty( 'mServer' );
905
			wfLogDBError(
906
				"Connection error: {last_error} ({db_server})",
907
				$context
908
			);
909
910
			// throws DBConnectionError
911
			$conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
912
		}
913
914
		return false; /* not reached */
915
	}
916
917
	/**
918
	 * @return int
919
	 * @since 1.26
920
	 */
921
	public function getWriterIndex() {
922
		return 0;
923
	}
924
925
	/**
926
	 * Returns true if the specified index is a valid server index
927
	 *
928
	 * @param string $i
929
	 * @return bool
930
	 */
931
	public function haveIndex( $i ) {
932
		return array_key_exists( $i, $this->mServers );
933
	}
934
935
	/**
936
	 * Returns true if the specified index is valid and has non-zero load
937
	 *
938
	 * @param string $i
939
	 * @return bool
940
	 */
941
	public function isNonZeroLoad( $i ) {
942
		return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
943
	}
944
945
	/**
946
	 * Get the number of defined servers (not the number of open connections)
947
	 *
948
	 * @return int
949
	 */
950
	public function getServerCount() {
951
		return count( $this->mServers );
952
	}
953
954
	/**
955
	 * Get the host name or IP address of the server with the specified index
956
	 * Prefer a readable name if available.
957
	 * @param string $i
958
	 * @return string
959
	 */
960
	public function getServerName( $i ) {
961
		if ( isset( $this->mServers[$i]['hostName'] ) ) {
962
			$name = $this->mServers[$i]['hostName'];
963
		} elseif ( isset( $this->mServers[$i]['host'] ) ) {
964
			$name = $this->mServers[$i]['host'];
965
		} else {
966
			$name = '';
967
		}
968
969
		return ( $name != '' ) ? $name : 'localhost';
970
	}
971
972
	/**
973
	 * Return the server info structure for a given index, or false if the index is invalid.
974
	 * @param int $i
975
	 * @return array|bool
976
	 */
977
	public function getServerInfo( $i ) {
978
		if ( isset( $this->mServers[$i] ) ) {
979
			return $this->mServers[$i];
980
		} else {
981
			return false;
982
		}
983
	}
984
985
	/**
986
	 * Sets the server info structure for the given index. Entry at index $i
987
	 * is created if it doesn't exist
988
	 * @param int $i
989
	 * @param array $serverInfo
990
	 */
991
	public function setServerInfo( $i, array $serverInfo ) {
992
		$this->mServers[$i] = $serverInfo;
993
	}
994
995
	/**
996
	 * Get the current master position for chronology control purposes
997
	 * @return mixed
998
	 */
999
	public function getMasterPos() {
1000
		# If this entire request was served from a replica DB without opening a connection to the
1001
		# master (however unlikely that may be), then we can fetch the position from the replica DB.
1002
		$masterConn = $this->getAnyOpenConnection( 0 );
1003
		if ( !$masterConn ) {
1004
			$serverCount = count( $this->mServers );
1005
			for ( $i = 1; $i < $serverCount; $i++ ) {
1006
				$conn = $this->getAnyOpenConnection( $i );
1007
				if ( $conn ) {
1008
					return $conn->getSlavePos();
0 ignored issues
show
Bug introduced by
The method getSlavePos cannot be called on $conn (of type boolean).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
1009
				}
1010
			}
1011
		} else {
1012
			return $masterConn->getMasterPos();
1013
		}
1014
1015
		return false;
1016
	}
1017
1018
	/**
1019
	 * Disable this load balancer. All connections are closed, and any attempt to
1020
	 * open a new connection will result in a DBAccessError.
1021
	 *
1022
	 * @since 1.27
1023
	 */
1024
	public function disable() {
1025
		$this->closeAll();
1026
		$this->disabled = true;
1027
	}
1028
1029
	/**
1030
	 * Close all open connections
1031
	 */
1032
	public function closeAll() {
1033
		$this->forEachOpenConnection( function ( DatabaseBase $conn ) {
1034
			$conn->close();
1035
		} );
1036
1037
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignUsed' => array()) of type array<string,array,{"loc..."foreignUsed":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
1038
			'local' => [],
1039
			'foreignFree' => [],
1040
			'foreignUsed' => [],
1041
		];
1042
		$this->connsOpened = 0;
1043
	}
1044
1045
	/**
1046
	 * Close a connection
1047
	 * Using this function makes sure the LoadBalancer knows the connection is closed.
1048
	 * If you use $conn->close() directly, the load balancer won't update its state.
1049
	 * @param DatabaseBase $conn
1050
	 */
1051
	public function closeConnection( $conn ) {
1052
		$done = false;
1053
		foreach ( $this->mConns as $i1 => $conns2 ) {
1054
			foreach ( $conns2 as $i2 => $conns3 ) {
1055
				foreach ( $conns3 as $i3 => $candidateConn ) {
1056
					if ( $conn === $candidateConn ) {
1057
						$conn->close();
1058
						unset( $this->mConns[$i1][$i2][$i3] );
1059
						--$this->connsOpened;
1060
						$done = true;
1061
						break;
1062
					}
1063
				}
1064
			}
1065
		}
1066
		if ( !$done ) {
1067
			$conn->close();
1068
		}
1069
	}
1070
1071
	/**
1072
	 * Commit transactions on all open connections
1073
	 * @param string $fname Caller name
1074
	 * @throws DBExpectedError
1075
	 */
1076
	public function commitAll( $fname = __METHOD__ ) {
1077
		$failures = [];
1078
1079
		$restore = ( $this->trxRoundId !== false );
1080
		$this->trxRoundId = false;
1081
		$this->forEachOpenConnection(
1082
			function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
1083
				try {
1084
					$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1085
				} catch ( DBError $e ) {
1086
					MWExceptionHandler::logException( $e );
1087
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1088
				}
1089
				if ( $restore && $conn->getLBInfo( 'master' ) ) {
1090
					$this->undoTransactionRoundFlags( $conn );
1091
				}
1092
			}
1093
		);
1094
1095
		if ( $failures ) {
1096
			throw new DBExpectedError(
1097
				null,
1098
				"Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1099
			);
1100
		}
1101
	}
1102
1103
	/**
1104
	 * Perform all pre-commit callbacks that remain part of the atomic transactions
1105
	 * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
1106
	 * @since 1.28
1107
	 */
1108
	public function finalizeMasterChanges() {
1109
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
1110
			// Any error should cause all DB transactions to be rolled back together
1111
			$conn->setTrxEndCallbackSuppression( false );
1112
			$conn->runOnTransactionPreCommitCallbacks();
1113
			// Defer post-commit callbacks until COMMIT finishes for all DBs
1114
			$conn->setTrxEndCallbackSuppression( true );
1115
		} );
1116
	}
1117
1118
	/**
1119
	 * Perform all pre-commit checks for things like replication safety
1120
	 * @param array $options Includes:
1121
	 *   - maxWriteDuration : max write query duration time in seconds
1122
	 * @throws DBTransactionError
1123
	 * @since 1.28
1124
	 */
1125
	public function approveMasterChanges( array $options ) {
1126
		$limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
1127
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
1128
			// If atomic sections or explicit transactions are still open, some caller must have
1129
			// caught an exception but failed to properly rollback any changes. Detect that and
1130
			// throw and error (causing rollback).
1131
			if ( $conn->explicitTrxActive() ) {
1132
				throw new DBTransactionError(
1133
					$conn,
1134
					"Explicit transaction still active. A caller may have caught an error."
1135
				);
1136
			}
1137
			// Assert that the time to replicate the transaction will be sane.
1138
			// If this fails, then all DB transactions will be rollback back together.
1139
			$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1140
			if ( $limit > 0 && $time > $limit ) {
1141
				throw new DBTransactionError(
1142
					$conn,
1143
					wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
1144
				);
1145
			}
1146
			// If a connection sits idle while slow queries execute on another, that connection
1147
			// may end up dropped before the commit round is reached. Ping servers to detect this.
1148
			if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1149
				throw new DBTransactionError(
1150
					$conn,
1151
					"A connection to the {$conn->getDBname()} database was lost before commit."
1152
				);
1153
			}
1154
		} );
1155
	}
1156
1157
	/**
1158
	 * Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
1159
	 *
1160
	 * The DBO_TRX setting will be reverted to the default in each of these methods:
1161
	 *   - commitMasterChanges()
1162
	 *   - rollbackMasterChanges()
1163
	 *   - commitAll()
1164
	 * This allows for custom transaction rounds from any outer transaction scope.
1165
	 *
1166
	 * @param string $fname
1167
	 * @throws DBExpectedError
1168
	 * @since 1.28
1169
	 */
1170
	public function beginMasterChanges( $fname = __METHOD__ ) {
1171
		if ( $this->trxRoundId !== false ) {
1172
			throw new DBTransactionError(
1173
				null,
1174
				"$fname: Transaction round '{$this->trxRoundId}' already started."
1175
			);
1176
		}
1177
		$this->trxRoundId = $fname;
1178
1179
		$failures = [];
1180
		$this->forEachOpenMasterConnection(
1181
			function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
1182
				$conn->setTrxEndCallbackSuppression( true );
1183
				try {
1184
					$conn->clearSnapshot( $fname );
1185
				} catch ( DBError $e ) {
1186
					MWExceptionHandler::logException( $e );
1187
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1188
				}
1189
				$conn->setTrxEndCallbackSuppression( false );
1190
				$this->applyTransactionRoundFlags( $conn );
1191
			}
1192
		);
1193
1194 View Code Duplication
		if ( $failures ) {
1195
			throw new DBExpectedError(
1196
				null,
1197
				"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
1198
			);
1199
		}
1200
	}
1201
1202
	/**
1203
	 * Issue COMMIT on all master connections where writes where done
1204
	 * @param string $fname Caller name
1205
	 * @throws DBExpectedError
1206
	 */
1207
	public function commitMasterChanges( $fname = __METHOD__ ) {
1208
		$failures = [];
1209
1210
		$restore = ( $this->trxRoundId !== false );
1211
		$this->trxRoundId = false;
1212
		$this->forEachOpenMasterConnection(
1213
			function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
1214
				try {
1215
					if ( $conn->writesOrCallbacksPending() ) {
1216
						$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1217
					} elseif ( $restore ) {
1218
						$conn->clearSnapshot( $fname );
1219
					}
1220
				} catch ( DBError $e ) {
1221
					MWExceptionHandler::logException( $e );
1222
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1223
				}
1224
				if ( $restore ) {
1225
					$this->undoTransactionRoundFlags( $conn );
1226
				}
1227
			}
1228
		);
1229
1230 View Code Duplication
		if ( $failures ) {
1231
			throw new DBExpectedError(
1232
				null,
1233
				"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1234
			);
1235
		}
1236
	}
1237
1238
	/**
1239
	 * Issue all pending post-COMMIT/ROLLBACK callbacks
1240
	 * @param integer $type IDatabase::TRIGGER_* constant
1241
	 * @return Exception|null The first exception or null if there were none
1242
	 * @since 1.28
1243
	 */
1244
	public function runMasterPostTrxCallbacks( $type ) {
1245
		$e = null; // first exception
1246
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
1247
			$conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
1248
1249
			$conn->setTrxEndCallbackSuppression( false );
1250
			try {
1251
				$conn->runOnTransactionIdleCallbacks( $type );
1252
			} catch ( Exception $ex ) {
1253
				$e = $e ?: $ex;
1254
			}
1255
			try {
1256
				$conn->runTransactionListenerCallbacks( $type );
1257
			} catch ( Exception $ex ) {
1258
				$e = $e ?: $ex;
1259
			}
1260
		} );
1261
1262
		return $e;
1263
	}
1264
1265
	/**
1266
	 * Issue ROLLBACK only on master, only if queries were done on connection
1267
	 * @param string $fname Caller name
1268
	 * @throws DBExpectedError
1269
	 * @since 1.23
1270
	 */
1271
	public function rollbackMasterChanges( $fname = __METHOD__ ) {
1272
		$restore = ( $this->trxRoundId !== false );
1273
		$this->trxRoundId = false;
1274
		$this->forEachOpenMasterConnection(
1275
			function ( DatabaseBase $conn ) use ( $fname, $restore ) {
1276
				if ( $conn->writesOrCallbacksPending() ) {
1277
					$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1278
				}
1279
				if ( $restore ) {
1280
					$this->undoTransactionRoundFlags( $conn );
1281
				}
1282
			}
1283
		);
1284
	}
1285
1286
	/**
1287
	 * Suppress all pending post-COMMIT/ROLLBACK callbacks
1288
	 * @return Exception|null The first exception or null if there were none
1289
	 * @since 1.28
1290
	 */
1291
	public function suppressTransactionEndCallbacks() {
1292
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
1293
			$conn->setTrxEndCallbackSuppression( true );
1294
		} );
1295
	}
1296
1297
	/**
1298
	 * @param DatabaseBase $conn
1299
	 */
1300
	private function applyTransactionRoundFlags( DatabaseBase $conn ) {
1301
		if ( $conn->getFlag( DBO_DEFAULT ) ) {
1302
			// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1303
			// Force DBO_TRX even in CLI mode since a commit round is expected soon.
1304
			$conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
1305
			// If config has explicitly requested DBO_TRX be either on or off by not
1306
			// setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
1307
			// for things like blob stores (ExternalStore) which want auto-commit mode.
1308
		}
1309
	}
1310
1311
	/**
1312
	 * @param DatabaseBase $conn
1313
	 */
1314
	private function undoTransactionRoundFlags( DatabaseBase $conn ) {
1315
		if ( $conn->getFlag( DBO_DEFAULT ) ) {
1316
			$conn->restoreFlags( $conn::RESTORE_PRIOR );
1317
		}
1318
	}
1319
1320
	/**
1321
	 * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
1322
	 *
1323
	 * @param string $fname Caller name
1324
	 * @since 1.28
1325
	 */
1326
	public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1327
		$this->forEachOpenReplicaConnection( function ( DatabaseBase $conn ) {
1328
			$conn->clearSnapshot( __METHOD__ );
1329
		} );
1330
	}
1331
1332
	/**
1333
	 * @return bool Whether a master connection is already open
1334
	 * @since 1.24
1335
	 */
1336
	public function hasMasterConnection() {
1337
		return $this->isOpen( $this->getWriterIndex() );
1338
	}
1339
1340
	/**
1341
	 * Determine if there are pending changes in a transaction by this thread
1342
	 * @since 1.23
1343
	 * @return bool
1344
	 */
1345
	public function hasMasterChanges() {
1346
		$masterIndex = $this->getWriterIndex();
1347
		foreach ( $this->mConns as $conns2 ) {
1348
			if ( empty( $conns2[$masterIndex] ) ) {
1349
				continue;
1350
			}
1351
			/** @var DatabaseBase $conn */
1352
			foreach ( $conns2[$masterIndex] as $conn ) {
1353
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
1354
					return true;
1355
				}
1356
			}
1357
		}
1358
		return false;
1359
	}
1360
1361
	/**
1362
	 * Get the timestamp of the latest write query done by this thread
1363
	 * @since 1.25
1364
	 * @return float|bool UNIX timestamp or false
1365
	 */
1366 View Code Duplication
	public function lastMasterChangeTimestamp() {
1367
		$lastTime = false;
1368
		$masterIndex = $this->getWriterIndex();
1369
		foreach ( $this->mConns as $conns2 ) {
1370
			if ( empty( $conns2[$masterIndex] ) ) {
1371
				continue;
1372
			}
1373
			/** @var DatabaseBase $conn */
1374
			foreach ( $conns2[$masterIndex] as $conn ) {
1375
				$lastTime = max( $lastTime, $conn->lastDoneWrites() );
1376
			}
1377
		}
1378
		return $lastTime;
1379
	}
1380
1381
	/**
1382
	 * Check if this load balancer object had any recent or still
1383
	 * pending writes issued against it by this PHP thread
1384
	 *
1385
	 * @param float $age How many seconds ago is "recent" [defaults to mWaitTimeout]
1386
	 * @return bool
1387
	 * @since 1.25
1388
	 */
1389
	public function hasOrMadeRecentMasterChanges( $age = null ) {
1390
		$age = ( $age === null ) ? $this->mWaitTimeout : $age;
1391
1392
		return ( $this->hasMasterChanges()
1393
			|| $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1394
	}
1395
1396
	/**
1397
	 * Get the list of callers that have pending master changes
1398
	 *
1399
	 * @return array
1400
	 * @since 1.27
1401
	 */
1402 View Code Duplication
	public function pendingMasterChangeCallers() {
1403
		$fnames = [];
1404
1405
		$masterIndex = $this->getWriterIndex();
1406
		foreach ( $this->mConns as $conns2 ) {
1407
			if ( empty( $conns2[$masterIndex] ) ) {
1408
				continue;
1409
			}
1410
			/** @var DatabaseBase $conn */
1411
			foreach ( $conns2[$masterIndex] as $conn ) {
1412
				$fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1413
			}
1414
		}
1415
1416
		return $fnames;
1417
	}
1418
1419
	/**
1420
	 * @note This method will trigger a DB connection if not yet done
1421
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1422
	 * @return bool Whether the generic connection for reads is highly "lagged"
1423
	 */
1424
	public function getLaggedReplicaMode( $wiki = false ) {
1425
		// No-op if there is only one DB (also avoids recursion)
1426
		if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) {
1427
			try {
1428
				// See if laggedReplicaMode gets set
1429
				$conn = $this->getConnection( DB_REPLICA, false, $wiki );
1430
				$this->reuseConnection( $conn );
1431
			} catch ( DBConnectionError $e ) {
1432
				// Avoid expensive re-connect attempts and failures
1433
				$this->allReplicasDownMode = true;
1434
				$this->laggedReplicaMode = true;
1435
			}
1436
		}
1437
1438
		return $this->laggedReplicaMode;
1439
	}
1440
1441
	/**
1442
	 * @param bool $wiki
1443
	 * @return bool
1444
	 * @deprecated 1.28; use getLaggedReplicaMode()
1445
	 */
1446
	public function getLaggedSlaveMode( $wiki = false ) {
1447
		return $this->getLaggedReplicaMode( $wiki );
1448
	}
1449
1450
	/**
1451
	 * @note This method will never cause a new DB connection
1452
	 * @return bool Whether any generic connection used for reads was highly "lagged"
1453
	 * @since 1.28
1454
	 */
1455
	public function laggedReplicaUsed() {
1456
		return $this->laggedReplicaMode;
1457
	}
1458
1459
	/**
1460
	 * @return bool
1461
	 * @since 1.27
1462
	 * @deprecated Since 1.28; use laggedReplicaUsed()
1463
	 */
1464
	public function laggedSlaveUsed() {
1465
		return $this->laggedReplicaUsed();
1466
	}
1467
1468
	/**
1469
	 * @note This method may trigger a DB connection if not yet done
1470
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1471
	 * @param DatabaseBase|null DB master connection; used to avoid loops [optional]
1472
	 * @return string|bool Reason the master is read-only or false if it is not
1473
	 * @since 1.27
1474
	 */
1475
	public function getReadOnlyReason( $wiki = false, DatabaseBase $conn = null ) {
1476
		if ( $this->readOnlyReason !== false ) {
1477
			return $this->readOnlyReason;
1478
		} elseif ( $this->getLaggedReplicaMode( $wiki ) ) {
1479
			if ( $this->allReplicasDownMode ) {
1480
				return 'The database has been automatically locked ' .
1481
					'until the replica database servers become available';
1482
			} else {
1483
				return 'The database has been automatically locked ' .
1484
					'while the replica database servers catch up to the master.';
1485
			}
1486
		} elseif ( $this->masterRunningReadOnly( $wiki, $conn ) ) {
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 1475 can also be of type boolean; however, LoadBalancer::masterRunningReadOnly() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
1487
			return 'The database master is running in read-only mode.';
1488
		}
1489
1490
		return false;
1491
	}
1492
1493
	/**
1494
	 * @param string $wiki Wiki ID, or false for the current wiki
1495
	 * @param DatabaseBase|null DB master connectionl used to avoid loops [optional]
1496
	 * @return bool
1497
	 */
1498
	private function masterRunningReadOnly( $wiki, DatabaseBase $conn = null ) {
1499
		$cache = $this->wanCache;
1500
		$masterServer = $this->getServerName( $this->getWriterIndex() );
1501
1502
		return (bool)$cache->getWithSetCallback(
1503
			$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1504
			self::TTL_CACHE_READONLY,
1505
			function () use ( $wiki, $conn ) {
1506
				$this->trxProfiler->setSilenced( true );
1507
				try {
1508
					$dbw = $conn ?: $this->getConnection( DB_MASTER, [], $wiki );
1509
					$readOnly = (int)$dbw->serverIsReadOnly();
1510
				} catch ( DBError $e ) {
1511
					$readOnly = 0;
1512
				}
1513
				$this->trxProfiler->setSilenced( false );
1514
				return $readOnly;
1515
			},
1516
			[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1517
		);
1518
	}
1519
1520
	/**
1521
	 * Disables/enables lag checks
1522
	 * @param null|bool $mode
1523
	 * @return bool
1524
	 */
1525
	public function allowLagged( $mode = null ) {
1526
		if ( $mode === null ) {
1527
			return $this->mAllowLagged;
1528
		}
1529
		$this->mAllowLagged = $mode;
1530
1531
		return $this->mAllowLagged;
1532
	}
1533
1534
	/**
1535
	 * @return bool
1536
	 */
1537
	public function pingAll() {
1538
		$success = true;
1539
		$this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( &$success ) {
1540
			if ( !$conn->ping() ) {
1541
				$success = false;
1542
			}
1543
		} );
1544
1545
		return $success;
1546
	}
1547
1548
	/**
1549
	 * Call a function with each open connection object
1550
	 * @param callable $callback
1551
	 * @param array $params
1552
	 */
1553 View Code Duplication
	public function forEachOpenConnection( $callback, array $params = [] ) {
1554
		foreach ( $this->mConns as $connsByServer ) {
1555
			foreach ( $connsByServer as $serverConns ) {
1556
				foreach ( $serverConns as $conn ) {
1557
					$mergedParams = array_merge( [ $conn ], $params );
1558
					call_user_func_array( $callback, $mergedParams );
1559
				}
1560
			}
1561
		}
1562
	}
1563
1564
	/**
1565
	 * Call a function with each open connection object to a master
1566
	 * @param callable $callback
1567
	 * @param array $params
1568
	 * @since 1.28
1569
	 */
1570
	public function forEachOpenMasterConnection( $callback, array $params = [] ) {
1571
		$masterIndex = $this->getWriterIndex();
1572
		foreach ( $this->mConns as $connsByServer ) {
1573
			if ( isset( $connsByServer[$masterIndex] ) ) {
1574
				/** @var DatabaseBase $conn */
1575
				foreach ( $connsByServer[$masterIndex] as $conn ) {
1576
					$mergedParams = array_merge( [ $conn ], $params );
1577
					call_user_func_array( $callback, $mergedParams );
1578
				}
1579
			}
1580
		}
1581
	}
1582
1583
	/**
1584
	 * Call a function with each open replica DB connection object
1585
	 * @param callable $callback
1586
	 * @param array $params
1587
	 * @since 1.28
1588
	 */
1589 View Code Duplication
	public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
1590
		foreach ( $this->mConns as $connsByServer ) {
1591
			foreach ( $connsByServer as $i => $serverConns ) {
1592
				if ( $i === $this->getWriterIndex() ) {
1593
					continue; // skip master
1594
				}
1595
				foreach ( $serverConns as $conn ) {
1596
					$mergedParams = array_merge( [ $conn ], $params );
1597
					call_user_func_array( $callback, $mergedParams );
1598
				}
1599
			}
1600
		}
1601
	}
1602
1603
	/**
1604
	 * Get the hostname and lag time of the most-lagged replica DB
1605
	 *
1606
	 * This is useful for maintenance scripts that need to throttle their updates.
1607
	 * May attempt to open connections to replica DBs on the default DB. If there is
1608
	 * no lag, the maximum lag will be reported as -1.
1609
	 *
1610
	 * @param bool|string $wiki Wiki ID, or false for the default database
1611
	 * @return array ( host, max lag, index of max lagged host )
1612
	 */
1613
	public function getMaxLag( $wiki = false ) {
1614
		$maxLag = -1;
1615
		$host = '';
1616
		$maxIndex = 0;
1617
1618
		if ( $this->getServerCount() <= 1 ) {
1619
			return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
1620
		}
1621
1622
		$lagTimes = $this->getLagTimes( $wiki );
1623
		foreach ( $lagTimes as $i => $lag ) {
1624
			if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
1625
				$maxLag = $lag;
1626
				$host = $this->mServers[$i]['host'];
1627
				$maxIndex = $i;
1628
			}
1629
		}
1630
1631
		return [ $host, $maxLag, $maxIndex ];
1632
	}
1633
1634
	/**
1635
	 * Get an estimate of replication lag (in seconds) for each server
1636
	 *
1637
	 * Results are cached for a short time in memcached/process cache
1638
	 *
1639
	 * Values may be "false" if replication is too broken to estimate
1640
	 *
1641
	 * @param string|bool $wiki
1642
	 * @return int[] Map of (server index => float|int|bool)
1643
	 */
1644
	public function getLagTimes( $wiki = false ) {
1645
		if ( $this->getServerCount() <= 1 ) {
1646
			return [ 0 => 0 ]; // no replication = no lag
1647
		}
1648
1649
		# Send the request to the load monitor
1650
		return $this->getLoadMonitor()->getLagTimes( array_keys( $this->mServers ), $wiki );
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 1644 can also be of type boolean; however, LoadMonitor::getLagTimes() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
1651
	}
1652
1653
	/**
1654
	 * Get the lag in seconds for a given connection, or zero if this load
1655
	 * balancer does not have replication enabled.
1656
	 *
1657
	 * This should be used in preference to Database::getLag() in cases where
1658
	 * replication may not be in use, since there is no way to determine if
1659
	 * replication is in use at the connection level without running
1660
	 * potentially restricted queries such as SHOW SLAVE STATUS. Using this
1661
	 * function instead of Database::getLag() avoids a fatal error in this
1662
	 * case on many installations.
1663
	 *
1664
	 * @param IDatabase $conn
1665
	 * @return int|bool Returns false on error
1666
	 */
1667
	public function safeGetLag( IDatabase $conn ) {
1668
		if ( $this->getServerCount() == 1 ) {
1669
			return 0;
1670
		} else {
1671
			return $conn->getLag();
1672
		}
1673
	}
1674
1675
	/**
1676
	 * Wait for a replica DB to reach a specified master position
1677
	 *
1678
	 * This will connect to the master to get an accurate position if $pos is not given
1679
	 *
1680
	 * @param IDatabase $conn Replica DB
1681
	 * @param DBMasterPos|bool $pos Master position; default: current position
1682
	 * @param integer $timeout Timeout in seconds
1683
	 * @return bool Success
1684
	 * @since 1.27
1685
	 */
1686
	public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
1687
		if ( $this->getServerCount() == 1 || !$conn->getLBInfo( 'replica' ) ) {
1688
			return true; // server is not a replica DB
1689
		}
1690
1691
		$pos = $pos ?: $this->getConnection( DB_MASTER )->getMasterPos();
1692
		if ( !( $pos instanceof DBMasterPos ) ) {
1693
			return false; // something is misconfigured
1694
		}
1695
1696
		$result = $conn->masterPosWait( $pos, $timeout );
1697
		if ( $result == -1 || is_null( $result ) ) {
1698
			$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
1699
			wfDebugLog( 'replication', "$msg\n" );
1700
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
1701
			$ok = false;
1702
		} else {
1703
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
1704
			$ok = true;
1705
		}
1706
1707
		return $ok;
1708
	}
1709
1710
	/**
1711
	 * Clear the cache for slag lag delay times
1712
	 *
1713
	 * This is only used for testing
1714
	 */
1715
	public function clearLagTimeCache() {
1716
		$this->getLoadMonitor()->clearCaches();
1717
	}
1718
1719
	/**
1720
	 * Set a callback via DatabaseBase::setTransactionListener() on
1721
	 * all current and future master connections of this load balancer
1722
	 *
1723
	 * @param string $name Callback name
1724
	 * @param callable|null $callback
1725
	 * @since 1.28
1726
	 */
1727
	public function setTransactionListener( $name, callable $callback = null ) {
1728
		if ( $callback ) {
1729
			$this->trxRecurringCallbacks[$name] = $callback;
1730
		} else {
1731
			unset( $this->trxRecurringCallbacks[$name] );
1732
		}
1733
		$this->forEachOpenMasterConnection(
1734
			function ( DatabaseBase $conn ) use ( $name, $callback ) {
1735
				$conn->setTransactionListener( $name, $callback );
1736
			}
1737
		);
1738
	}
1739
}
1740