Completed
Branch master (dc3656)
by
unknown
30:14
created

LoadBalancer::runMasterPostTrxCallbacks()   B

Complexity

Conditions 5
Paths 1

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 14
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 20
rs 8.8571
1
<?php
2
/**
3
 * Database load balancing.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup Database
22
 */
23
24
/**
25
 * Database load balancing object
26
 *
27
 * @todo document
28
 * @ingroup Database
29
 */
30
class LoadBalancer {
31
	/** @var array[] Map of (server index => server config array) */
32
	private $mServers;
33
	/** @var array[] Map of (local/foreignUsed/foreignFree => server index => DatabaseBase array) */
34
	private $mConns;
35
	/** @var array Map of (server index => weight) */
36
	private $mLoads;
37
	/** @var array[] Map of (group => server index => weight) */
38
	private $mGroupLoads;
39
	/** @var bool Whether to disregard slave lag as a factor in slave selection */
40
	private $mAllowLagged;
41
	/** @var integer Seconds to spend waiting on slave lag to resolve */
42
	private $mWaitTimeout;
43
	/** @var array LBFactory information */
44
	private $mParentInfo;
45
46
	/** @var string The LoadMonitor subclass name */
47
	private $mLoadMonitorClass;
48
	/** @var LoadMonitor */
49
	private $mLoadMonitor;
50
	/** @var BagOStuff */
51
	private $srvCache;
52
	/** @var WANObjectCache */
53
	private $wanCache;
54
	/** @var TransactionProfiler */
55
	protected $trxProfiler;
56
57
	/** @var bool|DatabaseBase Database connection that caused a problem */
58
	private $mErrorConnection;
59
	/** @var integer The generic (not query grouped) slave index (of $mServers) */
60
	private $mReadIndex;
61
	/** @var bool|DBMasterPos False if not set */
62
	private $mWaitForPos;
63
	/** @var bool Whether the generic reader fell back to a lagged slave */
64
	private $laggedSlaveMode = false;
65
	/** @var bool Whether the generic reader fell back to a lagged slave */
66
	private $slavesDownMode = false;
67
	/** @var string The last DB selection or connection error */
68
	private $mLastError = 'Unknown error';
69
	/** @var string|bool Reason the LB is read-only or false if not */
70
	private $readOnlyReason = false;
71
	/** @var integer Total connections opened */
72
	private $connsOpened = 0;
73
	/** @var string|bool String if a requested DBO_TRX transaction round is active */
74
	private $trxRoundId = false;
75
	/** @var array[] Map of (name => callable) */
76
	private $trxRecurringCallbacks = [];
77
78
	/** @var integer Warn when this many connection are held */
79
	const CONN_HELD_WARN_THRESHOLD = 10;
80
	/** @var integer Default 'max lag' when unspecified */
81
	const MAX_LAG = 10;
82
	/** @var integer Max time to wait for a slave to catch up (e.g. ChronologyProtector) */
83
	const POS_WAIT_TIMEOUT = 10;
84
	/** @var integer Seconds to cache master server read-only status */
85
	const TTL_CACHE_READONLY = 5;
86
87
	/**
88
	 * @var boolean
89
	 */
90
	private $disabled = false;
91
92
	/**
93
	 * @param array $params Array with keys:
94
	 *  - servers : Required. Array of server info structures.
95
	 *  - loadMonitor : Name of a class used to fetch server lag and load.
96
	 *  - readOnlyReason : Reason the master DB is read-only if so [optional]
97
	 *  - srvCache : BagOStuff object [optional]
98
	 *  - wanCache : WANObjectCache object [optional]
99
	 * @throws MWException
100
	 */
101
	public function __construct( array $params ) {
102
		if ( !isset( $params['servers'] ) ) {
103
			throw new MWException( __CLASS__ . ': missing servers parameter' );
104
		}
105
		$this->mServers = $params['servers'];
106
		$this->mWaitTimeout = self::POS_WAIT_TIMEOUT;
107
108
		$this->mReadIndex = -1;
109
		$this->mWriteIndex = -1;
0 ignored issues
show
Bug introduced by
The property mWriteIndex does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
110
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignFree' => array()) of type array<string,array,{"loc..."foreignFree":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
111
			'local' => [],
112
			'foreignUsed' => [],
113
			'foreignFree' => [] ];
114
		$this->mLoads = [];
115
		$this->mWaitForPos = false;
116
		$this->mErrorConnection = false;
117
		$this->mAllowLagged = false;
118
119 View Code Duplication
		if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
120
			$this->readOnlyReason = $params['readOnlyReason'];
121
		}
122
123
		if ( isset( $params['loadMonitor'] ) ) {
124
			$this->mLoadMonitorClass = $params['loadMonitor'];
125
		} else {
126
			$master = reset( $params['servers'] );
127
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
128
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
129
			} else {
130
				$this->mLoadMonitorClass = 'LoadMonitorNull';
131
			}
132
		}
133
134
		foreach ( $params['servers'] as $i => $server ) {
135
			$this->mLoads[$i] = $server['load'];
136
			if ( isset( $server['groupLoads'] ) ) {
137
				foreach ( $server['groupLoads'] as $group => $ratio ) {
138
					if ( !isset( $this->mGroupLoads[$group] ) ) {
139
						$this->mGroupLoads[$group] = [];
140
					}
141
					$this->mGroupLoads[$group][$i] = $ratio;
142
				}
143
			}
144
		}
145
146
		if ( isset( $params['srvCache'] ) ) {
147
			$this->srvCache = $params['srvCache'];
148
		} else {
149
			$this->srvCache = new EmptyBagOStuff();
150
		}
151
		if ( isset( $params['wanCache'] ) ) {
152
			$this->wanCache = $params['wanCache'];
153
		} else {
154
			$this->wanCache = WANObjectCache::newEmpty();
155
		}
156
		if ( isset( $params['trxProfiler'] ) ) {
157
			$this->trxProfiler = $params['trxProfiler'];
158
		} else {
159
			$this->trxProfiler = new TransactionProfiler();
160
		}
161
	}
162
163
	/**
164
	 * Get a LoadMonitor instance
165
	 *
166
	 * @return LoadMonitor
167
	 */
168
	private function getLoadMonitor() {
169
		if ( !isset( $this->mLoadMonitor ) ) {
170
			$class = $this->mLoadMonitorClass;
171
			$this->mLoadMonitor = new $class( $this );
172
		}
173
174
		return $this->mLoadMonitor;
175
	}
176
177
	/**
178
	 * Get or set arbitrary data used by the parent object, usually an LBFactory
179
	 * @param mixed $x
180
	 * @return mixed
181
	 */
182
	public function parentInfo( $x = null ) {
183
		return wfSetVar( $this->mParentInfo, $x );
184
	}
185
186
	/**
187
	 * @param array $loads
188
	 * @param bool|string $wiki Wiki to get non-lagged for
189
	 * @param int $maxLag Restrict the maximum allowed lag to this many seconds
190
	 * @return bool|int|string
191
	 */
