Completed
Branch master (7e350b)
by
unknown
30:36
created

LoadBalancer::getLaggedReplicaMode()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
c 0
b 0
f 0
nc 4
nop 1
dl 0
loc 16
rs 9.2
1
<?php
2
/**
3
 * Database load balancing.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup Database
22
 */
23
24
/**
25
 * Database load balancing object
26
 *
27
 * @todo document
28
 * @ingroup Database
29
 */
30
class LoadBalancer {
31
	/** @var array[] Map of (server index => server config array) */
32
	private $mServers;
33
	/** @var array[] Map of (local/foreignUsed/foreignFree => server index => DatabaseBase array) */
34
	private $mConns;
35
	/** @var array Map of (server index => weight) */
36
	private $mLoads;
37
	/** @var array[] Map of (group => server index => weight) */
38
	private $mGroupLoads;
39
	/** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
40
	private $mAllowLagged;
41
	/** @var integer Seconds to spend waiting on replica DB lag to resolve */
42
	private $mWaitTimeout;
43
	/** @var array LBFactory information */
44
	private $mParentInfo;
45
46
	/** @var string The LoadMonitor subclass name */
47
	private $mLoadMonitorClass;
48
	/** @var LoadMonitor */
49
	private $mLoadMonitor;
50
	/** @var BagOStuff */
51
	private $srvCache;
52
	/** @var WANObjectCache */
53
	private $wanCache;
54
	/** @var TransactionProfiler */
55
	protected $trxProfiler;
56
57
	/** @var bool|DatabaseBase Database connection that caused a problem */
58
	private $mErrorConnection;
59
	/** @var integer The generic (not query grouped) replica DB index (of $mServers) */
60
	private $mReadIndex;
61
	/** @var bool|DBMasterPos False if not set */
62
	private $mWaitForPos;
63
	/** @var bool Whether the generic reader fell back to a lagged replica DB */
64
	private $laggedReplicaMode = false;
65
	/** @var bool Presumably whether all replica DBs were found to be down (previous text duplicated $laggedReplicaMode's description) - TODO confirm */
66
	private $allReplicasDownMode = false;
67
	/** @var string The last DB selection or connection error */
68
	private $mLastError = 'Unknown error';
69
	/** @var string|bool Reason the LB is read-only or false if not */
70
	private $readOnlyReason = false;
71
	/** @var integer Total connections opened */
72
	private $connsOpened = 0;
73
	/** @var string|bool String if a requested DBO_TRX transaction round is active */
74
	private $trxRoundId = false;
75
	/** @var array[] Map of (name => callable) */
76
	private $trxRecurringCallbacks = [];
77
78
	/** @var integer Warn when this many connections are held */
79
	const CONN_HELD_WARN_THRESHOLD = 10;
80
	/** @var integer Default 'max lag' when unspecified */
81
	const MAX_LAG = 10;
82
	/** @var integer Max time to wait for a replica DB to catch up (e.g. ChronologyProtector) */
83
	const POS_WAIT_TIMEOUT = 10;
84
	/** @var integer Seconds to cache master server read-only status */
85
	const TTL_CACHE_READONLY = 5;
86
87
	/**
88
	 * @var boolean
89
	 */
90
	private $disabled = false;
91
92
	/**
93
	 * @param array $params Array with keys:
94
	 *  - servers : Required. Array of server info structures.
95
	 *  - loadMonitor : Name of a class used to fetch server lag and load.
96
	 *  - readOnlyReason : Reason the master DB is read-only if so [optional]
97
	 *  - srvCache : BagOStuff object [optional]
98
	 *  - wanCache : WANObjectCache object [optional]
99
	 * @throws MWException
100
	 */
101
	public function __construct( array $params ) {
102
		if ( !isset( $params['servers'] ) ) {
103
			throw new MWException( __CLASS__ . ': missing servers parameter' );
104
		}
105
		$this->mServers = $params['servers'];
106
		$this->mWaitTimeout = self::POS_WAIT_TIMEOUT;
107
108
		$this->mReadIndex = -1;
109
		$this->mWriteIndex = -1;
0 ignored issues
show
Bug introduced by
The property mWriteIndex does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is good practice to explicitly declare properties to avoid accidental typos and to provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
110
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignFree' => array()) of type array<string,array,{"loc..."foreignFree":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.

Loading history...
111
			'local' => [],
112
			'foreignUsed' => [],
113
			'foreignFree' => [] ];
114
		$this->mLoads = [];
115
		$this->mWaitForPos = false;
116
		$this->mErrorConnection = false;
117
		$this->mAllowLagged = false;
118
119 View Code Duplication
		if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
120
			$this->readOnlyReason = $params['readOnlyReason'];
121
		}
122
123
		if ( isset( $params['loadMonitor'] ) ) {
124
			$this->mLoadMonitorClass = $params['loadMonitor'];
125
		} else {
126
			$master = reset( $params['servers'] );
127
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
128
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
129
			} else {
130
				$this->mLoadMonitorClass = 'LoadMonitorNull';
131
			}
132
		}
133
134
		foreach ( $params['servers'] as $i => $server ) {
135
			$this->mLoads[$i] = $server['load'];
136
			if ( isset( $server['groupLoads'] ) ) {
137
				foreach ( $server['groupLoads'] as $group => $ratio ) {
138
					if ( !isset( $this->mGroupLoads[$group] ) ) {
139
						$this->mGroupLoads[$group] = [];
140
					}
141
					$this->mGroupLoads[$group][$i] = $ratio;
142
				}
143
			}
144
		}
145
146
		if ( isset( $params['srvCache'] ) ) {
147
			$this->srvCache = $params['srvCache'];
148
		} else {
149
			$this->srvCache = new EmptyBagOStuff();
150
		}
151
		if ( isset( $params['wanCache'] ) ) {
152
			$this->wanCache = $params['wanCache'];
153
		} else {
154
			$this->wanCache = WANObjectCache::newEmpty();
155
		}
156
		if ( isset( $params['trxProfiler'] ) ) {
157
			$this->trxProfiler = $params['trxProfiler'];
158
		} else {
159
			$this->trxProfiler = new TransactionProfiler();
160
		}
161
	}
162
163
	/**
164
	 * Get a LoadMonitor instance
165
	 *
166
	 * @return LoadMonitor
167
	 */
168
	private function getLoadMonitor() {
		// Hand back the cached monitor when one has already been built.
		if ( isset( $this->mLoadMonitor ) ) {
			return $this->mLoadMonitor;
		}

		// Lazily instantiate the configured monitor class, passing this
		// balancer as its only constructor argument, and cache the result.
		$monitorClass = $this->mLoadMonitorClass;
		$this->mLoadMonitor = new $monitorClass( $this );

		return $this->mLoadMonitor;
	}
176
177
	/**
178
	 * Get or set arbitrary data used by the parent object, usually an LBFactory
179
	 * @param mixed $x
180
	 * @return mixed
181
	 */
182
	public function parentInfo( $x = null ) {
		// Delegate to wfSetVar() with the stored parent info and the optional
		// new value; whatever it returns is handed straight back to the caller.
		$result = wfSetVar( $this->mParentInfo, $x );

		return $result;
	}
185
186
	/**
187
	 * @param array $loads
188
	 * @param bool|string $wiki Wiki to get non-lagged for
189
	 * @param int $maxLag Restrict the maximum allowed lag to this many seconds
190
	 * @return bool|int|string
191
	 */