192
	private function getRandomNonLagged( array $loads, $wiki = false, $maxLag = self::MAX_LAG ) {
193
		$lags = $this->getLagTimes( $wiki );
194
195
		# Unset excessively lagged servers
196
		foreach ( $lags as $i => $lag ) {
197
			if ( $i != 0 ) {
198
				$maxServerLag = $maxLag;
199 View Code Duplication
				if ( isset( $this->mServers[$i]['max lag'] ) ) {
200
					$maxServerLag = min( $maxServerLag, $this->mServers[$i]['max lag'] );
201
				}
202
203
				$host = $this->getServerName( $i );
204
				if ( $lag === false ) {
205
					wfDebugLog( 'replication', "Server $host (#$i) is not replicating?" );
206
					unset( $loads[$i] );
207
				} elseif ( $lag > $maxServerLag ) {
208
					wfDebugLog( 'replication', "Server $host (#$i) has >= $lag seconds of lag" );
209
					unset( $loads[$i] );
210
				}
211
			}
212
		}
213
214
		# Find out if all the slaves with non-zero load are lagged
215
		$sum = 0;
216
		foreach ( $loads as $load ) {
217
			$sum += $load;
218
		}
219
		if ( $sum == 0 ) {
220
			# No appropriate DB servers except maybe the master and some slaves with zero load
221
			# Do NOT use the master
222
			# Instead, this function will return false, triggering read-only mode,
223
			# and a lagged slave will be used instead.
224
			return false;
225
		}
226
227
		if ( count( $loads ) == 0 ) {
228
			return false;
229
		}
230
231
		# Return a random representative of the remainder
232
		return ArrayUtils::pickRandom( $loads );
233
	}
234
235
	/**
236
	 * Get the index of the reader connection, which may be a slave
237
	 * This takes into account load ratios and lag times. It should
238
	 * always return a consistent index during a given invocation
239
	 *
240
	 * Side effect: opens connections to databases
241
	 * @param string|bool $group Query group, or false for the generic reader
242
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
243
	 * @throws MWException
244
	 * @return bool|int|string
245
	 */
246
	public function getReaderIndex( $group = false, $wiki = false ) {
247
		global $wgDBtype;
248
249
		# @todo FIXME: For now, only go through all this for mysql databases
250
		if ( $wgDBtype != 'mysql' ) {
251
			return $this->getWriterIndex();
252
		}
253
254
		if ( count( $this->mServers ) == 1 ) {
255
			# Skip the load balancing if there's only one server
256
			return 0;
257
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
258
			# Shortcut if generic reader exists already
259
			return $this->mReadIndex;
260
		}
261
262
		# Find the relevant load array
263
		if ( $group !== false ) {
264
			if ( isset( $this->mGroupLoads[$group] ) ) {
265
				$nonErrorLoads = $this->mGroupLoads[$group];
266
			} else {
267
				# No loads for this group, return false and the caller can use some other group
268
				wfDebugLog( 'connect', __METHOD__ . ": no loads for group $group\n" );
269
270
				return false;
271
			}
272
		} else {
273
			$nonErrorLoads = $this->mLoads;
274
		}
275
276
		if ( !count( $nonErrorLoads ) ) {
277
			throw new MWException( "Empty server array given to LoadBalancer" );
278
		}
279
280
		# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
281
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
282
283
		$laggedSlaveMode = false;
284
285
		# No server found yet
286
		$i = false;
287
		$conn = false;
288
		# First try quickly looking through the available servers for a server that
289
		# meets our criteria
290
		$currentLoads = $nonErrorLoads;
291
		while ( count( $currentLoads ) ) {
292
			if ( $this->mAllowLagged || $laggedSlaveMode ) {
293
				$i = ArrayUtils::pickRandom( $currentLoads );
294
			} else {
295
				$i = false;
296
				if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
297
					# ChronologyProtecter causes mWaitForPos to be set via sessions.
298
					# This triggers doWait() after connect, so it's especially good to
299
					# avoid lagged servers so as to avoid just blocking in that method.
300
					$ago = microtime( true ) - $this->mWaitForPos->asOfTime();
301
					# Aim for <= 1 second of waiting (being too picky can backfire)
302
					$i = $this->getRandomNonLagged( $currentLoads, $wiki, $ago + 1 );
303
				}
304
				if ( $i === false ) {
305
					# Any server with less lag than it's 'max lag' param is preferable
306
					$i = $this->getRandomNonLagged( $currentLoads, $wiki );
307
				}
308
				if ( $i === false && count( $currentLoads ) != 0 ) {
309
					# All slaves lagged. Switch to read-only mode
310
					wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode" );
311
					$i = ArrayUtils::pickRandom( $currentLoads );
312
					$laggedSlaveMode = true;
313
				}
314
			}
315
316
			if ( $i === false ) {
317
				# pickRandom() returned false
318
				# This is permanent and means the configuration or the load monitor
319
				# wants us to return false.
320
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
321
322
				return false;
323
			}
324
325
			$serverName = $this->getServerName( $i );
326
			wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: $serverName..." );
327
328
			$conn = $this->openConnection( $i, $wiki );
329
			if ( !$conn ) {
330
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
331
				unset( $nonErrorLoads[$i] );
332
				unset( $currentLoads[$i] );
333
				$i = false;
334
				continue;
335
			}
336
337
			// Decrement reference counter, we are finished with this connection.
338
			// It will be incremented for the caller later.
339
			if ( $wiki !== false ) {
340
				$this->reuseConnection( $conn );
341
			}
342
343
			# Return this server
344
			break;
345
		}
346
347
		# If all servers were down, quit now
348
		if ( !count( $nonErrorLoads ) ) {
349
			wfDebugLog( 'connect', "All servers down" );
350
		}
351
352
		if ( $i !== false ) {
353
			# Slave connection successful
354
			# Wait for the session master pos for a short time
355 View Code Duplication
			if ( $this->mWaitForPos && $i > 0 ) {
356
				if ( !$this->doWait( $i ) ) {
357
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
0 ignored issues
show
Bug introduced by
The method getSlavePos cannot be called on $conn (of type boolean).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
358
				}
359
			}
360
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
361
				$this->mReadIndex = $i;
0 ignored issues
show
Documentation Bug introduced by
It seems like $i can also be of type string. However, the property $mReadIndex is declared as type integer. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
362
				# Record if the generic reader index is in "lagged slave" mode
363
				if ( $laggedSlaveMode ) {
364
					$this->laggedSlaveMode = true;
365
				}
366
			}
367
			$serverName = $this->getServerName( $i );
368
			wfDebugLog( 'connect', __METHOD__ .
369
				": using server $serverName for group '$group'\n" );
370
		}
371
372
		return $i;
373
	}
374
375
	/**
376
	 * Set the master wait position
377
	 * If a DB_SLAVE connection has been opened already, waits
378
	 * Otherwise sets a variable telling it to wait if such a connection is opened
379
	 * @param DBMasterPos $pos
380
	 */
381
	public function waitFor( $pos ) {
382
		$this->mWaitForPos = $pos;
383
		$i = $this->mReadIndex;
384
385 View Code Duplication
		if ( $i > 0 ) {
386
			if ( !$this->doWait( $i ) ) {
387
				$this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos();
388
				$this->laggedSlaveMode = true;
389
			}
390
		}
391
	}
392
393
	/**
394
	 * Set the master wait position and wait for a "generic" slave to catch up to it
395
	 *
396
	 * This can be used a faster proxy for waitForAll()
397
	 *
398
	 * @param DBMasterPos $pos
399
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
400
	 * @return bool Success (able to connect and no timeouts reached)
401
	 * @since 1.26
402
	 */