192
	private function getRandomNonLagged( array $loads, $wiki = false, $maxLag = self::MAX_LAG ) {
		$lags = $this->getLagTimes( $wiki );

		# Drop servers that are not replicating or that exceed their lag limit
		foreach ( $lags as $i => $lag ) {
			if ( $i == 0 ) {
				# Index 0 is never filtered out on account of lag
				continue;
			}

			# A per-server 'max lag' setting can only tighten the caller's limit
			$maxServerLag = $maxLag;
			if ( isset( $this->mServers[$i]['max lag'] ) ) {
				$maxServerLag = min( $maxServerLag, $this->mServers[$i]['max lag'] );
			}

			$host = $this->getServerName( $i );
			if ( $lag === false ) {
				wfDebugLog( 'replication', "Server $host (#$i) is not replicating?" );
				unset( $loads[$i] );
			} elseif ( $lag > $maxServerLag ) {
				wfDebugLog( 'replication', "Server $host (#$i) has >= $lag seconds of lag" );
				unset( $loads[$i] );
			}
		}

		# Find out if all the replica DBs with non-zero load are lagged.
		# An empty $loads array also sums to zero, so no separate emptiness
		# check is needed after this point.
		if ( array_sum( $loads ) == 0 ) {
			# No appropriate DB servers except maybe the master and some replica DBs with zero load
			# Do NOT use the master
			# Instead, this function will return false, triggering read-only mode,
			# and a lagged replica DB will be used instead.
			return false;
		}

		# Return a random representative of the remainder
		return ArrayUtils::pickRandom( $loads );
	}
234
235
	/**
236
	 * Get the index of the reader connection, which may be a replica DB
237
	 * This takes into account load ratios and lag times. It should
238
	 * always return a consistent index during a given invocation
239
	 *
240
	 * Side effect: opens connections to databases
241
	 * @param string|bool $group Query group, or false for the generic reader
242
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
243
	 * @throws MWException
244
	 * @return bool|int|string
245
	 */
246
	public function getReaderIndex( $group = false, $wiki = false ) {
247
		global $wgDBtype;
248
249
		# @todo FIXME: For now, only go through all this for mysql databases
250
		if ( $wgDBtype != 'mysql' ) {
251
			return $this->getWriterIndex();
252
		}
253
254
		if ( count( $this->mServers ) == 1 ) {
255
			# Skip the load balancing if there's only one server
256
			return 0;
257
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
258
			# Shortcut if generic reader exists already
259
			return $this->mReadIndex;
260
		}
261
262
		# Find the relevant load array
263
		if ( $group !== false ) {
264
			if ( isset( $this->mGroupLoads[$group] ) ) {
265
				$nonErrorLoads = $this->mGroupLoads[$group];
266
			} else {
267
				# No loads for this group, return false and the caller can use some other group
268
				wfDebugLog( 'connect', __METHOD__ . ": no loads for group $group\n" );
269
270
				return false;
271
			}
272
		} else {
273
			$nonErrorLoads = $this->mLoads;
274
		}
275
276
		if ( !count( $nonErrorLoads ) ) {
277
			throw new MWException( "Empty server array given to LoadBalancer" );
278
		}
279
280
		# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
281
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
282
283
		$laggedReplicaMode = false;
284
285
		# No server found yet
286
		$i = false;
287
		# First try quickly looking through the available servers for a server that
288
		# meets our criteria
289
		$currentLoads = $nonErrorLoads;
290
		while ( count( $currentLoads ) ) {
291
			if ( $this->mAllowLagged || $laggedReplicaMode ) {
292
				$i = ArrayUtils::pickRandom( $currentLoads );
293
			} else {
294
				$i = false;
295
				if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
296
					# ChronologyProtector causes mWaitForPos to be set via sessions.
297
					# This triggers doWait() after connect, so it's especially good to
298
					# avoid lagged servers so as to avoid just blocking in that method.
299
					$ago = microtime( true ) - $this->mWaitForPos->asOfTime();
300
					# Aim for <= 1 second of waiting (being too picky can backfire)
301
					$i = $this->getRandomNonLagged( $currentLoads, $wiki, $ago + 1 );
302
				}
303
				if ( $i === false ) {
304
					# Any server with less lag than its 'max lag' param is preferable
305
					$i = $this->getRandomNonLagged( $currentLoads, $wiki );
306
				}
307
				if ( $i === false && count( $currentLoads ) != 0 ) {
308
					# All replica DBs lagged. Switch to read-only mode
309
					wfDebugLog( 'replication', "All replica DBs lagged. Switch to read-only mode" );
310
					$i = ArrayUtils::pickRandom( $currentLoads );
311
					$laggedReplicaMode = true;
312
				}
313
			}
314
315
			if ( $i === false ) {
316
				# pickRandom() returned false
317
				# This is permanent and means the configuration or the load monitor
318
				# wants us to return false.
319
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
320
321
				return false;
322
			}
323
324
			$serverName = $this->getServerName( $i );
325
			wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: $serverName..." );
326
327
			$conn = $this->openConnection( $i, $wiki );
328
			if ( !$conn ) {
329
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
330
				unset( $nonErrorLoads[$i] );
331
				unset( $currentLoads[$i] );
332
				$i = false;
333
				continue;
334
			}
335
336
			// Decrement reference counter, we are finished with this connection.
337
			// It will be incremented for the caller later.
338
			if ( $wiki !== false ) {
339
				$this->reuseConnection( $conn );
340
			}
341
342
			# Return this server
343
			break;
344
		}
345
346
		# If all servers were down, quit now
347
		if ( !count( $nonErrorLoads ) ) {
348
			wfDebugLog( 'connect', "All servers down" );
349
		}
350
351
		if ( $i !== false ) {
352
			# Replica DB connection successful.
353
			# Wait for the session master pos for a short time.
354
			if ( $this->mWaitForPos && $i > 0 ) {
355
				$this->doWait( $i );
356
			}
357
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
358
				$this->mReadIndex = $i;
0 ignored issues
show
Documentation Bug introduced by
It seems like $i can also be of type string. However, the property $mReadIndex is declared as type integer. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account_id instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
359
				# Record if the generic reader index is in "lagged replica DB" mode
360
				if ( $laggedReplicaMode ) {
361
					$this->laggedReplicaMode = true;
362
				}
363
			}
364
			$serverName = $this->getServerName( $i );
365
			wfDebugLog( 'connect', __METHOD__ .
366
				": using server $serverName for group '$group'\n" );
367
		}
368
369
		return $i;
370
	}
371
372
	/**
373
	 * Set the master wait position
374
	 * If a DB_REPLICA connection has been opened already, waits
375
	 * Otherwise sets a variable telling it to wait if such a connection is opened
376
	 * @param DBMasterPos $pos
377
	 */
378
	public function waitFor( $pos ) {
		// Store the position; getReaderIndex() consults it when picking a reader.
		$this->mWaitForPos = $pos;

		$readerIndex = $this->mReadIndex;
		// Only wait when a generic reader with a positive index has already been
		// chosen; a failed wait flags lagged-replica mode.
		if ( $readerIndex > 0 && !$this->doWait( $readerIndex ) ) {
			$this->laggedReplicaMode = true;
		}
	}
388
389
	/**
390
	 * Set the master wait position and wait for a "generic" replica DB to catch up to it
391
	 *
392
	 * This can be used as a faster proxy for waitForAll()
393
	 *
394
	 * @param DBMasterPos $pos
395
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
396
	 * @return bool Success (able to connect and no timeouts reached)
397
	 * @since 1.26
398
	 */
399
	public function waitForOne( $pos, $timeout = null ) {
		$this->mWaitForPos = $pos;

		$index = $this->mReadIndex;
		if ( $index <= 0 ) {
			// No generic replica DB chosen yet: pick one at random from the
			// servers other than the writer that have a non-zero load.
			$candidates = $this->mLoads;
			unset( $candidates[$this->getWriterIndex()] );
			$candidates = array_filter( $candidates );
			$index = ArrayUtils::pickRandom( $candidates );
		}

		if ( $index > 0 ) {
			// Open a connection if needed and wait on that single server.
			return $this->doWait( $index, true, $timeout );
		}

		// No applicable loads, so there is nothing to wait for.
		return true;
	}
419
420
	/**
421
	 * Set the master wait position and wait for ALL replica DBs to catch up to it
422
	 * @param DBMasterPos $pos
423
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
424
	 * @return bool Success (able to connect and no timeouts reached)
425
	 */
426
	public function waitForAll( $pos, $timeout = null ) {
		$this->mWaitForPos = $pos;

		$allCaughtUp = true;
		$total = count( $this->mServers );
		// Index 0 is skipped; every other server with a positive load is waited
		// on, even after an earlier server has already failed.
		for ( $idx = 1; $idx < $total; $idx++ ) {
			if ( !( $this->mLoads[$idx] > 0 ) ) {
				continue;
			}
			if ( !$this->doWait( $idx, true, $timeout ) ) {
				$allCaughtUp = false;
			}
		}

		return $allCaughtUp;
	}
439
440
	/**
441
	 * Get any open connection to a given server index, local or foreign
442
	 * Returns false if there is no connection open
443
	 *
444
	 * @param int $i
445
	 * @return DatabaseBase|bool False on failure
446
	 */
447
	public function getAnyOpenConnection( $i ) {
		// Scan each connection pool in turn for handles on this server index.
		foreach ( $this->mConns as $pool ) {
			if ( empty( $pool[$i] ) ) {
				continue;
			}

			// The first entry of the first non-empty match is returned.
			$handles = $pool[$i];

			return reset( $handles );
		}

		// Nothing is open for this index in any pool.
		return false;
	}
456
457
	/**
458
	 * Wait for a given replica DB to catch up to the master pos stored in $this
459
	 * @param int $index Server index
460
	 * @param bool $open Check the server even if a new connection has to be made
461
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
462
	 * @return bool
463
	 */
464
	protected function doWait( $index, $open = false, $timeout = null ) {
465
		$close = false; // close the connection afterwards
466
467
		// Check if we already know that the DB has reached this point
468
		$server = $this->getServerName( $index );
469
		$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
470
		/** @var DBMasterPos $knownReachedPos */
471
		$knownReachedPos = $this->srvCache->get( $key );
472
		if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DBMasterPos::hasReached() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return values of several different types, then unless you are sure that only a single type can occur in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
473
			wfDebugLog( 'replication', __METHOD__ .
474
				": replica DB $server known to be caught up (pos >= $knownReachedPos).\n" );
475
			return true;
476
		}
477
478
		// Find a connection to wait on, creating one if needed and allowed
479
		$conn = $this->getAnyOpenConnection( $index );
480
		if ( !$conn ) {
481
			if ( !$open ) {
482
				wfDebugLog( 'replication', __METHOD__ . ": no connection open for $server\n" );
483
484
				return false;
485
			} else {
486
				$conn = $this->openConnection( $index, '' );
487
				if ( !$conn ) {
488
					wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
489
490
					return false;
491
				}
492
				// Avoid connection spam in waitForAll() when connections
493
				// are made just for the sake of doing this lag check.
494
				$close = true;
495
			}
496
		}
497
498
		wfDebugLog( 'replication', __METHOD__ . ": Waiting for replica DB $server to catch up...\n" );
499
		$timeout = $timeout ?: $this->mWaitTimeout;
500
		$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DatabaseBase::masterPosWait() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return values of several different types, then unless you are sure that only a single type can occur in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
501
502
		if ( $result == -1 || is_null( $result ) ) {
503
			// Timed out waiting for replica DB, use master instead
504
			$msg = __METHOD__ . ": Timed out waiting on $server pos {$this->mWaitForPos}";
505
			wfDebugLog( 'replication', "$msg\n" );
506
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
507
			$ok = false;
508
		} else {
509
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
510
			$ok = true;
511
			// Remember that the DB reached this point
512
			$this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
513
		}
514
515
		if ( $close ) {
516
			$this->closeConnection( $conn );
0 ignored issues
show
Bug introduced by
It seems like $conn can also be of type boolean; however, LoadBalancer::closeConnection() does only seem to accept object<DatabaseBase>, maybe add an additional type check?

If a method or function can return values of several different types, then unless you are sure that only a single type can occur in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
517
		}
518
519
		return $ok;
520
	}
521
522
	/**
523
	 * Get a connection by index
524
	 * This is the main entry point for this class.
525
	 *
526
	 * @param int $i Server index
527
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
528
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
529
	 *
530
	 * @throws MWException
531
	 * @return DatabaseBase
532
	 */
533
	public function getConnection( $i, $groups = [], $wiki = false ) {
534
		if ( $i === null || $i === false ) {
535
			throw new MWException( 'Attempt to call ' . __METHOD__ .
536
				' with invalid server index' );
537
		}
538
539
		if ( $wiki === wfWikiID() ) {
540
			$wiki = false;
541
		}
542
543
		$groups = ( $groups === false || $groups === [] )
544
			? [ false ] // check one "group": the generic pool
545
			: (array)$groups;
546
547
		$masterOnly = ( $i == DB_MASTER || $i == $this->getWriterIndex() );
548
		$oldConnsOpened = $this->connsOpened; // connections open now
549
550
		if ( $i == DB_MASTER ) {
551
			$i = $this->getWriterIndex();
552
		} else {
553
			# Try to find an available server in any of the query groups (in order)
554
			foreach ( $groups as $group ) {
555
				$groupIndex = $this->getReaderIndex( $group, $wiki );
556
				if ( $groupIndex !== false ) {
557
					$i = $groupIndex;
558
					break;
559
				}
560
			}
561
		}
562
563
		# Operation-based index
564
		if ( $i == DB_REPLICA ) {
565
			$this->mLastError = 'Unknown error'; // reset error string
566
			# Try the general server pool if $groups are unavailable.
567
			$i = in_array( false, $groups, true )
568
				? false // don't bother with this if that is what was tried above
569
				: $this->getReaderIndex( false, $wiki );
570
			# Couldn't find a working server in getReaderIndex()?
571
			if ( $i === false ) {
572
				$this->mLastError = 'No working replica DB server: ' . $this->mLastError;
573
574
				return $this->reportConnectionError();
575
			}
576
		}
577
578
		# Now we have an explicit index into the servers array
579
		$conn = $this->openConnection( $i, $wiki );
580
		if ( !$conn ) {
581
			return $this->reportConnectionError();
582
		}
583
584
		# Profile any new connections that happen
585
		if ( $this->connsOpened > $oldConnsOpened ) {
586
			$host = $conn->getServer();
587
			$dbname = $conn->getDBname();
588
			$trxProf = Profiler::instance()->getTransactionProfiler();
589
			$trxProf->recordConnection( $host, $dbname, $masterOnly );
590
		}
591
592
		if ( $masterOnly ) {
593
			# Make master-requested DB handles inherit any read-only mode setting
594
			$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $wiki, $conn ) );
0 ignored issues
show
Bug introduced by
It seems like $conn defined by $this->openConnection($i, $wiki) on line 579 can also be of type boolean; however, LoadBalancer::getReadOnlyReason() does only seem to accept null|object<DatabaseBase>, maybe add an additional type check?

If a method or function can return values of several different types, then unless you are sure that only a single type can occur in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
595
		}
596
597
		return $conn;
598
	}
599
600
	/**
	 * Mark a foreign connection as being available for reuse under a different
	 * DB name or prefix. This mechanism is reference-counted, and must be called
	 * the same number of times as getConnection() to work.
	 *
	 * Handles that carry no 'serverIndex' or 'foreignPoolRefCount' LB info are
	 * silently ignored.
	 *
	 * @param DatabaseBase $conn
	 * @throws MWException If $conn is not tracked as an in-use foreign connection
	 */
	public function reuseConnection( $conn ) {
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
		if ( $serverIndex === null || $refCount === null ) {
			/**
			 * This can happen in code like:
			 *   foreach ( $dbs as $db ) {
			 *     $conn = $lb->getConnection( DB_REPLICA, [], $db );
			 *     ...
			 *     $lb->reuseConnection( $conn );
			 *   }
			 * When a connection to the local DB is opened in this way, reuseConnection()
			 * should be ignored
			 */
			return;
		}

		// Rebuild the key under which the handle is stored: "dbname" or "dbname-prefix"
		$dbName = $conn->getDBname();
		$prefix = $conn->tablePrefix();
		$wiki = ( strval( $prefix ) !== '' ) ? "$dbName-$prefix" : $dbName;

		// The isset() guard matters: an already-freed handle has no 'foreignUsed' entry,
		// and reading the missing key directly would raise an "undefined index" notice
		// before the intended exception is thrown.
		if ( !isset( $this->mConns['foreignUsed'][$serverIndex][$wiki] )
			|| $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn
		) {
			throw new MWException( __METHOD__ . ": connection not found, has " .
				"the connection been freed already?" );
		}

		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
		if ( $refCount <= 0 ) {
			// Last reference released: move the handle from the used pool to the free pool
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
		} else {
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
		}
	}