403
	public function waitForOne( $pos, $timeout = null ) {
404
		$this->mWaitForPos = $pos;
405
406
		$i = $this->mReadIndex;
407
		if ( $i <= 0 ) {
408
			// Pick a generic slave if there isn't one yet
409
			$readLoads = $this->mLoads;
410
			unset( $readLoads[$this->getWriterIndex()] ); // slaves only
411
			$readLoads = array_filter( $readLoads ); // with non-zero load
412
			$i = ArrayUtils::pickRandom( $readLoads );
413
		}
414
415 View Code Duplication
		if ( $i > 0 ) {
416
			$ok = $this->doWait( $i, true, $timeout );
417
		} else {
418
			$ok = true; // no applicable loads
419
		}
420
421
		return $ok;
422
	}
423
424
	/**
425
	 * Set the master wait position and wait for ALL slaves to catch up to it
426
	 * @param DBMasterPos $pos
427
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
428
	 * @return bool Success (able to connect and no timeouts reached)
429
	 */
430
	public function waitForAll( $pos, $timeout = null ) {
431
		$this->mWaitForPos = $pos;
432
		$serverCount = count( $this->mServers );
433
434
		$ok = true;
435
		for ( $i = 1; $i < $serverCount; $i++ ) {
436 View Code Duplication
			if ( $this->mLoads[$i] > 0 ) {
437
				$ok = $this->doWait( $i, true, $timeout ) && $ok;
438
			}
439
		}
440
441
		return $ok;
442
	}
443
444
	/**
445
	 * Get any open connection to a given server index, local or foreign
446
	 * Returns false if there is no connection open
447
	 *
448
	 * @param int $i
449
	 * @return DatabaseBase|bool False on failure
450
	 */
451
	public function getAnyOpenConnection( $i ) {
452
		foreach ( $this->mConns as $conns ) {
453
			if ( !empty( $conns[$i] ) ) {
454
				return reset( $conns[$i] );
455
			}
456
		}
457
458
		return false;
459
	}
460
461
	/**
462
	 * Wait for a given slave to catch up to the master pos stored in $this
463
	 * @param int $index Server index
464
	 * @param bool $open Check the server even if a new connection has to be made
465
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
466
	 * @return bool
467
	 */
468
	protected function doWait( $index, $open = false, $timeout = null ) {
469
		$close = false; // close the connection afterwards
470
471
		// Check if we already know that the DB has reached this point
472
		$server = $this->getServerName( $index );
473
		$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
474
		/** @var DBMasterPos $knownReachedPos */
475
		$knownReachedPos = $this->srvCache->get( $key );
476
		if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DBMasterPos::hasReached() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
477
			wfDebugLog( 'replication', __METHOD__ .
478
				": slave $server known to be caught up (pos >= $knownReachedPos).\n" );
479
			return true;
480
		}
481
482
		// Find a connection to wait on, creating one if needed and allowed
483
		$conn = $this->getAnyOpenConnection( $index );
484
		if ( !$conn ) {
485
			if ( !$open ) {
486
				wfDebugLog( 'replication', __METHOD__ . ": no connection open for $server\n" );
487
488
				return false;
489
			} else {
490
				$conn = $this->openConnection( $index, '' );
491
				if ( !$conn ) {
492
					wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
493
494
					return false;
495
				}
496
				// Avoid connection spam in waitForAll() when connections
497
				// are made just for the sake of doing this lag check.
498
				$close = true;
499
			}
500
		}
501
502
		wfDebugLog( 'replication', __METHOD__ . ": Waiting for slave $server to catch up...\n" );
503
		$timeout = $timeout ?: $this->mWaitTimeout;
504
		$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DatabaseBase::masterPosWait() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
505
506
		if ( $result == -1 || is_null( $result ) ) {
507
			// Timed out waiting for slave, use master instead
508
			$msg = __METHOD__ . ": Timed out waiting on $server pos {$this->mWaitForPos}";
509
			wfDebugLog( 'replication', "$msg\n" );
510
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
511
			$ok = false;
512
		} else {
513
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
514
			$ok = true;
515
			// Remember that the DB reached this point
516
			$this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
517
		}
518
519
		if ( $close ) {
520
			$this->closeConnection( $conn );
0 ignored issues
show
Bug introduced by
It seems like $conn can also be of type boolean; however, LoadBalancer::closeConnection() does only seem to accept object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
521
		}
522
523
		return $ok;
524
	}
525
526
	/**
527
	 * Get a connection by index
528
	 * This is the main entry point for this class.
529
	 *
530
	 * @param int $i Server index
531
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
532
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
533
	 *
534
	 * @throws MWException
535
	 * @return DatabaseBase
536
	 */
537
	public function getConnection( $i, $groups = [], $wiki = false ) {
538
		if ( $i === null || $i === false ) {
539
			throw new MWException( 'Attempt to call ' . __METHOD__ .
540
				' with invalid server index' );
541
		}
542
543
		if ( $wiki === wfWikiID() ) {
544
			$wiki = false;
545
		}
546
547
		$groups = ( $groups === false || $groups === [] )
548
			? [ false ] // check one "group": the generic pool
549
			: (array)$groups;
550
551
		$masterOnly = ( $i == DB_MASTER || $i == $this->getWriterIndex() );
552
		$oldConnsOpened = $this->connsOpened; // connections open now
553
554
		if ( $i == DB_MASTER ) {
555
			$i = $this->getWriterIndex();
556
		} else {
557
			# Try to find an available server in any the query groups (in order)
558
			foreach ( $groups as $group ) {
559
				$groupIndex = $this->getReaderIndex( $group, $wiki );
560
				if ( $groupIndex !== false ) {
561
					$i = $groupIndex;
562
					break;
563
				}
564
			}
565
		}
566
567
		# Operation-based index
568
		if ( $i == DB_SLAVE ) {
569
			$this->mLastError = 'Unknown error'; // reset error string
570
			# Try the general server pool if $groups are unavailable.
571
			$i = in_array( false, $groups, true )
572
				? false // don't bother with this if that is what was tried above
573
				: $this->getReaderIndex( false, $wiki );
574
			# Couldn't find a working server in getReaderIndex()?
575
			if ( $i === false ) {
576
				$this->mLastError = 'No working slave server: ' . $this->mLastError;
577
578
				return $this->reportConnectionError();
579
			}
580
		}
581
582
		# Now we have an explicit index into the servers array
583
		$conn = $this->openConnection( $i, $wiki );
584
		if ( !$conn ) {
585
			return $this->reportConnectionError();
586
		}
587
588
		# Profile any new connections that happen
589
		if ( $this->connsOpened > $oldConnsOpened ) {
590
			$host = $conn->getServer();
591
			$dbname = $conn->getDBname();
592
			$trxProf = Profiler::instance()->getTransactionProfiler();
593
			$trxProf->recordConnection( $host, $dbname, $masterOnly );
594
		}
595
596
		if ( $masterOnly ) {
597
			# Make master-requested DB handles inherit any read-only mode setting
598
			$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $wiki, $conn ) );
0 ignored issues
show
Bug introduced by
It seems like $conn defined by $this->openConnection($i, $wiki) on line 583 can also be of type boolean; however, LoadBalancer::getReadOnlyReason() does only seem to accept null|object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
599
		}
600
601
		return $conn;
602
	}
603
604
	/**
605
	 * Mark a foreign connection as being available for reuse under a different
606
	 * DB name or prefix. This mechanism is reference-counted, and must be called
607
	 * the same number of times as getConnection() to work.
608
	 *
609
	 * @param DatabaseBase $conn
610
	 * @throws MWException
611
	 */