645
646
	/**
	 * Get a database connection handle reference
	 *
	 * The connection is obtained right away via getConnection() and wrapped
	 * in a DBConnRef.
	 *
	 * @see LoadBalancer::getConnection() for parameter information
	 *
	 * @param int $db
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
	 * @return DBConnRef
	 */
	public function getConnectionRef( $db, $groups = [], $wiki = false ) {
		$conn = $this->getConnection( $db, $groups, $wiki );

		return new DBConnRef( $this, $conn );
	}
661
662
	/**
	 * Get a database connection handle reference without connecting yet
	 *
	 * Instead of an open connection, the DBConnRef is given the
	 * ( $db, $groups, $wiki ) parameters.
	 *
	 * @see LoadBalancer::getConnection() for parameter information
	 *
	 * @param int $db
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
	 * @return DBConnRef
	 */
	public function getLazyConnectionRef( $db, $groups = [], $wiki = false ) {
		$connParams = [ $db, $groups, $wiki ];

		return new DBConnRef( $this, $connParams );
	}
677
678
	/**
	 * Open a connection to the server given by the specified index
	 * Index must be an actual index into the array.
	 * If the server is already open, returns it.
	 *
	 * On error, returns false, and the connection which caused the
	 * error will be available via $this->mErrorConnection.
	 *
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
	 *
	 * @param int $i Server index
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
	 * @return DatabaseBase|bool Returns false on errors
	 */
	public function openConnection( $i, $wiki = false ) {
		if ( $wiki !== false ) {
			// Foreign wiki: delegate to the reference-counted foreign pool
			$handle = $this->openForeignConnection( $i, $wiki );
		} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
			// Local wiki with a cached handle
			$handle = $this->mConns['local'][$i][0];
		} else {
			// Local wiki, nothing cached yet: open a fresh handle
			$serverInfo = $this->mServers[$i];
			$serverInfo['serverIndex'] = $i;
			$handle = $this->reallyOpenConnection( $serverInfo, false );
			$serverName = $this->getServerName( $i );
			if ( !$handle->isOpen() ) {
				wfDebugLog( 'connect', "Failed to connect to database $i at $serverName\n" );
				$this->mErrorConnection = $handle;
				$handle = false;
			} else {
				wfDebugLog( 'connect', "Connected to database $i at $serverName\n" );
				$this->mConns['local'][$i][0] = $handle;
			}
		}

		if ( $handle && !$handle->isOpen() ) {
			// Connection was made but later unrecoverably lost for some reason.
			// Do not return a handle that will just throw exceptions on use,
			// but let the calling code (e.g. getReaderIndex) try another server.
			// See DatabaseMysqlBase::ping() for how this can happen.
			$this->mErrorConnection = $handle;
			$handle = false;
		}

		return $handle;
	}
723
724
	/**
	 * Open a connection to a foreign DB, or return one if it is already open.
	 *
	 * Increments a reference count on the returned connection which locks the
	 * connection to the requested wiki. This reference count can be
	 * decremented by calling reuseConnection().
	 *
	 * If a connection is open to the appropriate server already, but with the wrong
	 * database, it will be switched to the right database and returned, as long as
	 * it has been freed first with reuseConnection().
	 *
	 * On error, returns false, and the connection which caused the
	 * error will be available via $this->mErrorConnection.
	 *
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
	 *
	 * @param int $i Server index
	 * @param string $wiki Wiki ID to open
	 * @return DatabaseBase|bool Returns false on errors
	 */
	private function openForeignConnection( $i, $wiki ) {
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );

		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
			// Reuse an already-used connection
			$handle = $this->mConns['foreignUsed'][$i][$wiki];
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
			// Reuse a free connection for the same wiki
			$handle = $this->mConns['foreignFree'][$i][$wiki];
			unset( $this->mConns['foreignFree'][$i][$wiki] );
			$this->mConns['foreignUsed'][$i][$wiki] = $handle;
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
			// Reuse a connection from another wiki: take the first free one
			$handle = reset( $this->mConns['foreignFree'][$i] );
			$oldWiki = key( $this->mConns['foreignFree'][$i] );

			// The empty string as a DB name means "don't care".
			// DatabaseMysqlBase::open() already handles this on connection.
			if ( $dbName === '' || $handle->selectDB( $dbName ) ) {
				$handle->tablePrefix( $prefix );
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
				$this->mConns['foreignUsed'][$i][$wiki] = $handle;
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
			} else {
				$this->mLastError = "Error selecting database $dbName on server " .
					$handle->getServer() . " from client host " . wfHostname() . "\n";
				$this->mErrorConnection = $handle;
				$handle = false;
			}
		} else {
			// Open a new connection
			$serverInfo = $this->mServers[$i];
			$serverInfo['serverIndex'] = $i;
			$serverInfo['foreignPoolRefCount'] = 0;
			$serverInfo['foreign'] = true;
			$handle = $this->reallyOpenConnection( $serverInfo, $dbName );
			if ( $handle->isOpen() ) {
				$handle->tablePrefix( $prefix );
				$this->mConns['foreignUsed'][$i][$wiki] = $handle;
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
			} else {
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
				$this->mErrorConnection = $handle;
				$handle = false;
			}
		}

		// Increment reference count
		if ( $handle ) {
			$currentRefs = $handle->getLBInfo( 'foreignPoolRefCount' );
			$handle->setLBInfo( 'foreignPoolRefCount', $currentRefs + 1 );
		}

		return $handle;
	}
800
801
	/**
	 * Test if the specified index represents an open connection
	 *
	 * Non-integer indexes are never considered open.
	 *
	 * @param int $index Server index
	 * @access private
	 * @return bool
	 */
	private function isOpen( $index ) {
		return is_integer( $index ) && (bool)$this->getAnyOpenConnection( $index );
	}
815
816
	/**
	 * Really opens a connection. Uncached.
	 * Returns a Database object whether or not the connection was successful.
	 * @access private
	 *
	 * @param array $server Server config array; 'type' and 'serverIndex' are read here
	 * @param string|bool $dbNameOverride DB name to use instead of $server['dbname'],
	 *   or false to keep the configured one
	 * @throws MWException If $server is not an array
	 * @throws DBAccessError If this LoadBalancer is disabled
	 * @return DatabaseBase
	 */
	protected function reallyOpenConnection( $server, $dbNameOverride = false ) {
		if ( $this->disabled ) {
			throw new DBAccessError();
		}

		if ( !is_array( $server ) ) {
			throw new MWException( 'You must update your load-balancing configuration. ' .
				'See DefaultSettings.php entry for $wgDBservers.' );
		}

		if ( $dbNameOverride !== false ) {
			$server['dbname'] = $dbNameOverride;
		}

		// Let the handle know what the cluster master is (e.g. "db1052")
		$clusterMaster = $this->getServerName( 0 );
		$server['clusterMasterHost'] = $clusterMaster;

		// Log when many connections are made on requests
		if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
			wfDebugLog( 'DBPerformance', __METHOD__ . ": " .
				"{$this->connsOpened}+ connections made (master=$clusterMaster)\n" .
				wfBacktrace( true ) );
		}

		// Create the handle
		try {
			$handle = DatabaseBase::factory( $server['type'], $server );
		} catch ( DBConnectionError $e ) {
			// FIXME: the failed handle is pulled out of the exception so that a
			// Database object is returned even when connecting failed.
			$handle = $e->db;
		}

		$handle->setLBInfo( $server );
		$handle->setLazyMasterHandle(
			$this->getLazyConnectionRef( DB_MASTER, [], $handle->getWikiID() )
		);
		$handle->setTransactionProfiler( $this->trxProfiler );
		if ( $this->trxRoundId !== false ) {
			// A transaction round is in progress; new handles take part in it
			$this->applyTransactionRoundFlags( $handle );
		}

		// Only handles to the writer index get the recurring transaction listeners
		if ( $server['serverIndex'] === $this->getWriterIndex() ) {
			foreach ( $this->trxRecurringCallbacks as $listenerName => $listener ) {
				$handle->setTransactionListener( $listenerName, $listener );
			}
		}

		return $handle;
	}
877
878
	/**
	 * Log the last connection failure and raise it as an exception.
	 *
	 * @throws DBConnectionError
	 * @return bool Always false, but this is not expected to be reached
	 */
	private function reportConnectionError() {
		$failedConn = $this->mErrorConnection; // The connection which caused the error
		$logContext = [
			'method' => __METHOD__,
			'last_error' => $this->mLastError,
		];

		if ( is_object( $failedConn ) ) {
			$logContext['db_server'] = $failedConn->getProperty( 'mServer' );
			wfLogDBError(
				"Connection error: {last_error} ({db_server})",
				$logContext
			);

			// throws DBConnectionError
			$failedConn->reportConnectionError( "{$this->mLastError} ({$logContext['db_server']})" );

			return false; /* not reached */
		}

		// No last connection, probably due to all servers being too busy
		wfLogDBError(
			"LB failure with no last connection. Connection error: {last_error}",
			$logContext
		);

		// If all servers were busy, mLastError will contain something sensible
		throw new DBConnectionError( null, $this->mLastError );
	}
911
912
	/**
	 * Get the server index used for writes; this implementation always returns 0.
	 *
	 * @return int
	 * @since 1.26
	 */
	public function getWriterIndex() {
		return 0;
	}
919
920
	/**
	 * Returns true if the specified index is a valid server index,
	 * i.e. it is a key of the configured server list
	 *
	 * @param string $i
	 * @return bool
	 */
	public function haveIndex( $i ) {
		$isKnown = array_key_exists( $i, $this->mServers );

		return $isKnown;
	}
929
930
	/**
	 * Returns true if the specified index is valid and has non-zero load
	 *
	 * @param string $i
	 * @return bool
	 */
	public function isNonZeroLoad( $i ) {
		if ( !array_key_exists( $i, $this->mServers ) ) {
			return false;
		}

		return $this->mLoads[$i] != 0;
	}
939
940
	/**
	 * Get the number of defined servers (not the number of open connections)
	 *
	 * @return int Number of entries in the server list
	 */
	public function getServerCount() {
		$total = count( $this->mServers );

		return $total;
	}
948
949
	/**
	 * Get the host name or IP address of the server with the specified index
	 * Prefer a readable name if available.
	 *
	 * The 'hostName' setting wins over 'host'; if neither yields a non-empty
	 * value, 'localhost' is returned.
	 *
	 * @param string $i
	 * @return string
	 */
	public function getServerName( $i ) {
		$name = '';
		// Try the candidate settings in order of preference
		foreach ( [ 'hostName', 'host' ] as $setting ) {
			if ( isset( $this->mServers[$i][$setting] ) ) {
				$name = $this->mServers[$i][$setting];
				break;
			}
		}

		if ( $name != '' ) {
			return $name;
		}

		return 'localhost';
	}
966
967
	/**
	 * Return the server info structure for a given index, or false if the index is invalid.
	 *
	 * @param int $i
	 * @return array|bool
	 */
	public function getServerInfo( $i ) {
		return isset( $this->mServers[$i] ) ? $this->mServers[$i] : false;
	}
979
980
	/**
	 * Sets the server info structure for the given index. Entry at index $i
	 * is created if it doesn't exist, and overwritten if it does.
	 *
	 * @param int $i
	 * @param array $serverInfo
	 */
	public function setServerInfo( $i, array $serverInfo ) {
		$this->mServers[$i] = $serverInfo;
	}
989
990
	/**
	 * Get the current master position for chronology control purposes
	 *
	 * @return mixed Position from the open master connection, else from the first
	 *   open replica DB connection, or false if no connection is open
	 */
	public function getMasterPos() {
		$masterConn = $this->getAnyOpenConnection( 0 );
		if ( $masterConn ) {
			return $masterConn->getMasterPos();
		}

		# If this entire request was served from a replica DB without opening a connection to the
		# master (however unlikely that may be), then we can fetch the position from the replica DB.
		$serverCount = count( $this->mServers );
		for ( $index = 1; $index < $serverCount; $index++ ) {
			$replicaConn = $this->getAnyOpenConnection( $index );
			if ( $replicaConn ) {
				return $replicaConn->getSlavePos();
			}
		}

		return false;
	}
1012
1013
	/**
	 * Disable this load balancer. All connections are closed, and any attempt to
	 * open a new connection will result in a DBAccessError.
	 *
	 * @since 1.27
	 */
	public function disable() {
		// Close first, then flag: reallyOpenConnection() checks the flag
		$this->closeAll();
		$this->disabled = true;
	}
1023
1024
	/**
	 * Close all open connections, then reset the connection pools and
	 * the opened-connection counter
	 */
	public function closeAll() {
		$closeHandle = function ( DatabaseBase $handle ) {
			$handle->close();
		};
		$this->forEachOpenConnection( $closeHandle );

		// Start over with empty pools
		$this->mConns = [
			'local' => [],
			'foreignFree' => [],
			'foreignUsed' => [],
		];
		$this->connsOpened = 0;
	}
1039
1040
	/**
	 * Close a connection
	 * Using this function makes sure the LoadBalancer knows the connection is closed.
	 * If you use $conn->close() directly, the load balancer won't update its state.
	 *
	 * A handle that is not tracked in any pool is simply closed.
	 *
	 * @param DatabaseBase $conn
	 */
	public function closeConnection( $conn ) {
		$tracked = false;
		foreach ( $this->mConns as $poolName => $connsByServer ) {
			foreach ( $connsByServer as $serverIndex => $connsByKey ) {
				foreach ( $connsByKey as $key => $candidateConn ) {
					if ( $conn === $candidateConn ) {
						$conn->close();
						unset( $this->mConns[$poolName][$serverIndex][$key] );
						--$this->connsOpened;
						$tracked = true;
						// Leave all three loops at once; a plain "break" would only
						// exit the innermost loop and keep scanning the other pools.
						break 3;
					}
				}
			}
		}

		if ( !$tracked ) {
			$conn->close();
		}
	}
1065
1066
	/**
	 * Commit transactions on all open connections
	 *
	 * Any active transaction round is ended. Commit errors are logged and
	 * collected, then reported together once every connection was tried.
	 *
	 * @param string $fname Caller name
	 * @throws DBExpectedError
	 */
	public function commitAll( $fname = __METHOD__ ) {
		$errors = [];

		// Remember whether a transaction round was active before ending it
		$roundWasActive = ( $this->trxRoundId !== false );
		$this->trxRoundId = false;

		$commitHandle = function ( DatabaseBase $handle ) use ( $fname, $roundWasActive, &$errors ) {
			try {
				$handle->commit( $fname, $handle::FLUSHING_ALL_PEERS );
			} catch ( DBError $ex ) {
				MWExceptionHandler::logException( $ex );
				$errors[] = "{$handle->getServer()}: {$ex->getMessage()}";
			}
			if ( $roundWasActive && $handle->getLBInfo( 'master' ) ) {
				$this->undoTransactionRoundFlags( $handle );
			}
		};
		$this->forEachOpenConnection( $commitHandle );

		if ( $errors ) {
			throw new DBExpectedError(
				null,
				"Commit failed on server(s) " . implode( "\n", array_unique( $errors ) )
			);
		}
	}
1097
1098
	/**
	 * Perform all pre-commit callbacks that remain part of the atomic transactions
	 * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
	 * @since 1.28
	 */
	public function finalizeMasterChanges() {
		$runPreCommit = function ( DatabaseBase $handle ) {
			// Any error should cause all DB transactions to be rolled back together
			$handle->setTrxEndCallbackSuppression( false );
			$handle->runOnTransactionPreCommitCallbacks();
			// Defer post-commit callbacks until COMMIT finishes for all DBs
			$handle->setTrxEndCallbackSuppression( true );
		};
		$this->forEachOpenMasterConnection( $runPreCommit );
	}