612
	public function reuseConnection( $conn ) {
613
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
614
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
615
		if ( $serverIndex === null || $refCount === null ) {
616
			wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" );
617
			/**
618
			 * This can happen in code like:
619
			 *   foreach ( $dbs as $db ) {
620
			 *     $conn = $lb->getConnection( DB_SLAVE, [], $db );
621
			 *     ...
622
			 *     $lb->reuseConnection( $conn );
623
			 *   }
624
			 * When a connection to the local DB is opened in this way, reuseConnection()
625
			 * should be ignored
626
			 */
627
			return;
628
		}
629
630
		$dbName = $conn->getDBname();
631
		$prefix = $conn->tablePrefix();
632
		if ( strval( $prefix ) !== '' ) {
633
			$wiki = "$dbName-$prefix";
634
		} else {
635
			$wiki = $dbName;
636
		}
637
		if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
638
			throw new MWException( __METHOD__ . ": connection not found, has " .
639
				"the connection been freed already?" );
640
		}
641
		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
642
		if ( $refCount <= 0 ) {
643
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
644
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
645
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
646
		} else {
647
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
648
		}
649
	}
650
651
	/**
652
	 * Get a database connection handle reference
653
	 *
654
	 * The handle's methods wrap simply wrap those of a DatabaseBase handle
655
	 *
656
	 * @see LoadBalancer::getConnection() for parameter information
657
	 *
658
	 * @param int $db
659
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
660
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
661
	 * @return DBConnRef
662
	 */
663
	public function getConnectionRef( $db, $groups = [], $wiki = false ) {
664
		return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
665
	}
666
667
	/**
668
	 * Get a database connection handle reference without connecting yet
669
	 *
670
	 * The handle's methods wrap simply wrap those of a DatabaseBase handle
671
	 *
672
	 * @see LoadBalancer::getConnection() for parameter information
673
	 *
674
	 * @param int $db
675
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
676
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
677
	 * @return DBConnRef
678
	 */
679
	public function getLazyConnectionRef( $db, $groups = [], $wiki = false ) {
680
		return new DBConnRef( $this, [ $db, $groups, $wiki ] );
681
	}
682
683
	/**
684
	 * Open a connection to the server given by the specified index
685
	 * Index must be an actual index into the array.
686
	 * If the server is already open, returns it.
687
	 *
688
	 * On error, returns false, and the connection which caused the
689
	 * error will be available via $this->mErrorConnection.
690
	 *
691
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
692
	 *
693
	 * @param int $i Server index
694
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
695
	 * @return DatabaseBase|bool Returns false on errors
696
	 */
697
	public function openConnection( $i, $wiki = false ) {
698
		if ( $wiki !== false ) {
699
			$conn = $this->openForeignConnection( $i, $wiki );
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 697 can also be of type boolean; however, LoadBalancer::openForeignConnection() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
700
		} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
701
			$conn = $this->mConns['local'][$i][0];
702
		} else {
703
			$server = $this->mServers[$i];
704
			$server['serverIndex'] = $i;
705
			$conn = $this->reallyOpenConnection( $server, false );
706
			$serverName = $this->getServerName( $i );
707
			if ( $conn->isOpen() ) {
708
				wfDebugLog( 'connect', "Connected to database $i at $serverName\n" );
709
				$this->mConns['local'][$i][0] = $conn;
710
			} else {
711
				wfDebugLog( 'connect', "Failed to connect to database $i at $serverName\n" );
712
				$this->mErrorConnection = $conn;
713
				$conn = false;
714
			}
715
		}
716
717
		if ( $conn && !$conn->isOpen() ) {
718
			// Connection was made but later unrecoverably lost for some reason.
719
			// Do not return a handle that will just throw exceptions on use,
720
			// but let the calling code (e.g. getReaderIndex) try another server.
721
			// See DatabaseMyslBase::ping() for how this can happen.
722
			$this->mErrorConnection = $conn;
723
			$conn = false;
724
		}
725
726
		return $conn;
727
	}
728
729
	/**
730
	 * Open a connection to a foreign DB, or return one if it is already open.
731
	 *
732
	 * Increments a reference count on the returned connection which locks the
733
	 * connection to the requested wiki. This reference count can be
734
	 * decremented by calling reuseConnection().
735
	 *
736
	 * If a connection is open to the appropriate server already, but with the wrong
737
	 * database, it will be switched to the right database and returned, as long as
738
	 * it has been freed first with reuseConnection().
739
	 *
740
	 * On error, returns false, and the connection which caused the
741
	 * error will be available via $this->mErrorConnection.
742
	 *
743
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
744
	 *
745
	 * @param int $i Server index
746
	 * @param string $wiki Wiki ID to open
747
	 * @return DatabaseBase
748
	 */
749
	private function openForeignConnection( $i, $wiki ) {
750
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
751
		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
752
			// Reuse an already-used connection
753
			$conn = $this->mConns['foreignUsed'][$i][$wiki];
754
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
755
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
756
			// Reuse a free connection for the same wiki
757
			$conn = $this->mConns['foreignFree'][$i][$wiki];
758
			unset( $this->mConns['foreignFree'][$i][$wiki] );
759
			$this->mConns['foreignUsed'][$i][$wiki] = $conn;
760
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
761
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
762
			// Reuse a connection from another wiki
763
			$conn = reset( $this->mConns['foreignFree'][$i] );
764
			$oldWiki = key( $this->mConns['foreignFree'][$i] );
765
766
			// The empty string as a DB name means "don't care".
767
			// DatabaseMysqlBase::open() already handle this on connection.
768
			if ( $dbName !== '' && !$conn->selectDB( $dbName ) ) {
769
				$this->mLastError = "Error selecting database $dbName on server " .
770
					$conn->getServer() . " from client host " . wfHostname() . "\n";
771
				$this->mErrorConnection = $conn;
772
				$conn = false;
773
			} else {
774
				$conn->tablePrefix( $prefix );
775
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
776
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
777
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
778
			}
779
		} else {
780
			// Open a new connection
781
			$server = $this->mServers[$i];
782
			$server['serverIndex'] = $i;
783
			$server['foreignPoolRefCount'] = 0;
784
			$server['foreign'] = true;
785
			$conn = $this->reallyOpenConnection( $server, $dbName );
786
			if ( !$conn->isOpen() ) {
787
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
788
				$this->mErrorConnection = $conn;
789
				$conn = false;
790
			} else {
791
				$conn->tablePrefix( $prefix );
792
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
793
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
794
			}
795
		}
796
797
		// Increment reference count
798
		if ( $conn ) {
799
			$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
800
			$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
801
		}
802
803
		return $conn;
804
	}
805
806
	/**
807
	 * Test if the specified index represents an open connection
808
	 *
809
	 * @param int $index Server index
810
	 * @access private
811
	 * @return bool
812
	 */
813
	private function isOpen( $index ) {
814
		if ( !is_integer( $index ) ) {
815
			return false;
816
		}
817
818
		return (bool)$this->getAnyOpenConnection( $index );
819
	}
820
821
	/**
822
	 * Really opens a connection. Uncached.
823
	 * Returns a Database object whether or not the connection was successful.
824
	 * @access private
825
	 *
826
	 * @param array $server
827
	 * @param bool $dbNameOverride
828
	 * @throws MWException
829
	 * @return DatabaseBase
830
	 */