1112
1113
	/**
	 * Perform all pre-commit checks for things like replication safety
	 * @param array $options Includes:
	 *   - maxWriteDuration : max write query duration time in seconds
	 *     (0 or absent means no limit is enforced)
	 * @throws DBTransactionError
	 * @since 1.28
	 */
	public function approveMasterChanges( array $options ) {
		if ( isset( $options['maxWriteDuration'] ) ) {
			$maxDuration = $options['maxWriteDuration'];
		} else {
			$maxDuration = 0;
		}

		$checkHandle = function ( DatabaseBase $handle ) use ( $maxDuration ) {
			// If atomic sections or explicit transactions are still open, some caller must have
			// caught an exception but failed to properly rollback any changes. Detect that and
			// throw an error (causing rollback).
			if ( $handle->explicitTrxActive() ) {
				throw new DBTransactionError(
					$handle,
					"Explicit transaction still active. A caller may have caught an error."
				);
			}
			// Assert that the time to replicate the transaction will be sane.
			// If this fails, then all DB transactions will be rolled back together.
			$estimate = $handle->pendingWriteQueryDuration( $handle::ESTIMATE_DB_APPLY );
			if ( $maxDuration > 0 && $estimate > $maxDuration ) {
				throw new DBTransactionError(
					$handle,
					wfMessage( 'transaction-duration-limit-exceeded', $estimate, $maxDuration )->text()
				);
			}
			// If a connection sits idle while slow queries execute on another, that connection
			// may end up dropped before the commit round is reached. Ping servers to detect this.
			if ( $handle->writesOrCallbacksPending() && !$handle->ping() ) {
				throw new DBTransactionError(
					$handle,
					"A connection to the {$handle->getDBname()} database was lost before commit."
				);
			}
		};
		$this->forEachOpenMasterConnection( $checkHandle );
	}
1151
1152
	/**
	 * Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
	 *
	 * The DBO_TRX setting will be reverted to the default in each of these methods:
	 *   - commitMasterChanges()
	 *   - rollbackMasterChanges()
	 *   - commitAll()
	 * This allows for custom transaction rounds from any outer transaction scope.
	 *
	 * @param string $fname
	 * @throws DBTransactionError If a transaction round is already started
	 * @throws DBExpectedError If flushing a snapshot failed on any server
	 * @since 1.28
	 */
	public function beginMasterChanges( $fname = __METHOD__ ) {
		if ( $this->trxRoundId !== false ) {
			throw new DBTransactionError(
				null,
				"$fname: Transaction round '{$this->trxRoundId}' already started."
			);
		}
		// The caller name doubles as the round ID
		$this->trxRoundId = $fname;

		$errors = [];
		$startRound = function ( DatabaseBase $handle ) use ( $fname, &$errors ) {
			$handle->setTrxEndCallbackSuppression( true );
			try {
				$handle->clearSnapshot( $fname );
			} catch ( DBError $ex ) {
				MWExceptionHandler::logException( $ex );
				$errors[] = "{$handle->getServer()}: {$ex->getMessage()}";
			}
			$handle->setTrxEndCallbackSuppression( false );
			$this->applyTransactionRoundFlags( $handle );
		};
		$this->forEachOpenMasterConnection( $startRound );

		if ( $errors ) {
			throw new DBExpectedError(
				null,
				"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $errors ) )
			);
		}
	}
1196
1197
	/**
	 * Issue COMMIT on all master connections where writes were done
	 *
	 * Connections without pending writes or callbacks only get their snapshot
	 * cleared, and only if a transaction round was active. Any active round is ended.
	 *
	 * @param string $fname Caller name
	 * @throws DBExpectedError
	 */
	public function commitMasterChanges( $fname = __METHOD__ ) {
		$errors = [];

		// Remember whether a transaction round was active before ending it
		$roundWasActive = ( $this->trxRoundId !== false );
		$this->trxRoundId = false;

		$commitHandle = function ( DatabaseBase $handle ) use ( $fname, $roundWasActive, &$errors ) {
			try {
				if ( $handle->writesOrCallbacksPending() ) {
					$handle->commit( $fname, $handle::FLUSHING_ALL_PEERS );
				} elseif ( $roundWasActive ) {
					$handle->clearSnapshot( $fname );
				}
			} catch ( DBError $ex ) {
				MWExceptionHandler::logException( $ex );
				$errors[] = "{$handle->getServer()}: {$ex->getMessage()}";
			}
			if ( $roundWasActive ) {
				$this->undoTransactionRoundFlags( $handle );
			}
		};
		$this->forEachOpenMasterConnection( $commitHandle );

		if ( $errors ) {
			throw new DBExpectedError(
				null,
				"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $errors ) )
			);
		}
	}
1232
1233
	/**
	 * Issue all pending post-COMMIT/ROLLBACK callbacks
	 *
	 * Exceptions from callbacks do not stop the remaining callbacks or
	 * connections from being processed; only the first one is kept.
	 *
	 * @param integer $type IDatabase::TRIGGER_* constant
	 * @return Exception|null The first exception or null if there were none
	 * @since 1.28
	 */
	public function runMasterPostTrxCallbacks( $type ) {
		$firstError = null;
		$runCallbacks = function ( DatabaseBase $handle ) use ( $type, &$firstError ) {
			$handle->clearSnapshot( __METHOD__ ); // clear no-op transactions

			$handle->setTrxEndCallbackSuppression( false );
			try {
				$handle->runOnTransactionIdleCallbacks( $type );
			} catch ( Exception $ex ) {
				if ( !$firstError ) {
					$firstError = $ex;
				}
			}
			try {
				$handle->runTransactionListenerCallbacks( $type );
			} catch ( Exception $ex ) {
				if ( !$firstError ) {
					$firstError = $ex;
				}
			}
		};
		$this->forEachOpenMasterConnection( $runCallbacks );

		return $firstError;
	}
1259
1260
	/**
	 * Issue ROLLBACK only on master, only if queries were done on connection
	 *
	 * Any active transaction round is ended.
	 *
	 * @param string $fname Caller name
	 * @throws DBExpectedError
	 * @since 1.23
	 */
	public function rollbackMasterChanges( $fname = __METHOD__ ) {
		// Remember whether a transaction round was active before ending it
		$roundWasActive = ( $this->trxRoundId !== false );
		$this->trxRoundId = false;

		$rollbackHandle = function ( DatabaseBase $handle ) use ( $fname, $roundWasActive ) {
			if ( $handle->writesOrCallbacksPending() ) {
				$handle->rollback( $fname, $handle::FLUSHING_ALL_PEERS );
			}
			if ( $roundWasActive ) {
				$this->undoTransactionRoundFlags( $handle );
			}
		};
		$this->forEachOpenMasterConnection( $rollbackHandle );
	}
1280
1281
	/**
	 * Suppress all pending post-COMMIT/ROLLBACK callbacks
	 * on every open master connection
	 *
	 * @since 1.28
	 */
	public function suppressTransactionEndCallbacks() {
		$suppress = function ( DatabaseBase $handle ) {
			$handle->setTrxEndCallbackSuppression( true );
		};
		$this->forEachOpenMasterConnection( $suppress );
	}
1291
1292
	/**
	 * Turn on DBO_TRX for a handle that has DBO_DEFAULT set, remembering the prior flags
	 *
	 * @param DatabaseBase $conn
	 */
	private function applyTransactionRoundFlags( DatabaseBase $conn ) {
		if ( !$conn->getFlag( DBO_DEFAULT ) ) {
			// If config has explicitly requested DBO_TRX be either on or off by not
			// setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
			// for things like blob stores (ExternalStore) which want auto-commit mode.
			return;
		}

		// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
		// Force DBO_TRX even in CLI mode since a commit round is expected soon.
		$conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
	}
1305
1306
	/**
	 * Restore the prior flags of a handle that has DBO_DEFAULT set
	 *
	 * @param DatabaseBase $conn
	 */
	private function undoTransactionRoundFlags( DatabaseBase $conn ) {
		if ( !$conn->getFlag( DBO_DEFAULT ) ) {
			return;
		}

		$conn->restoreFlags( $conn::RESTORE_PRIOR );
	}
1314
1315
	/**
	 * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
	 *
	 * @param string $fname Caller name, passed on to clearSnapshot() for each connection
	 * @since 1.28
	 */
	public function flushReplicaSnapshots( $fname = __METHOD__ ) {
		// Forward the caller name; __METHOD__ inside the closure would name the
		// closure itself and silently drop the $fname argument.
		$this->forEachOpenReplicaConnection( function ( DatabaseBase $conn ) use ( $fname ) {
			$conn->clearSnapshot( $fname );
		} );
	}
1326
1327
	/**
	 * @return bool Whether a master connection is already open
	 * @since 1.24
	 */
	public function hasMasterConnection() {
		$writerIndex = $this->getWriterIndex();

		return $this->isOpen( $writerIndex );
	}
1334
1335
	/**
	 * Determine if there are pending changes in a transaction by this thread
	 * @since 1.23
	 * @return bool
	 */
	public function hasMasterChanges() {
		$writerIndex = $this->getWriterIndex();
		// Look at the master's handles in every connection pool
		foreach ( $this->mConns as $connsByServer ) {
			if ( !empty( $connsByServer[$writerIndex] ) ) {
				/** @var DatabaseBase $handle */
				foreach ( $connsByServer[$writerIndex] as $handle ) {
					if ( $handle->trxLevel() && $handle->writesOrCallbacksPending() ) {
						return true;
					}
				}
			}
		}

		return false;
	}
1355
1356
	/**
	 * Get the timestamp of the latest write query done by this thread
	 * @since 1.25
	 * @return float|bool UNIX timestamp or false
	 */
	public function lastMasterChangeTimestamp() {
		$latest = false;
		$writerIndex = $this->getWriterIndex();
		// Look at the master's handles in every connection pool
		foreach ( $this->mConns as $connsByServer ) {
			if ( !empty( $connsByServer[$writerIndex] ) ) {
				/** @var DatabaseBase $handle */
				foreach ( $connsByServer[$writerIndex] as $handle ) {
					$latest = max( $latest, $handle->lastDoneWrites() );
				}
			}
		}

		return $latest;
	}
1375
1376
	/**
	 * Check if this load balancer object had any recent or still
	 * pending writes issued against it by this PHP thread
	 *
	 * @param float $age How many seconds ago is "recent" [defaults to mWaitTimeout]
	 * @return bool
	 * @since 1.25
	 */
	public function hasOrMadeRecentMasterChanges( $age = null ) {
		if ( $age === null ) {
			$age = $this->mWaitTimeout;
		}

		if ( $this->hasMasterChanges() ) {
			return true;
		}

		// No pending changes; check whether the last write was recent enough
		return $this->lastMasterChangeTimestamp() > microtime( true ) - $age;
	}
1390
1391
	/**
	 * Get the list of callers that have pending master changes
	 *
	 * @return array
	 * @since 1.27
	 */
	public function pendingMasterChangeCallers() {
		$callers = [];

		$writerIndex = $this->getWriterIndex();
		// Look at the master's handles in every connection pool
		foreach ( $this->mConns as $connsByServer ) {
			if ( !empty( $connsByServer[$writerIndex] ) ) {
				/** @var DatabaseBase $handle */
				foreach ( $connsByServer[$writerIndex] as $handle ) {
					$callers = array_merge( $callers, $handle->pendingWriteCallers() );
				}
			}
		}

		return $callers;
	}
1413
1414
	/**
	 * Accessor for mWaitTimeout, implemented through wfSetVar()
	 *
	 * NOTE(review): presumably wfSetVar() sets the new value when $value is not
	 * null and returns the previous one; confirm against its definition.
	 *
	 * @param mixed $value
	 * @return mixed
	 */
	public function waitTimeout( $value = null ) {
		$result = wfSetVar( $this->mWaitTimeout, $value );

		return $result;
	}
1421
1422
	/**
	 * @note This method will trigger a DB connection if not yet done
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
	 * @return bool Whether the generic connection for reads is highly "lagged"
	 */
	public function getLaggedReplicaMode( $wiki = false ) {
		// No-op if lag was already detected or there is only one DB (also avoids recursion)
		if ( $this->laggedReplicaMode || $this->getServerCount() <= 1 ) {
			return $this->laggedReplicaMode;
		}

		try {
			// See if laggedReplicaMode gets set
			$replicaConn = $this->getConnection( DB_REPLICA, false, $wiki );
			$this->reuseConnection( $replicaConn );
		} catch ( DBConnectionError $ex ) {
			// Avoid expensive re-connect attempts and failures
			$this->allReplicasDownMode = true;
			$this->laggedReplicaMode = true;
		}

		return $this->laggedReplicaMode;
	}
1443
1444
	/**
	 * Old name of getLaggedReplicaMode(), which it forwards to
	 *
	 * @param bool $wiki
	 * @return bool
	 * @deprecated 1.28; use getLaggedReplicaMode()
	 */
	public function getLaggedSlaveMode( $wiki = false ) {
		$isLagged = $this->getLaggedReplicaMode( $wiki );

		return $isLagged;
	}
1452
1453
	/**
	 * Report the current lagged-replica flag without probing any server
	 *
	 * @note This method will never cause a new DB connection
	 * @return bool Whether any generic connection used for reads was highly "lagged"
	 * @since 1.28
	 */
	public function laggedReplicaUsed() {
		return $this->laggedReplicaMode;
	}
1461
1462
	/**
	 * Old name of laggedReplicaUsed(), which it forwards to
	 *
	 * @return bool
	 * @since 1.27
	 * @deprecated Since 1.28; use laggedReplicaUsed()
	 */
	public function laggedSlaveUsed() {
		$wasLagged = $this->laggedReplicaUsed();

		return $wasLagged;
	}
1470
1471
	/**
1472
	 * @note This method may trigger a DB connection if not yet done
1473
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1474
	 * @param DatabaseBase|null DB master connection; used to avoid loops [optional]
1475
	 * @return string|bool Reason the master is read-only or false if it is not
1476
	 * @since 1.27
1477
	 */
1478
	public function getReadOnlyReason( $wiki = false, DatabaseBase $conn = null ) {
		// A reason already stored on this object takes precedence over any probing
		if ( $this->readOnlyReason !== false ) {
			return $this->readOnlyReason;
		}

		// Next: replicas that are lagged or entirely unreachable
		if ( $this->getLaggedReplicaMode( $wiki ) ) {
			$prefix = 'The database has been automatically locked ';
			if ( $this->allReplicasDownMode ) {
				return $prefix . 'until the replica database servers become available';
			}

			return $prefix . 'while the replica database servers catch up to the master.';
		}

		// Last: the master server itself reports being read-only
		if ( $this->masterRunningReadOnly( $wiki, $conn ) ) {
			return 'The database master is running in read-only mode.';
		}

		return false;
	}
1495
1496
	/**
1497
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1498
	 * @param DatabaseBase|null $conn DB master connection; used to avoid loops [optional]
1499
	 * @return bool
1500
	 */