831
	protected function reallyOpenConnection( $server, $dbNameOverride = false ) {
832
		if ( $this->disabled ) {
833
			throw new DBAccessError();
834
		}
835
836
		if ( !is_array( $server ) ) {
837
			throw new MWException( 'You must update your load-balancing configuration. ' .
838
				'See DefaultSettings.php entry for $wgDBservers.' );
839
		}
840
841
		if ( $dbNameOverride !== false ) {
842
			$server['dbname'] = $dbNameOverride;
843
		}
844
845
		// Let the handle know what the cluster master is (e.g. "db1052")
846
		$masterName = $this->getServerName( 0 );
847
		$server['clusterMasterHost'] = $masterName;
848
849
		// Log when many connection are made on requests
850
		if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
851
			wfDebugLog( 'DBPerformance', __METHOD__ . ": " .
852
				"{$this->connsOpened}+ connections made (master=$masterName)\n" .
853
				wfBacktrace( true ) );
854
		}
855
856
		# Create object
857
		try {
858
			$db = DatabaseBase::factory( $server['type'], $server );
859
		} catch ( DBConnectionError $e ) {
860
			// FIXME: This is probably the ugliest thing I have ever done to
861
			// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
862
			$db = $e->db;
863
		}
864
865
		$db->setLBInfo( $server );
866
		$db->setLazyMasterHandle(
867
			$this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
868
		);
869
		$db->setTransactionProfiler( $this->trxProfiler );
870
		if ( $this->trxRoundId !== false ) {
871
			$this->applyTransactionRoundFlags( $db );
872
		}
873
874
		if ( $server['serverIndex'] === $this->getWriterIndex() ) {
875
			foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
876
				$db->setTransactionListener( $name, $callback );
877
			}
878
		}
879
880
		return $db;
881
	}
882
883
	/**
884
	 * @throws DBConnectionError
885
	 * @return bool
886
	 */
887
	private function reportConnectionError() {
888
		$conn = $this->mErrorConnection; // The connection which caused the error
889
		$context = [
890
			'method' => __METHOD__,
891
			'last_error' => $this->mLastError,
892
		];
893
894
		if ( !is_object( $conn ) ) {
895
			// No last connection, probably due to all servers being too busy
896
			wfLogDBError(
897
				"LB failure with no last connection. Connection error: {last_error}",
898
				$context
899
			);
900
901
			// If all servers were busy, mLastError will contain something sensible
902
			throw new DBConnectionError( null, $this->mLastError );
903
		} else {
904
			$context['db_server'] = $conn->getProperty( 'mServer' );
905
			wfLogDBError(
906
				"Connection error: {last_error} ({db_server})",
907
				$context
908
			);
909
910
			// throws DBConnectionError
911
			$conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
912
		}
913
914
		return false; /* not reached */
915
	}
916
917
	/**
918
	 * @return int
919
	 * @since 1.26
920
	 */
921
	public function getWriterIndex() {
922
		return 0;
923
	}
924
925
	/**
926
	 * Returns true if the specified index is a valid server index
927
	 *
928
	 * @param string $i
929
	 * @return bool
930
	 */
931
	public function haveIndex( $i ) {
932
		return array_key_exists( $i, $this->mServers );
933
	}
934
935
	/**
936
	 * Returns true if the specified index is valid and has non-zero load
937
	 *
938
	 * @param string $i
939
	 * @return bool
940
	 */
941
	public function isNonZeroLoad( $i ) {
942
		return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
943
	}
944
945
	/**
946
	 * Get the number of defined servers (not the number of open connections)
947
	 *
948
	 * @return int
949
	 */
950
	public function getServerCount() {
951
		return count( $this->mServers );
952
	}
953
954
	/**
955
	 * Get the host name or IP address of the server with the specified index
956
	 * Prefer a readable name if available.
957
	 * @param string $i
958
	 * @return string
959
	 */
960
	public function getServerName( $i ) {
961
		if ( isset( $this->mServers[$i]['hostName'] ) ) {
962
			$name = $this->mServers[$i]['hostName'];
963 View Code Duplication
		} elseif ( isset( $this->mServers[$i]['host'] ) ) {
964
			$name = $this->mServers[$i]['host'];
965
		} else {
966
			$name = '';
967
		}
968
969
		return ( $name != '' ) ? $name : 'localhost';
970
	}
971
972
	/**
973
	 * Return the server info structure for a given index, or false if the index is invalid.
974
	 * @param int $i
975
	 * @return array|bool
976
	 */
977
	public function getServerInfo( $i ) {
978
		if ( isset( $this->mServers[$i] ) ) {
979
			return $this->mServers[$i];
980
		} else {
981
			return false;
982
		}
983
	}
984
985
	/**
986
	 * Sets the server info structure for the given index. Entry at index $i
987
	 * is created if it doesn't exist
988
	 * @param int $i
989
	 * @param array $serverInfo
990
	 */
991
	public function setServerInfo( $i, array $serverInfo ) {
992
		$this->mServers[$i] = $serverInfo;
993
	}
994
995
	/**
996
	 * Get the current master position for chronology control purposes
997
	 * @return mixed
998
	 */
999
	public function getMasterPos() {
1000
		# If this entire request was served from a slave without opening a connection to the
1001
		# master (however unlikely that may be), then we can fetch the position from the slave.
1002
		$masterConn = $this->getAnyOpenConnection( 0 );
1003
		if ( !$masterConn ) {
1004
			$serverCount = count( $this->mServers );
1005
			for ( $i = 1; $i < $serverCount; $i++ ) {
1006
				$conn = $this->getAnyOpenConnection( $i );
1007
				if ( $conn ) {
1008
					return $conn->getSlavePos();
0 ignored issues
show
Bug introduced by
The method getSlavePos cannot be called on $conn (of type boolean).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
1009
				}
1010
			}
1011
		} else {
1012
			return $masterConn->getMasterPos();
1013
		}
1014
1015
		return false;
1016
	}
1017
1018
	/**
1019
	 * Disable this load balancer. All connections are closed, and any attempt to
1020
	 * open a new connection will result in a DBAccessError.
1021
	 *
1022
	 * @since 1.27
1023
	 */
1024
	public function disable() {
1025
		$this->closeAll();
1026
		$this->disabled = true;
1027
	}
1028
1029
	/**
1030
	 * Close all open connections
1031
	 */
1032
	public function closeAll() {
1033
		$this->forEachOpenConnection( function ( DatabaseBase $conn ) {
1034
			$conn->close();
1035
		} );
1036
1037
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignUsed' => array()) of type array<string,array,{"loc..."foreignUsed":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
1038
			'local' => [],
1039
			'foreignFree' => [],
1040
			'foreignUsed' => [],
1041
		];
1042
		$this->connsOpened = 0;
1043
	}
1044
1045
	/**
1046
	 * Close a connection
1047
	 * Using this function makes sure the LoadBalancer knows the connection is closed.
1048
	 * If you use $conn->close() directly, the load balancer won't update its state.
1049
	 * @param DatabaseBase $conn
1050
	 */
1051
	public function closeConnection( $conn ) {
1052
		$done = false;
1053
		foreach ( $this->mConns as $i1 => $conns2 ) {
1054
			foreach ( $conns2 as $i2 => $conns3 ) {
1055
				foreach ( $conns3 as $i3 => $candidateConn ) {
1056
					if ( $conn === $candidateConn ) {
1057
						$conn->close();
1058
						unset( $this->mConns[$i1][$i2][$i3] );
1059
						--$this->connsOpened;
1060
						$done = true;
1061
						break;
1062
					}
1063
				}
1064
			}
1065
		}
1066
		if ( !$done ) {
1067
			$conn->close();
1068
		}
1069
	}
1070
1071
	/**
1072
	 * Commit transactions on all open connections
1073
	 * @param string $fname Caller name
1074
	 * @throws DBExpectedError
1075
	 */
1076
	public function commitAll( $fname = __METHOD__ ) {
1077
		$failures = [];
1078
1079
		$restore = ( $this->trxRoundId !== false );
1080
		$this->trxRoundId = false;
1081
		$this->forEachOpenConnection(
1082
			function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
1083
				try {
1084
					$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1085
				} catch ( DBError $e ) {
1086
					MWExceptionHandler::logException( $e );
1087
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1088
				}
1089
				if ( $restore && $conn->getLBInfo( 'master' ) ) {
1090
					$this->undoTransactionRoundFlags( $conn );
1091
				}
1092
			}
1093
		);
1094
1095
		if ( $failures ) {
1096
			throw new DBExpectedError(
1097
				null,
1098
				"Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1099
			);
1100
		}
1101
	}
1102
1103
	/**
1104
	 * Perform all pre-commit callbacks that remain part of the atomic transactions
1105
	 * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
1106
	 * @since 1.28
1107
	 */
1108
	public function finalizeMasterChanges() {
1109
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
1110
			// Any error should cause all DB transactions to be rolled back together
1111
			$conn->setTrxEndCallbackSuppression( false );
1112
			$conn->runOnTransactionPreCommitCallbacks();
1113
			// Defer post-commit callbacks until COMMIT finishes for all DBs
1114
			$conn->setTrxEndCallbackSuppression( true );
1115
		} );
1116
	}
1117
1118
	/**
1119
	 * Perform all pre-commit checks for things like replication safety
1120
	 * @param array $options Includes:
1121
	 *   - maxWriteDuration : max write query duration time in seconds
1122
	 * @throws DBTransactionError
1123
	 * @since 1.28
1124
	 */
1125
	public function approveMasterChanges( array $options ) {
1126
		$limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
1127
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
1128
			// If atomic sections or explicit transactions are still open, some caller must have
1129
			// caught an exception but failed to properly rollback any changes. Detect that and
1130
			// throw and error (causing rollback).
1131
			if ( $conn->explicitTrxActive() ) {
1132
				throw new DBTransactionError(
1133
					$conn,
1134
					"Explicit transaction still active. A caller may have caught an error."
1135
				);
1136
			}
1137
			// Assert that the time to replicate the transaction will be sane.
1138
			// If this fails, then all DB transactions will be rollback back together.
1139
			$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1140
			if ( $limit > 0 && $time > $limit ) {
1141
				throw new DBTransactionError(
1142
					$conn,
1143
					wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
1144
				);
1145
			}
1146
			// If a connection sits idle while slow queries execute on another, that connection
1147
			// may end up dropped before the commit round is reached. Ping servers to detect this.
1148
			if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1149
				throw new DBTransactionError(
1150
					$conn,
1151
					"A connection to the {$conn->getDBname()} database was lost before commit."
1152
				);
1153
			}
1154
		} );
1155
	}
1156
1157
	/**
1158
	 * Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
1159
	 *
1160
	 * The DBO_TRX setting will be reverted to the default in each of these methods:
1161
	 *   - commitMasterChanges()
1162
	 *   - rollbackMasterChanges()
1163
	 *   - commitAll()
1164
	 * This allows for custom transaction rounds from any outer transaction scope.
1165
	 *
1166
	 * @param string $fname
1167
	 * @throws DBExpectedError
1168
	 * @since 1.28
1169
	 */
1170
	public function beginMasterChanges( $fname = __METHOD__ ) {
1171
		if ( $this->trxRoundId !== false ) {
1172
			throw new DBTransactionError(
1173
				null,
1174
				"$fname: Transaction round '{$this->trxRoundId}' already started."
1175
			);
1176
		}
1177
		$this->trxRoundId = $fname;
1178
1179
		$failures = [];
1180
		$this->forEachOpenMasterConnection(
1181
			function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
1182
				$conn->setTrxEndCallbackSuppression( true );
1183
				try {
1184
					$conn->clearSnapshot( $fname );
1185
				} catch ( DBError $e ) {
1186
					MWExceptionHandler::logException( $e );
1187
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1188
				}
1189
				$conn->setTrxEndCallbackSuppression( false );
1190
				$this->applyTransactionRoundFlags( $conn );
1191
			}
1192
		);
1193
1194 View Code Duplication
		if ( $failures ) {
1195
			throw new DBExpectedError(
1196
				null,
1197
				"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
1198
			);
1199
		}
1200
	}
1201
1202
	/**
1203
	 * Issue COMMIT on all master connections where writes where done
1204
	 * @param string $fname Caller name
1205
	 * @throws DBExpectedError
1206
	 */
1207
	public function commitMasterChanges( $fname = __METHOD__ ) {
1208
		$failures = [];
1209
1210
		$restore = ( $this->trxRoundId !== false );
1211
		$this->trxRoundId = false;
1212
		$this->forEachOpenMasterConnection(
1213
			function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
1214
				try {
1215
					if ( $conn->writesOrCallbacksPending() ) {
1216
						$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1217
					} elseif ( $restore ) {
1218
						$conn->clearSnapshot( $fname );
1219
					}
1220
				} catch ( DBError $e ) {
1221
					MWExceptionHandler::logException( $e );
1222
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1223
				}
1224
				if ( $restore ) {
1225
					$this->undoTransactionRoundFlags( $conn );
1226
				}
1227
			}
1228
		);
1229
1230 View Code Duplication
		if ( $failures ) {
1231
			throw new DBExpectedError(
1232
				null,
1233
				"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1234
			);
1235
		}
1236
	}
1237
1238
	/**
1239
	 * Issue all pending post-COMMIT/ROLLBACK callbacks
1240
	 * @param integer $type IDatabase::TRIGGER_* constant
1241
	 * @return Exception|null The first exception or null if there were none
1242
	 * @since 1.28
1243
	 */
1244
	public function runMasterPostTrxCallbacks( $type ) {
1245
		$e = null; // first exception
1246
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
1247
			$conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
1248
1249
			$conn->setTrxEndCallbackSuppression( false );
1250
			try {
1251
				$conn->runOnTransactionIdleCallbacks( $type );
1252
			} catch ( Exception $ex ) {
1253
				$e = $e ?: $ex;
1254
			}
1255
			try {
1256
				$conn->runTransactionListenerCallbacks( $type );
1257
			} catch ( Exception $ex ) {
1258
				$e = $e ?: $ex;
1259
			}
1260
		} );
1261
1262
		return $e;
1263
	}
1264
1265
	/**
1266
	 * Issue ROLLBACK only on master, only if queries were done on connection
1267
	 * @param string $fname Caller name
1268
	 * @throws DBExpectedError
1269
	 * @since 1.23
1270
	 */
1271
	public function rollbackMasterChanges( $fname = __METHOD__ ) {
1272
		$restore = ( $this->trxRoundId !== false );
1273
		$this->trxRoundId = false;
1274
		$this->forEachOpenMasterConnection(
1275
			function ( DatabaseBase $conn ) use ( $fname, $restore ) {
1276
				if ( $conn->writesOrCallbacksPending() ) {
1277
					$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1278
				}
1279
				if ( $restore ) {
1280
					$this->undoTransactionRoundFlags( $conn );
1281
				}
1282
			}
1283
		);
1284
	}
1285
1286
	/**
1287
	 * Suppress all pending post-COMMIT/ROLLBACK callbacks
1288
	 * @return Exception|null The first exception or null if there were none
1289
	 * @since 1.28
1290
	 */
1291
	public function suppressTransactionEndCallbacks() {
1292
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
1293
			$conn->setTrxEndCallbackSuppression( true );
1294
		} );
1295
	}