1501
	private function masterRunningReadOnly( $wiki, DatabaseBase $conn = null ) {
		$cache = $this->wanCache;
		// The cache key is per master server name, not per wiki
		$masterServer = $this->getServerName( $this->getWriterIndex() );

		return (bool)$cache->getWithSetCallback(
			$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
			self::TTL_CACHE_READONLY,
			function () use ( $wiki, $conn ) {
				// Silence the transaction profiler while probing the master
				$this->trxProfiler->setSilenced( true );
				try {
					// Prefer the caller-supplied handle; otherwise fetch a master connection
					$dbw = $conn ?: $this->getConnection( DB_MASTER, [], $wiki );
					$readOnly = (int)$dbw->serverIsReadOnly();
				} catch ( DBError $e ) {
					// A failed probe is reported as "not read-only"
					$readOnly = 0;
				} finally {
					// Always un-silence, including when a non-DBError exception
					// propagates; otherwise the profiler would stay silenced
					$this->trxProfiler->setSilenced( false );
				}

				return $readOnly;
			},
			[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
		);
	}
1522
1523
	/**
1524
	 * Disables/enables lag checks
1525
	 * @param null|bool $mode
1526
	 * @return bool
1527
	 */
1528
	public function allowLagged( $mode = null ) {
		// Acts as a setter only when a non-null mode is supplied
		if ( $mode !== null ) {
			$this->mAllowLagged = $mode;
		}

		// Either way, report the setting now in effect
		return $this->mAllowLagged;
	}
1536
1537
	/**
1538
	 * @return bool
1539
	 */
1540
	public function pingAll() {
		$failed = 0;

		// Every open connection is pinged, even after an earlier one has failed
		$this->forEachOpenConnection( function ( DatabaseBase $db ) use ( &$failed ) {
			if ( !$db->ping() ) {
				++$failed;
			}
		} );

		// True only when no ping failed (also true when nothing is open)
		return $failed === 0;
	}
1550
1551
	/**
1552
	 * Call a function with each open connection object
1553
	 * @param callable $callback
1554
	 * @param array $params
1555
	 */
1556 View Code Duplication
	public function forEachOpenConnection( $callback, array $params = [] ) {
		// mConns nests three levels deep; walk down to the individual handles
		foreach ( $this->mConns as $pool ) {
			foreach ( $pool as $handles ) {
				foreach ( $handles as $db ) {
					// The connection is always the first argument, followed by $params
					call_user_func_array( $callback, array_merge( [ $db ], $params ) );
				}
			}
		}
	}
1566
1567
	/**
1568
	 * Call a function with each open connection object to a master
1569
	 * @param callable $callback
1570
	 * @param array $params
1571
	 * @since 1.28
1572
	 */
1573
	public function forEachOpenMasterConnection( $callback, array $params = [] ) {
		$writerIndex = $this->getWriterIndex();

		foreach ( $this->mConns as $pool ) {
			// Pools with no handles for the writer index have nothing to visit
			if ( !isset( $pool[$writerIndex] ) ) {
				continue;
			}

			/** @var DatabaseBase $db */
			foreach ( $pool[$writerIndex] as $db ) {
				// The connection is always the first argument, followed by $params
				call_user_func_array( $callback, array_merge( [ $db ], $params ) );
			}
		}
	}
1585
1586
	/**
1587
	 * Call a function with each open replica DB connection object
1588
	 * @param callable $callback
1589
	 * @param array $params
1590
	 * @since 1.28
1591
	 */
1592 View Code Duplication
	public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
		// Look the writer index up once instead of once per server per pool,
		// matching what forEachOpenMasterConnection() does
		$masterIndex = $this->getWriterIndex();

		foreach ( $this->mConns as $connsByServer ) {
			foreach ( $connsByServer as $i => $serverConns ) {
				if ( $i === $masterIndex ) {
					continue; // skip master
				}
				foreach ( $serverConns as $conn ) {
					// The connection is always the first argument, followed by $params
					$mergedParams = array_merge( [ $conn ], $params );
					call_user_func_array( $callback, $mergedParams );
				}
			}
		}
	}
1605
1606
	/**
1607
	 * Get the hostname and lag time of the most-lagged replica DB
1608
	 *
1609
	 * This is useful for maintenance scripts that need to throttle their updates.
1610
	 * May attempt to open connections to replica DBs on the default DB. If there is
1611
	 * no lag, the maximum lag will be reported as -1.
1612
	 *
1613
	 * @param bool|string $wiki Wiki ID, or false for the default database
1614
	 * @return array ( host, max lag, index of max lagged host )
1615
	 */
1616
	public function getMaxLag( $wiki = false ) {
		// Result triple is ( host, max lag, server index ); these defaults are
		// also the answer when there is no replication or no lag at all
		$worst = [ '', -1, 0 ];

		if ( $this->getServerCount() > 1 ) {
			foreach ( $this->getLagTimes( $wiki ) as $index => $seconds ) {
				// Servers whose load weight is not positive are ignored
				if ( $this->mLoads[$index] > 0 && $seconds > $worst[1] ) {
					$worst = [ $this->mServers[$index]['host'], $seconds, $index ];
				}
			}
		}

		return $worst;
	}
1636
1637
	/**
1638
	 * Get an estimate of replication lag (in seconds) for each server
1639
	 *
1640
	 * Results are cached for a short time in memcached/process cache
1641
	 *
1642
	 * Values may be "false" if replication is too broken to estimate
1643
	 *
1644
	 * @param string|bool $wiki
1645
	 * @return array Map of (server index => float|int|bool)
1646
	 */
1647
	public function getLagTimes( $wiki = false ) {
		if ( $this->getServerCount() > 1 ) {
			// Replication is in play: ask the load monitor about every configured server
			$monitor = $this->getLoadMonitor();
			$serverIndexes = array_keys( $this->mServers );

			return $monitor->getLagTimes( $serverIndexes, $wiki );
		}

		// no replication = no lag
		return [ 0 => 0 ];
	}
1655
1656
	/**
1657
	 * Get the lag in seconds for a given connection, or zero if this load
1658
	 * balancer does not have replication enabled.
1659
	 *
1660
	 * This should be used in preference to Database::getLag() in cases where
1661
	 * replication may not be in use, since there is no way to determine if
1662
	 * replication is in use at the connection level without running
1663
	 * potentially restricted queries such as SHOW SLAVE STATUS. Using this
1664
	 * function instead of Database::getLag() avoids a fatal error in this
1665
	 * case on many installations.
1666
	 *
1667
	 * @param IDatabase $conn
1668
	 * @return int|bool Returns false on error
1669
	 */
1670
	public function safeGetLag( IDatabase $conn ) {
		// With exactly one server the connection is never asked for its lag
		return ( $this->getServerCount() == 1 )
			? 0
			: $conn->getLag();
	}
1677
1678
	/**
1679
	 * Wait for a replica DB to reach a specified master position
1680
	 *
1681
	 * This will connect to the master to get an accurate position if $pos is not given
1682
	 *
1683
	 * @param IDatabase $conn Replica DB
1684
	 * @param DBMasterPos|bool $pos Master position; default: current position
1685
	 * @param integer $timeout Timeout in seconds
1686
	 * @return bool Success
1687
	 * @since 1.27
1688
	 */
1689
	public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
		$isReplica = $this->getServerCount() != 1 && $conn->getLBInfo( 'replica' );
		if ( !$isReplica ) {
			return true; // server is not a replica DB
		}

		// No position supplied: take the current one from the master
		if ( !$pos ) {
			$pos = $this->getConnection( DB_MASTER )->getMasterPos();
		}
		if ( !( $pos instanceof DBMasterPos ) ) {
			return false; // something is misconfigured
		}

		$result = $conn->masterPosWait( $pos, $timeout );
		$timedOut = ( $result == -1 || is_null( $result ) );

		if ( !$timedOut ) {
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );

			return true;
		}

		// Timeouts go to the replication log and, with a backtrace, to DBPerformance
		$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
		wfDebugLog( 'replication', "$msg\n" );
		wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );

		return false;
	}
1712
1713
	/**
1714
	 * Clear the cache for replica DB lag delay times
1715
	 *
1716
	 * This is only used for testing
1717
	 */
1718
	public function clearLagTimeCache() {
		// The caches belong to the load monitor; ask it to clear them
		$monitor = $this->getLoadMonitor();
		$monitor->clearCaches();
	}
1721
1722
	/**
1723
	 * Set a callback via DatabaseBase::setTransactionListener() on
1724
	 * all current and future master connections of this load balancer
1725
	 *
1726
	 * @param string $name Callback name
1727
	 * @param callable|null $callback
1728
	 * @since 1.28
1729
	 */
1730
	public function setTransactionListener( $name, callable $callback = null ) {
		// Update the registry first: a falsy callback removes the entry
		if ( !$callback ) {
			unset( $this->trxRecurringCallbacks[$name] );
		} else {
			$this->trxRecurringCallbacks[$name] = $callback;
		}

		// Then push the same change to every master connection that is already open
		$applyListener = function ( DatabaseBase $db ) use ( $name, $callback ) {
			$db->setTransactionListener( $name, $callback );
		};
		$this->forEachOpenMasterConnection( $applyListener );
	}
1742
}
1743