1296
1297
	/**
1298
	 * @param DatabaseBase $conn
1299
	 */
1300
	private function applyTransactionRoundFlags( DatabaseBase $conn ) {
1301
		if ( $conn->getFlag( DBO_DEFAULT ) ) {
1302
			// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1303
			// Force DBO_TRX even in CLI mode since a commit round is expected soon.
1304
			$conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
1305
			// If config has explicitly requested DBO_TRX be either on or off by not
1306
			// setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
1307
			// for things like blob stores (ExternalStore) which want auto-commit mode.
1308
		}
1309
	}
1310
1311
	/**
1312
	 * @param DatabaseBase $conn
1313
	 */
1314
	private function undoTransactionRoundFlags( DatabaseBase $conn ) {
1315
		if ( $conn->getFlag( DBO_DEFAULT ) ) {
1316
			$conn->restoreFlags( $conn::RESTORE_PRIOR );
1317
		}
1318
	}
1319
1320
	/**
1321
	 * @return bool Whether a master connection is already open
1322
	 * @since 1.24
1323
	 */
1324
	public function hasMasterConnection() {
1325
		return $this->isOpen( $this->getWriterIndex() );
1326
	}
1327
1328
	/**
1329
	 * Determine if there are pending changes in a transaction by this thread
1330
	 * @since 1.23
1331
	 * @return bool
1332
	 */
1333
	public function hasMasterChanges() {
1334
		$masterIndex = $this->getWriterIndex();
1335
		foreach ( $this->mConns as $conns2 ) {
1336
			if ( empty( $conns2[$masterIndex] ) ) {
1337
				continue;
1338
			}
1339
			/** @var DatabaseBase $conn */
1340
			foreach ( $conns2[$masterIndex] as $conn ) {
1341
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
1342
					return true;
1343
				}
1344
			}
1345
		}
1346
		return false;
1347
	}
1348
1349
	/**
1350
	 * Get the timestamp of the latest write query done by this thread
1351
	 * @since 1.25
1352
	 * @return float|bool UNIX timestamp or false
1353
	 */
1354 View Code Duplication
	public function lastMasterChangeTimestamp() {
1355
		$lastTime = false;
1356
		$masterIndex = $this->getWriterIndex();
1357
		foreach ( $this->mConns as $conns2 ) {
1358
			if ( empty( $conns2[$masterIndex] ) ) {
1359
				continue;
1360
			}
1361
			/** @var DatabaseBase $conn */
1362
			foreach ( $conns2[$masterIndex] as $conn ) {
1363
				$lastTime = max( $lastTime, $conn->lastDoneWrites() );
1364
			}
1365
		}
1366
		return $lastTime;
1367
	}
1368
1369
	/**
1370
	 * Check if this load balancer object had any recent or still
1371
	 * pending writes issued against it by this PHP thread
1372
	 *
1373
	 * @param float $age How many seconds ago is "recent" [defaults to mWaitTimeout]
1374
	 * @return bool
1375
	 * @since 1.25
1376
	 */
1377
	public function hasOrMadeRecentMasterChanges( $age = null ) {
1378
		$age = ( $age === null ) ? $this->mWaitTimeout : $age;
1379
1380
		return ( $this->hasMasterChanges()
1381
			|| $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1382
	}
1383
1384
	/**
1385
	 * Get the list of callers that have pending master changes
1386
	 *
1387
	 * @return array
1388
	 * @since 1.27
1389
	 */
1390 View Code Duplication
	public function pendingMasterChangeCallers() {
1391
		$fnames = [];
1392
1393
		$masterIndex = $this->getWriterIndex();
1394
		foreach ( $this->mConns as $conns2 ) {
1395
			if ( empty( $conns2[$masterIndex] ) ) {
1396
				continue;
1397
			}
1398
			/** @var DatabaseBase $conn */
1399
			foreach ( $conns2[$masterIndex] as $conn ) {
1400
				$fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1401
			}
1402
		}
1403
1404
		return $fnames;
1405
	}
1406
1407
	/**
1408
	 * @param mixed $value
1409
	 * @return mixed
1410
	 */
1411
	public function waitTimeout( $value = null ) {
1412
		return wfSetVar( $this->mWaitTimeout, $value );
1413
	}
1414
1415
	/**
1416
	 * @note This method will trigger a DB connection if not yet done
1417
	 *
1418
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1419
	 * @return bool Whether the generic connection for reads is highly "lagged"
1420
	 */
1421
	public function getLaggedSlaveMode( $wiki = false ) {
1422
		// No-op if there is only one DB (also avoids recursion)
1423
		if ( !$this->laggedSlaveMode && $this->getServerCount() > 1 ) {
1424
			try {
1425
				// See if laggedSlaveMode gets set
1426
				$conn = $this->getConnection( DB_SLAVE, false, $wiki );
1427
				$this->reuseConnection( $conn );
1428
			} catch ( DBConnectionError $e ) {
1429
				// Avoid expensive re-connect attempts and failures
1430
				$this->slavesDownMode = true;
1431
				$this->laggedSlaveMode = true;
1432
			}
1433
		}
1434
1435
		return $this->laggedSlaveMode;
1436
	}
1437
1438
	/**
1439
	 * @note This method will never cause a new DB connection
1440
	 * @return bool Whether any generic connection used for reads was highly "lagged"
1441
	 * @since 1.27
1442
	 */
1443
	public function laggedSlaveUsed() {
1444
		return $this->laggedSlaveMode;
1445
	}
1446
1447
	/**
1448
	 * @note This method may trigger a DB connection if not yet done
1449
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1450
	 * @param DatabaseBase|null DB master connection; used to avoid loops [optional]
1451
	 * @return string|bool Reason the master is read-only or false if it is not
1452
	 * @since 1.27
1453
	 */
1454
	public function getReadOnlyReason( $wiki = false, DatabaseBase $conn = null ) {
1455
		if ( $this->readOnlyReason !== false ) {
1456
			return $this->readOnlyReason;
1457
		} elseif ( $this->getLaggedSlaveMode( $wiki ) ) {
1458
			if ( $this->slavesDownMode ) {
1459
				return 'The database has been automatically locked ' .
1460
					'until the slave database servers become available';
1461
			} else {
1462
				return 'The database has been automatically locked ' .
1463
					'while the slave database servers catch up to the master.';
1464
			}
1465
		} elseif ( $this->masterRunningReadOnly( $wiki, $conn ) ) {
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 1454 can also be of type boolean; however, LoadBalancer::masterRunningReadOnly() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
1466
			return 'The database master is running in read-only mode.';
1467
		}
1468
1469
		return false;
1470
	}
1471
1472
	/**
1473
	 * @param string $wiki Wiki ID, or false for the current wiki
1474
	 * @param DatabaseBase|null DB master connectionl used to avoid loops [optional]
1475
	 * @return bool
1476
	 */
1477
	private function masterRunningReadOnly( $wiki, DatabaseBase $conn = null ) {
1478
		$cache = $this->wanCache;
1479
		$masterServer = $this->getServerName( $this->getWriterIndex() );
1480
1481
		return (bool)$cache->getWithSetCallback(
1482
			$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1483
			self::TTL_CACHE_READONLY,
1484
			function () use ( $wiki, $conn ) {
1485
				$this->trxProfiler->setSilenced( true );
1486
				try {
1487
					$dbw = $conn ?: $this->getConnection( DB_MASTER, [], $wiki );
1488
					$readOnly = (int)$dbw->serverIsReadOnly();
1489
				} catch ( DBError $e ) {
1490
					$readOnly = 0;
1491
				}
1492
				$this->trxProfiler->setSilenced( false );
1493
				return $readOnly;
1494
			},
1495
			[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1496
		);
1497
	}
1498
1499
	/**
1500
	 * Disables/enables lag checks
1501
	 * @param null|bool $mode
1502
	 * @return bool
1503
	 */
1504
	public function allowLagged( $mode = null ) {
1505
		if ( $mode === null ) {
1506
			return $this->mAllowLagged;
1507
		}
1508
		$this->mAllowLagged = $mode;
1509
1510
		return $this->mAllowLagged;
1511
	}
1512
1513
	/**
1514
	 * @return bool
1515
	 */
1516
	public function pingAll() {
1517
		$success = true;
1518
		$this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( &$success ) {
1519
			if ( !$conn->ping() ) {
1520
				$success = false;
1521
			}
1522
		} );
1523
1524
		return $success;
1525
	}
1526
1527
	/**
1528
	 * Call a function with each open connection object
1529
	 * @param callable $callback
1530
	 * @param array $params
1531
	 */
1532
	public function forEachOpenConnection( $callback, array $params = [] ) {
1533 View Code Duplication
		foreach ( $this->mConns as $connsByServer ) {
1534
			foreach ( $connsByServer as $serverConns ) {
1535
				foreach ( $serverConns as $conn ) {
1536
					$mergedParams = array_merge( [ $conn ], $params );
1537
					call_user_func_array( $callback, $mergedParams );
1538
				}
1539
			}
1540
		}
1541
	}
1542
1543
	/**
1544
	 * Call a function with each open connection object to a master
1545
	 * @param callable $callback
1546
	 * @param array $params
1547
	 * @since 1.28
1548
	 */
1549
	public function forEachOpenMasterConnection( $callback, array $params = [] ) {
1550
		$masterIndex = $this->getWriterIndex();
1551 View Code Duplication
		foreach ( $this->mConns as $connsByServer ) {
1552
			if ( isset( $connsByServer[$masterIndex] ) ) {
1553
				/** @var DatabaseBase $conn */
1554
				foreach ( $connsByServer[$masterIndex] as $conn ) {
1555
					$mergedParams = array_merge( [ $conn ], $params );
1556
					call_user_func_array( $callback, $mergedParams );
1557
				}
1558
			}
1559
		}
1560
	}
1561
1562
	/**
1563
	 * Get the hostname and lag time of the most-lagged slave
1564
	 *
1565
	 * This is useful for maintenance scripts that need to throttle their updates.
1566
	 * May attempt to open connections to slaves on the default DB. If there is
1567
	 * no lag, the maximum lag will be reported as -1.
1568
	 *
1569
	 * @param bool|string $wiki Wiki ID, or false for the default database
1570
	 * @return array ( host, max lag, index of max lagged host )
1571
	 */
1572
	public function getMaxLag( $wiki = false ) {
1573
		$maxLag = -1;
1574
		$host = '';
1575
		$maxIndex = 0;
1576
1577
		if ( $this->getServerCount() <= 1 ) {
1578
			return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
1579
		}
1580
1581
		$lagTimes = $this->getLagTimes( $wiki );
1582
		foreach ( $lagTimes as $i => $lag ) {
1583
			if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
1584
				$maxLag = $lag;
1585
				$host = $this->mServers[$i]['host'];
1586
				$maxIndex = $i;
1587
			}
1588
		}
1589
1590
		return [ $host, $maxLag, $maxIndex ];
1591
	}
1592
1593
	/**
1594
	 * Get an estimate of replication lag (in seconds) for each server
1595
	 *
1596
	 * Results are cached for a short time in memcached/process cache
1597
	 *
1598
	 * Values may be "false" if replication is too broken to estimate
1599
	 *
1600
	 * @param string|bool $wiki
1601
	 * @return int[] Map of (server index => float|int|bool)
1602
	 */
1603
	public function getLagTimes( $wiki = false ) {
1604
		if ( $this->getServerCount() <= 1 ) {
1605
			return [ 0 => 0 ]; // no replication = no lag
1606
		}
1607
1608
		# Send the request to the load monitor
1609
		return $this->getLoadMonitor()->getLagTimes( array_keys( $this->mServers ), $wiki );
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 1603 can also be of type boolean; however, LoadMonitor::getLagTimes() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
1610
	}
1611
1612
	/**
1613
	 * Get the lag in seconds for a given connection, or zero if this load
1614
	 * balancer does not have replication enabled.
1615
	 *
1616
	 * This should be used in preference to Database::getLag() in cases where
1617
	 * replication may not be in use, since there is no way to determine if
1618
	 * replication is in use at the connection level without running
1619
	 * potentially restricted queries such as SHOW SLAVE STATUS. Using this
1620
	 * function instead of Database::getLag() avoids a fatal error in this
1621
	 * case on many installations.
1622
	 *
1623
	 * @param IDatabase $conn
1624
	 * @return int|bool Returns false on error
1625
	 */
1626
	public function safeGetLag( IDatabase $conn ) {
1627
		if ( $this->getServerCount() == 1 ) {
1628
			return 0;
1629
		} else {
1630
			return $conn->getLag();
1631
		}
1632
	}
1633
1634
	/**
1635
	 * Wait for a slave DB to reach a specified master position
1636
	 *
1637
	 * This will connect to the master to get an accurate position if $pos is not given
1638
	 *
1639
	 * @param IDatabase $conn Slave DB
1640
	 * @param DBMasterPos|bool $pos Master position; default: current position
1641
	 * @param integer $timeout Timeout in seconds
1642
	 * @return bool Success
1643
	 * @since 1.27
1644
	 */
1645
	public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
1646
		if ( $this->getServerCount() == 1 || !$conn->getLBInfo( 'slave' ) ) {
1647
			return true; // server is not a slave DB
1648
		}
1649
1650
		$pos = $pos ?: $this->getConnection( DB_MASTER )->getMasterPos();
1651
		if ( !( $pos instanceof DBMasterPos ) ) {
1652
			return false; // something is misconfigured
1653
		}
1654
1655
		$result = $conn->masterPosWait( $pos, $timeout );
1656
		if ( $result == -1 || is_null( $result ) ) {
1657
			$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
1658
			wfDebugLog( 'replication', "$msg\n" );
1659
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
1660
			$ok = false;
1661
		} else {
1662
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
1663
			$ok = true;
1664
		}
1665
1666
		return $ok;
1667
	}
1668
1669
	/**
1670
	 * Clear the cache for slag lag delay times
1671
	 *
1672
	 * This is only used for testing
1673
	 */
1674
	public function clearLagTimeCache() {
1675
		$this->getLoadMonitor()->clearCaches();
1676
	}
1677
1678
	/**
1679
	 * Set a callback via DatabaseBase::setTransactionListener() on
1680
	 * all current and future master connections of this load balancer
1681
	 *
1682
	 * @param string $name Callback name
1683
	 * @param callable|null $callback
1684
	 * @since 1.28
1685
	 */
1686
	public function setTransactionListener( $name, callable $callback = null ) {
1687
		if ( $callback ) {
1688
			$this->trxRecurringCallbacks[$name] = $callback;
1689
		} else {
1690
			unset( $this->trxRecurringCallbacks[$name] );
1691
		}
1692
		$this->forEachOpenMasterConnection(
1693
			function ( DatabaseBase $conn ) use ( $name, $callback ) {
1694
				$conn->setTransactionListener( $name, $callback );
1695
			}
1696
		);
1697
	}
1698
}
1699