Completed
Branch master (90e9fc)
by
unknown
29:23
created

LoadBalancer::runMasterPreCommitCallbacks()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 0
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
1
<?php
2
/**
3
 * Database load balancing.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup Database
22
 */
23
24
/**
25
 * Database load balancing object
26
 *
27
 * @todo document
28
 * @ingroup Database
29
 */
30
class LoadBalancer {
31
	/** @var array[] Map of (server index => server config array) */
32
	private $mServers;
33
	/** @var array[] Map of (local/foreignUsed/foreignFree => server index => DatabaseBase array) */
34
	private $mConns;
35
	/** @var array Map of (server index => weight) */
36
	private $mLoads;
37
	/** @var array[] Map of (group => server index => weight) */
38
	private $mGroupLoads;
39
	/** @var bool Whether to disregard replica DB lag as a factor in replica DB selection */
40
	private $mAllowLagged;
41
	/** @var integer Seconds to spend waiting on replica DB lag to resolve */
42
	private $mWaitTimeout;
43
	/** @var array LBFactory information */
44
	private $mParentInfo;
45
46
	/** @var string The LoadMonitor subclass name */
47
	private $mLoadMonitorClass;
48
	/** @var LoadMonitor */
49
	private $mLoadMonitor;
50
	/** @var BagOStuff */
51
	private $srvCache;
52
	/** @var WANObjectCache */
53
	private $wanCache;
54
	/** @var TransactionProfiler */
55
	protected $trxProfiler;
56
57
	/** @var bool|DatabaseBase Database connection that caused a problem */
58
	private $mErrorConnection;
59
	/** @var integer The generic (not query grouped) replica DB index (of $mServers) */
60
	private $mReadIndex;
61
	/** @var bool|DBMasterPos False if not set */
62
	private $mWaitForPos;
63
	/** @var bool Whether the generic reader fell back to a lagged replica DB */
64
	private $laggedSlaveMode = false;
65
	/** @var bool Whether the generic reader fell back to a lagged replica DB */
66
	private $slavesDownMode = false;
67
	/** @var string The last DB selection or connection error */
68
	private $mLastError = 'Unknown error';
69
	/** @var string|bool Reason the LB is read-only or false if not */
70
	private $readOnlyReason = false;
71
	/** @var integer Total connections opened */
72
	private $connsOpened = 0;
73
	/** @var string|bool String if a requested DBO_TRX transaction round is active */
74
	private $trxRoundId = false;
75
	/** @var array[] Map of (name => callable) */
76
	private $trxRecurringCallbacks = [];
77
78
	/** @var integer Warn when this many connection are held */
79
	const CONN_HELD_WARN_THRESHOLD = 10;
80
	/** @var integer Default 'max lag' when unspecified */
81
	const MAX_LAG = 10;
82
	/** @var integer Max time to wait for a replica DB to catch up (e.g. ChronologyProtector) */
83
	const POS_WAIT_TIMEOUT = 10;
84
	/** @var integer Seconds to cache master server read-only status */
85
	const TTL_CACHE_READONLY = 5;
86
87
	/**
88
	 * @var boolean
89
	 */
90
	private $disabled = false;
91
92
	/**
93
	 * @param array $params Array with keys:
94
	 *  - servers : Required. Array of server info structures.
95
	 *  - loadMonitor : Name of a class used to fetch server lag and load.
96
	 *  - readOnlyReason : Reason the master DB is read-only if so [optional]
97
	 *  - srvCache : BagOStuff object [optional]
98
	 *  - wanCache : WANObjectCache object [optional]
99
	 * @throws MWException
100
	 */
101
	public function __construct( array $params ) {
102
		if ( !isset( $params['servers'] ) ) {
103
			throw new MWException( __CLASS__ . ': missing servers parameter' );
104
		}
105
		$this->mServers = $params['servers'];
106
		$this->mWaitTimeout = self::POS_WAIT_TIMEOUT;
107
108
		$this->mReadIndex = -1;
109
		$this->mWriteIndex = -1;
0 ignored issues
show
Bug introduced by
The property mWriteIndex does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
110
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignFree' => array()) of type array<string,array,{"loc..."foreignFree":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
111
			'local' => [],
112
			'foreignUsed' => [],
113
			'foreignFree' => [] ];
114
		$this->mLoads = [];
115
		$this->mWaitForPos = false;
116
		$this->mErrorConnection = false;
117
		$this->mAllowLagged = false;
118
119 View Code Duplication
		if ( isset( $params['readOnlyReason'] ) && is_string( $params['readOnlyReason'] ) ) {
120
			$this->readOnlyReason = $params['readOnlyReason'];
121
		}
122
123
		if ( isset( $params['loadMonitor'] ) ) {
124
			$this->mLoadMonitorClass = $params['loadMonitor'];
125
		} else {
126
			$master = reset( $params['servers'] );
127
			if ( isset( $master['type'] ) && $master['type'] === 'mysql' ) {
128
				$this->mLoadMonitorClass = 'LoadMonitorMySQL';
129
			} else {
130
				$this->mLoadMonitorClass = 'LoadMonitorNull';
131
			}
132
		}
133
134
		foreach ( $params['servers'] as $i => $server ) {
135
			$this->mLoads[$i] = $server['load'];
136
			if ( isset( $server['groupLoads'] ) ) {
137
				foreach ( $server['groupLoads'] as $group => $ratio ) {
138
					if ( !isset( $this->mGroupLoads[$group] ) ) {
139
						$this->mGroupLoads[$group] = [];
140
					}
141
					$this->mGroupLoads[$group][$i] = $ratio;
142
				}
143
			}
144
		}
145
146
		if ( isset( $params['srvCache'] ) ) {
147
			$this->srvCache = $params['srvCache'];
148
		} else {
149
			$this->srvCache = new EmptyBagOStuff();
150
		}
151
		if ( isset( $params['wanCache'] ) ) {
152
			$this->wanCache = $params['wanCache'];
153
		} else {
154
			$this->wanCache = WANObjectCache::newEmpty();
155
		}
156
		if ( isset( $params['trxProfiler'] ) ) {
157
			$this->trxProfiler = $params['trxProfiler'];
158
		} else {
159
			$this->trxProfiler = new TransactionProfiler();
160
		}
161
	}
162
163
	/**
164
	 * Get a LoadMonitor instance
165
	 *
166
	 * @return LoadMonitor
167
	 */
168
	private function getLoadMonitor() {
169
		if ( !isset( $this->mLoadMonitor ) ) {
170
			$class = $this->mLoadMonitorClass;
171
			$this->mLoadMonitor = new $class( $this );
172
		}
173
174
		return $this->mLoadMonitor;
175
	}
176
177
	/**
178
	 * Get or set arbitrary data used by the parent object, usually an LBFactory
179
	 * @param mixed $x
180
	 * @return mixed
181
	 */
182
	public function parentInfo( $x = null ) {
183
		return wfSetVar( $this->mParentInfo, $x );
184
	}
185
186
	/**
187
	 * @param array $loads
188
	 * @param bool|string $wiki Wiki to get non-lagged for
189
	 * @param int $maxLag Restrict the maximum allowed lag to this many seconds
190
	 * @return bool|int|string
191
	 */
192
	private function getRandomNonLagged( array $loads, $wiki = false, $maxLag = self::MAX_LAG ) {
193
		$lags = $this->getLagTimes( $wiki );
194
195
		# Unset excessively lagged servers
196
		foreach ( $lags as $i => $lag ) {
197
			if ( $i != 0 ) {
198
				$maxServerLag = $maxLag;
199 View Code Duplication
				if ( isset( $this->mServers[$i]['max lag'] ) ) {
200
					$maxServerLag = min( $maxServerLag, $this->mServers[$i]['max lag'] );
201
				}
202
203
				$host = $this->getServerName( $i );
204
				if ( $lag === false ) {
205
					wfDebugLog( 'replication', "Server $host (#$i) is not replicating?" );
206
					unset( $loads[$i] );
207
				} elseif ( $lag > $maxServerLag ) {
208
					wfDebugLog( 'replication', "Server $host (#$i) has >= $lag seconds of lag" );
209
					unset( $loads[$i] );
210
				}
211
			}
212
		}
213
214
		# Find out if all the replica DBs with non-zero load are lagged
215
		$sum = 0;
216
		foreach ( $loads as $load ) {
217
			$sum += $load;
218
		}
219
		if ( $sum == 0 ) {
220
			# No appropriate DB servers except maybe the master and some replica DBs with zero load
221
			# Do NOT use the master
222
			# Instead, this function will return false, triggering read-only mode,
223
			# and a lagged replica DB will be used instead.
224
			return false;
225
		}
226
227
		if ( count( $loads ) == 0 ) {
228
			return false;
229
		}
230
231
		# Return a random representative of the remainder
232
		return ArrayUtils::pickRandom( $loads );
233
	}
234
235
	/**
236
	 * Get the index of the reader connection, which may be a replica DB
237
	 * This takes into account load ratios and lag times. It should
238
	 * always return a consistent index during a given invocation
239
	 *
240
	 * Side effect: opens connections to databases
241
	 * @param string|bool $group Query group, or false for the generic reader
242
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
243
	 * @throws MWException
244
	 * @return bool|int|string
245
	 */
246
	public function getReaderIndex( $group = false, $wiki = false ) {
247
		global $wgDBtype;
248
249
		# @todo FIXME: For now, only go through all this for mysql databases
250
		if ( $wgDBtype != 'mysql' ) {
251
			return $this->getWriterIndex();
252
		}
253
254
		if ( count( $this->mServers ) == 1 ) {
255
			# Skip the load balancing if there's only one server
256
			return 0;
257
		} elseif ( $group === false && $this->mReadIndex >= 0 ) {
258
			# Shortcut if generic reader exists already
259
			return $this->mReadIndex;
260
		}
261
262
		# Find the relevant load array
263
		if ( $group !== false ) {
264
			if ( isset( $this->mGroupLoads[$group] ) ) {
265
				$nonErrorLoads = $this->mGroupLoads[$group];
266
			} else {
267
				# No loads for this group, return false and the caller can use some other group
268
				wfDebugLog( 'connect', __METHOD__ . ": no loads for group $group\n" );
269
270
				return false;
271
			}
272
		} else {
273
			$nonErrorLoads = $this->mLoads;
274
		}
275
276
		if ( !count( $nonErrorLoads ) ) {
277
			throw new MWException( "Empty server array given to LoadBalancer" );
278
		}
279
280
		# Scale the configured load ratios according to the dynamic load (if the load monitor supports it)
281
		$this->getLoadMonitor()->scaleLoads( $nonErrorLoads, $group, $wiki );
282
283
		$laggedSlaveMode = false;
284
285
		# No server found yet
286
		$i = false;
287
		$conn = false;
288
		# First try quickly looking through the available servers for a server that
289
		# meets our criteria
290
		$currentLoads = $nonErrorLoads;
291
		while ( count( $currentLoads ) ) {
292
			if ( $this->mAllowLagged || $laggedSlaveMode ) {
293
				$i = ArrayUtils::pickRandom( $currentLoads );
294
			} else {
295
				$i = false;
296
				if ( $this->mWaitForPos && $this->mWaitForPos->asOfTime() ) {
297
					# ChronologyProtecter causes mWaitForPos to be set via sessions.
298
					# This triggers doWait() after connect, so it's especially good to
299
					# avoid lagged servers so as to avoid just blocking in that method.
300
					$ago = microtime( true ) - $this->mWaitForPos->asOfTime();
301
					# Aim for <= 1 second of waiting (being too picky can backfire)
302
					$i = $this->getRandomNonLagged( $currentLoads, $wiki, $ago + 1 );
303
				}
304
				if ( $i === false ) {
305
					# Any server with less lag than it's 'max lag' param is preferable
306
					$i = $this->getRandomNonLagged( $currentLoads, $wiki );
307
				}
308
				if ( $i === false && count( $currentLoads ) != 0 ) {
309
					# All replica DBs lagged. Switch to read-only mode
310
					wfDebugLog( 'replication', "All replica DBs lagged. Switch to read-only mode" );
311
					$i = ArrayUtils::pickRandom( $currentLoads );
312
					$laggedSlaveMode = true;
313
				}
314
			}
315
316
			if ( $i === false ) {
317
				# pickRandom() returned false
318
				# This is permanent and means the configuration or the load monitor
319
				# wants us to return false.
320
				wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false" );
321
322
				return false;
323
			}
324
325
			$serverName = $this->getServerName( $i );
326
			wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: $serverName..." );
327
328
			$conn = $this->openConnection( $i, $wiki );
329
			if ( !$conn ) {
330
				wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki" );
331
				unset( $nonErrorLoads[$i] );
332
				unset( $currentLoads[$i] );
333
				$i = false;
334
				continue;
335
			}
336
337
			// Decrement reference counter, we are finished with this connection.
338
			// It will be incremented for the caller later.
339
			if ( $wiki !== false ) {
340
				$this->reuseConnection( $conn );
341
			}
342
343
			# Return this server
344
			break;
345
		}
346
347
		# If all servers were down, quit now
348
		if ( !count( $nonErrorLoads ) ) {
349
			wfDebugLog( 'connect', "All servers down" );
350
		}
351
352
		if ( $i !== false ) {
353
			# replica DB connection successful
354
			# Wait for the session master pos for a short time
355 View Code Duplication
			if ( $this->mWaitForPos && $i > 0 ) {
356
				if ( !$this->doWait( $i ) ) {
357
					$this->mServers[$i]['slave pos'] = $conn->getSlavePos();
0 ignored issues
show
Bug introduced by
The method getSlavePos cannot be called on $conn (of type boolean).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
358
				}
359
			}
360
			if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $group === false ) {
361
				$this->mReadIndex = $i;
0 ignored issues
show
Documentation Bug introduced by
It seems like $i can also be of type string. However, the property $mReadIndex is declared as type integer. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
362
				# Record if the generic reader index is in "lagged replica DB" mode
363
				if ( $laggedSlaveMode ) {
364
					$this->laggedSlaveMode = true;
365
				}
366
			}
367
			$serverName = $this->getServerName( $i );
368
			wfDebugLog( 'connect', __METHOD__ .
369
				": using server $serverName for group '$group'\n" );
370
		}
371
372
		return $i;
373
	}
374
375
	/**
376
	 * Set the master wait position
377
	 * If a DB_SLAVE connection has been opened already, waits
378
	 * Otherwise sets a variable telling it to wait if such a connection is opened
379
	 * @param DBMasterPos $pos
380
	 */
381
	public function waitFor( $pos ) {
382
		$this->mWaitForPos = $pos;
383
		$i = $this->mReadIndex;
384
385 View Code Duplication
		if ( $i > 0 ) {
386
			if ( !$this->doWait( $i ) ) {
387
				$this->mServers[$i]['slave pos'] = $this->getAnyOpenConnection( $i )->getSlavePos();
388
				$this->laggedSlaveMode = true;
389
			}
390
		}
391
	}
392
393
	/**
394
	 * Set the master wait position and wait for a "generic" replica DB to catch up to it
395
	 *
396
	 * This can be used a faster proxy for waitForAll()
397
	 *
398
	 * @param DBMasterPos $pos
399
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
400
	 * @return bool Success (able to connect and no timeouts reached)
401
	 * @since 1.26
402
	 */
403
	public function waitForOne( $pos, $timeout = null ) {
404
		$this->mWaitForPos = $pos;
405
406
		$i = $this->mReadIndex;
407
		if ( $i <= 0 ) {
408
			// Pick a generic replica DB if there isn't one yet
409
			$readLoads = $this->mLoads;
410
			unset( $readLoads[$this->getWriterIndex()] ); // replica DBs only
411
			$readLoads = array_filter( $readLoads ); // with non-zero load
412
			$i = ArrayUtils::pickRandom( $readLoads );
413
		}
414
415 View Code Duplication
		if ( $i > 0 ) {
416
			$ok = $this->doWait( $i, true, $timeout );
417
		} else {
418
			$ok = true; // no applicable loads
419
		}
420
421
		return $ok;
422
	}
423
424
	/**
425
	 * Set the master wait position and wait for ALL replica DBs to catch up to it
426
	 * @param DBMasterPos $pos
427
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
428
	 * @return bool Success (able to connect and no timeouts reached)
429
	 */
430
	public function waitForAll( $pos, $timeout = null ) {
431
		$this->mWaitForPos = $pos;
432
		$serverCount = count( $this->mServers );
433
434
		$ok = true;
435
		for ( $i = 1; $i < $serverCount; $i++ ) {
436 View Code Duplication
			if ( $this->mLoads[$i] > 0 ) {
437
				$ok = $this->doWait( $i, true, $timeout ) && $ok;
438
			}
439
		}
440
441
		return $ok;
442
	}
443
444
	/**
445
	 * Get any open connection to a given server index, local or foreign
446
	 * Returns false if there is no connection open
447
	 *
448
	 * @param int $i
449
	 * @return DatabaseBase|bool False on failure
450
	 */
451
	public function getAnyOpenConnection( $i ) {
452
		foreach ( $this->mConns as $conns ) {
453
			if ( !empty( $conns[$i] ) ) {
454
				return reset( $conns[$i] );
455
			}
456
		}
457
458
		return false;
459
	}
460
461
	/**
462
	 * Wait for a given replica DB to catch up to the master pos stored in $this
463
	 * @param int $index Server index
464
	 * @param bool $open Check the server even if a new connection has to be made
465
	 * @param int $timeout Max seconds to wait; default is mWaitTimeout
466
	 * @return bool
467
	 */
468
	protected function doWait( $index, $open = false, $timeout = null ) {
469
		$close = false; // close the connection afterwards
470
471
		// Check if we already know that the DB has reached this point
472
		$server = $this->getServerName( $index );
473
		$key = $this->srvCache->makeGlobalKey( __CLASS__, 'last-known-pos', $server );
474
		/** @var DBMasterPos $knownReachedPos */
475
		$knownReachedPos = $this->srvCache->get( $key );
476
		if ( $knownReachedPos && $knownReachedPos->hasReached( $this->mWaitForPos ) ) {
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DBMasterPos::hasReached() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
477
			wfDebugLog( 'replication', __METHOD__ .
478
				": replica DB $server known to be caught up (pos >= $knownReachedPos).\n" );
479
			return true;
480
		}
481
482
		// Find a connection to wait on, creating one if needed and allowed
483
		$conn = $this->getAnyOpenConnection( $index );
484
		if ( !$conn ) {
485
			if ( !$open ) {
486
				wfDebugLog( 'replication', __METHOD__ . ": no connection open for $server\n" );
487
488
				return false;
489
			} else {
490
				$conn = $this->openConnection( $index, '' );
491
				if ( !$conn ) {
492
					wfDebugLog( 'replication', __METHOD__ . ": failed to connect to $server\n" );
493
494
					return false;
495
				}
496
				// Avoid connection spam in waitForAll() when connections
497
				// are made just for the sake of doing this lag check.
498
				$close = true;
499
			}
500
		}
501
502
		wfDebugLog( 'replication', __METHOD__ . ": Waiting for replica DB $server to catch up...\n" );
503
		$timeout = $timeout ?: $this->mWaitTimeout;
504
		$result = $conn->masterPosWait( $this->mWaitForPos, $timeout );
0 ignored issues
show
Bug introduced by
It seems like $this->mWaitForPos can also be of type boolean; however, DatabaseBase::masterPosWait() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
505
506
		if ( $result == -1 || is_null( $result ) ) {
507
			// Timed out waiting for replica DB, use master instead
508
			$msg = __METHOD__ . ": Timed out waiting on $server pos {$this->mWaitForPos}";
509
			wfDebugLog( 'replication', "$msg\n" );
510
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
511
			$ok = false;
512
		} else {
513
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
514
			$ok = true;
515
			// Remember that the DB reached this point
516
			$this->srvCache->set( $key, $this->mWaitForPos, BagOStuff::TTL_DAY );
517
		}
518
519
		if ( $close ) {
520
			$this->closeConnection( $conn );
0 ignored issues
show
Bug introduced by
It seems like $conn can also be of type boolean; however, LoadBalancer::closeConnection() does only seem to accept object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
521
		}
522
523
		return $ok;
524
	}
525
526
	/**
527
	 * Get a connection by index
528
	 * This is the main entry point for this class.
529
	 *
530
	 * @param int $i Server index
531
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
532
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
533
	 *
534
	 * @throws MWException
535
	 * @return DatabaseBase
536
	 */
537
	public function getConnection( $i, $groups = [], $wiki = false ) {
538
		if ( $i === null || $i === false ) {
539
			throw new MWException( 'Attempt to call ' . __METHOD__ .
540
				' with invalid server index' );
541
		}
542
543
		if ( $wiki === wfWikiID() ) {
544
			$wiki = false;
545
		}
546
547
		$groups = ( $groups === false || $groups === [] )
548
			? [ false ] // check one "group": the generic pool
549
			: (array)$groups;
550
551
		$masterOnly = ( $i == DB_MASTER || $i == $this->getWriterIndex() );
552
		$oldConnsOpened = $this->connsOpened; // connections open now
553
554
		if ( $i == DB_MASTER ) {
555
			$i = $this->getWriterIndex();
556
		} else {
557
			# Try to find an available server in any the query groups (in order)
558
			foreach ( $groups as $group ) {
559
				$groupIndex = $this->getReaderIndex( $group, $wiki );
560
				if ( $groupIndex !== false ) {
561
					$i = $groupIndex;
562
					break;
563
				}
564
			}
565
		}
566
567
		# Operation-based index
568
		if ( $i == DB_SLAVE ) {
569
			$this->mLastError = 'Unknown error'; // reset error string
570
			# Try the general server pool if $groups are unavailable.
571
			$i = in_array( false, $groups, true )
572
				? false // don't bother with this if that is what was tried above
573
				: $this->getReaderIndex( false, $wiki );
574
			# Couldn't find a working server in getReaderIndex()?
575
			if ( $i === false ) {
576
				$this->mLastError = 'No working replica DB server: ' . $this->mLastError;
577
578
				return $this->reportConnectionError();
579
			}
580
		}
581
582
		# Now we have an explicit index into the servers array
583
		$conn = $this->openConnection( $i, $wiki );
584
		if ( !$conn ) {
585
			return $this->reportConnectionError();
586
		}
587
588
		# Profile any new connections that happen
589
		if ( $this->connsOpened > $oldConnsOpened ) {
590
			$host = $conn->getServer();
591
			$dbname = $conn->getDBname();
592
			$trxProf = Profiler::instance()->getTransactionProfiler();
593
			$trxProf->recordConnection( $host, $dbname, $masterOnly );
594
		}
595
596
		if ( $masterOnly ) {
597
			# Make master-requested DB handles inherit any read-only mode setting
598
			$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $wiki, $conn ) );
0 ignored issues
show
Bug introduced by
It seems like $conn defined by $this->openConnection($i, $wiki) on line 583 can also be of type boolean; however, LoadBalancer::getReadOnlyReason() does only seem to accept null|object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
599
		}
600
601
		return $conn;
602
	}
603
604
	/**
605
	 * Mark a foreign connection as being available for reuse under a different
606
	 * DB name or prefix. This mechanism is reference-counted, and must be called
607
	 * the same number of times as getConnection() to work.
608
	 *
609
	 * @param DatabaseBase $conn
610
	 * @throws MWException
611
	 */
612
	public function reuseConnection( $conn ) {
613
		$serverIndex = $conn->getLBInfo( 'serverIndex' );
614
		$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
615
		if ( $serverIndex === null || $refCount === null ) {
616
			/**
617
			 * This can happen in code like:
618
			 *   foreach ( $dbs as $db ) {
619
			 *     $conn = $lb->getConnection( DB_SLAVE, [], $db );
620
			 *     ...
621
			 *     $lb->reuseConnection( $conn );
622
			 *   }
623
			 * When a connection to the local DB is opened in this way, reuseConnection()
624
			 * should be ignored
625
			 */
626
			return;
627
		}
628
629
		$dbName = $conn->getDBname();
630
		$prefix = $conn->tablePrefix();
631
		if ( strval( $prefix ) !== '' ) {
632
			$wiki = "$dbName-$prefix";
633
		} else {
634
			$wiki = $dbName;
635
		}
636
		if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) {
637
			throw new MWException( __METHOD__ . ": connection not found, has " .
638
				"the connection been freed already?" );
639
		}
640
		$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
641
		if ( $refCount <= 0 ) {
642
			$this->mConns['foreignFree'][$serverIndex][$wiki] = $conn;
643
			unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] );
644
			wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" );
645
		} else {
646
			wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" );
647
		}
648
	}
649
650
	/**
651
	 * Get a database connection handle reference
652
	 *
653
	 * The handle's methods wrap simply wrap those of a DatabaseBase handle
654
	 *
655
	 * @see LoadBalancer::getConnection() for parameter information
656
	 *
657
	 * @param int $db
658
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
659
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
660
	 * @return DBConnRef
661
	 */
662
	public function getConnectionRef( $db, $groups = [], $wiki = false ) {
663
		return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) );
664
	}
665
666
	/**
667
	 * Get a database connection handle reference without connecting yet
668
	 *
669
	 * The handle's methods wrap simply wrap those of a DatabaseBase handle
670
	 *
671
	 * @see LoadBalancer::getConnection() for parameter information
672
	 *
673
	 * @param int $db
674
	 * @param array|string|bool $groups Query group(s), or false for the generic reader
675
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
676
	 * @return DBConnRef
677
	 */
678
	public function getLazyConnectionRef( $db, $groups = [], $wiki = false ) {
679
		return new DBConnRef( $this, [ $db, $groups, $wiki ] );
680
	}
681
682
	/**
683
	 * Open a connection to the server given by the specified index
684
	 * Index must be an actual index into the array.
685
	 * If the server is already open, returns it.
686
	 *
687
	 * On error, returns false, and the connection which caused the
688
	 * error will be available via $this->mErrorConnection.
689
	 *
690
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
691
	 *
692
	 * @param int $i Server index
693
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
694
	 * @return DatabaseBase|bool Returns false on errors
695
	 */
696
	public function openConnection( $i, $wiki = false ) {
697
		if ( $wiki !== false ) {
698
			$conn = $this->openForeignConnection( $i, $wiki );
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 696 can also be of type boolean; however, LoadBalancer::openForeignConnection() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
699
		} elseif ( isset( $this->mConns['local'][$i][0] ) ) {
700
			$conn = $this->mConns['local'][$i][0];
701
		} else {
702
			$server = $this->mServers[$i];
703
			$server['serverIndex'] = $i;
704
			$conn = $this->reallyOpenConnection( $server, false );
705
			$serverName = $this->getServerName( $i );
706
			if ( $conn->isOpen() ) {
707
				wfDebugLog( 'connect', "Connected to database $i at $serverName\n" );
708
				$this->mConns['local'][$i][0] = $conn;
709
			} else {
710
				wfDebugLog( 'connect', "Failed to connect to database $i at $serverName\n" );
711
				$this->mErrorConnection = $conn;
712
				$conn = false;
713
			}
714
		}
715
716
		if ( $conn && !$conn->isOpen() ) {
717
			// Connection was made but later unrecoverably lost for some reason.
718
			// Do not return a handle that will just throw exceptions on use,
719
			// but let the calling code (e.g. getReaderIndex) try another server.
720
			// See DatabaseMyslBase::ping() for how this can happen.
721
			$this->mErrorConnection = $conn;
722
			$conn = false;
723
		}
724
725
		return $conn;
726
	}
727
728
	/**
729
	 * Open a connection to a foreign DB, or return one if it is already open.
730
	 *
731
	 * Increments a reference count on the returned connection which locks the
732
	 * connection to the requested wiki. This reference count can be
733
	 * decremented by calling reuseConnection().
734
	 *
735
	 * If a connection is open to the appropriate server already, but with the wrong
736
	 * database, it will be switched to the right database and returned, as long as
737
	 * it has been freed first with reuseConnection().
738
	 *
739
	 * On error, returns false, and the connection which caused the
740
	 * error will be available via $this->mErrorConnection.
741
	 *
742
	 * @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
743
	 *
744
	 * @param int $i Server index
745
	 * @param string $wiki Wiki ID to open
746
	 * @return DatabaseBase
747
	 */
748
	private function openForeignConnection( $i, $wiki ) {
749
		list( $dbName, $prefix ) = wfSplitWikiID( $wiki );
750
		if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) {
751
			// Reuse an already-used connection
752
			$conn = $this->mConns['foreignUsed'][$i][$wiki];
753
			wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" );
754
		} elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) {
755
			// Reuse a free connection for the same wiki
756
			$conn = $this->mConns['foreignFree'][$i][$wiki];
757
			unset( $this->mConns['foreignFree'][$i][$wiki] );
758
			$this->mConns['foreignUsed'][$i][$wiki] = $conn;
759
			wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" );
760
		} elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) {
761
			// Reuse a connection from another wiki
762
			$conn = reset( $this->mConns['foreignFree'][$i] );
763
			$oldWiki = key( $this->mConns['foreignFree'][$i] );
764
765
			// The empty string as a DB name means "don't care".
766
			// DatabaseMysqlBase::open() already handle this on connection.
767
			if ( $dbName !== '' && !$conn->selectDB( $dbName ) ) {
768
				$this->mLastError = "Error selecting database $dbName on server " .
769
					$conn->getServer() . " from client host " . wfHostname() . "\n";
770
				$this->mErrorConnection = $conn;
771
				$conn = false;
772
			} else {
773
				$conn->tablePrefix( $prefix );
774
				unset( $this->mConns['foreignFree'][$i][$oldWiki] );
775
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
776
				wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" );
777
			}
778
		} else {
779
			// Open a new connection
780
			$server = $this->mServers[$i];
781
			$server['serverIndex'] = $i;
782
			$server['foreignPoolRefCount'] = 0;
783
			$server['foreign'] = true;
784
			$conn = $this->reallyOpenConnection( $server, $dbName );
785
			if ( !$conn->isOpen() ) {
786
				wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" );
787
				$this->mErrorConnection = $conn;
788
				$conn = false;
789
			} else {
790
				$conn->tablePrefix( $prefix );
791
				$this->mConns['foreignUsed'][$i][$wiki] = $conn;
792
				wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" );
793
			}
794
		}
795
796
		// Increment reference count
797
		if ( $conn ) {
798
			$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
799
			$conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 );
800
		}
801
802
		return $conn;
803
	}
804
805
	/**
806
	 * Test if the specified index represents an open connection
807
	 *
808
	 * @param int $index Server index
809
	 * @access private
810
	 * @return bool
811
	 */
812
	private function isOpen( $index ) {
813
		if ( !is_integer( $index ) ) {
814
			return false;
815
		}
816
817
		return (bool)$this->getAnyOpenConnection( $index );
818
	}
819
820
	/**
821
	 * Really opens a connection. Uncached.
822
	 * Returns a Database object whether or not the connection was successful.
823
	 * @access private
824
	 *
825
	 * @param array $server
826
	 * @param bool $dbNameOverride
827
	 * @throws MWException
828
	 * @return DatabaseBase
829
	 */
830
	protected function reallyOpenConnection( $server, $dbNameOverride = false ) {
831
		if ( $this->disabled ) {
832
			throw new DBAccessError();
833
		}
834
835
		if ( !is_array( $server ) ) {
836
			throw new MWException( 'You must update your load-balancing configuration. ' .
837
				'See DefaultSettings.php entry for $wgDBservers.' );
838
		}
839
840
		if ( $dbNameOverride !== false ) {
841
			$server['dbname'] = $dbNameOverride;
842
		}
843
844
		// Let the handle know what the cluster master is (e.g. "db1052")
845
		$masterName = $this->getServerName( 0 );
846
		$server['clusterMasterHost'] = $masterName;
847
848
		// Log when many connection are made on requests
849
		if ( ++$this->connsOpened >= self::CONN_HELD_WARN_THRESHOLD ) {
850
			wfDebugLog( 'DBPerformance', __METHOD__ . ": " .
851
				"{$this->connsOpened}+ connections made (master=$masterName)\n" .
852
				wfBacktrace( true ) );
853
		}
854
855
		# Create object
856
		try {
857
			$db = DatabaseBase::factory( $server['type'], $server );
858
		} catch ( DBConnectionError $e ) {
859
			// FIXME: This is probably the ugliest thing I have ever done to
860
			// PHP. I'm half-expecting it to segfault, just out of disgust. -- TS
861
			$db = $e->db;
862
		}
863
864
		$db->setLBInfo( $server );
865
		$db->setLazyMasterHandle(
866
			$this->getLazyConnectionRef( DB_MASTER, [], $db->getWikiID() )
867
		);
868
		$db->setTransactionProfiler( $this->trxProfiler );
869
		if ( $this->trxRoundId !== false ) {
870
			$this->applyTransactionRoundFlags( $db );
871
		}
872
873
		if ( $server['serverIndex'] === $this->getWriterIndex() ) {
874
			foreach ( $this->trxRecurringCallbacks as $name => $callback ) {
875
				$db->setTransactionListener( $name, $callback );
876
			}
877
		}
878
879
		return $db;
880
	}
881
882
	/**
883
	 * @throws DBConnectionError
884
	 * @return bool
885
	 */
886
	private function reportConnectionError() {
887
		$conn = $this->mErrorConnection; // The connection which caused the error
888
		$context = [
889
			'method' => __METHOD__,
890
			'last_error' => $this->mLastError,
891
		];
892
893
		if ( !is_object( $conn ) ) {
894
			// No last connection, probably due to all servers being too busy
895
			wfLogDBError(
896
				"LB failure with no last connection. Connection error: {last_error}",
897
				$context
898
			);
899
900
			// If all servers were busy, mLastError will contain something sensible
901
			throw new DBConnectionError( null, $this->mLastError );
902
		} else {
903
			$context['db_server'] = $conn->getProperty( 'mServer' );
904
			wfLogDBError(
905
				"Connection error: {last_error} ({db_server})",
906
				$context
907
			);
908
909
			// throws DBConnectionError
910
			$conn->reportConnectionError( "{$this->mLastError} ({$context['db_server']})" );
911
		}
912
913
		return false; /* not reached */
914
	}
915
916
	/**
917
	 * @return int
918
	 * @since 1.26
919
	 */
920
	public function getWriterIndex() {
921
		return 0;
922
	}
923
924
	/**
925
	 * Returns true if the specified index is a valid server index
926
	 *
927
	 * @param string $i
928
	 * @return bool
929
	 */
930
	public function haveIndex( $i ) {
931
		return array_key_exists( $i, $this->mServers );
932
	}
933
934
	/**
935
	 * Returns true if the specified index is valid and has non-zero load
936
	 *
937
	 * @param string $i
938
	 * @return bool
939
	 */
940
	public function isNonZeroLoad( $i ) {
941
		return array_key_exists( $i, $this->mServers ) && $this->mLoads[$i] != 0;
942
	}
943
944
	/**
945
	 * Get the number of defined servers (not the number of open connections)
946
	 *
947
	 * @return int
948
	 */
949
	public function getServerCount() {
950
		return count( $this->mServers );
951
	}
952
953
	/**
954
	 * Get the host name or IP address of the server with the specified index
955
	 * Prefer a readable name if available.
956
	 * @param string $i
957
	 * @return string
958
	 */
959
	public function getServerName( $i ) {
960
		if ( isset( $this->mServers[$i]['hostName'] ) ) {
961
			$name = $this->mServers[$i]['hostName'];
962 View Code Duplication
		} elseif ( isset( $this->mServers[$i]['host'] ) ) {
963
			$name = $this->mServers[$i]['host'];
964
		} else {
965
			$name = '';
966
		}
967
968
		return ( $name != '' ) ? $name : 'localhost';
969
	}
970
971
	/**
972
	 * Return the server info structure for a given index, or false if the index is invalid.
973
	 * @param int $i
974
	 * @return array|bool
975
	 */
976
	public function getServerInfo( $i ) {
977
		if ( isset( $this->mServers[$i] ) ) {
978
			return $this->mServers[$i];
979
		} else {
980
			return false;
981
		}
982
	}
983
984
	/**
985
	 * Sets the server info structure for the given index. Entry at index $i
986
	 * is created if it doesn't exist
987
	 * @param int $i
988
	 * @param array $serverInfo
989
	 */
990
	public function setServerInfo( $i, array $serverInfo ) {
991
		$this->mServers[$i] = $serverInfo;
992
	}
993
994
	/**
995
	 * Get the current master position for chronology control purposes
996
	 * @return mixed
997
	 */
998
	public function getMasterPos() {
999
		# If this entire request was served from a replica DB without opening a connection to the
1000
		# master (however unlikely that may be), then we can fetch the position from the replica DB.
1001
		$masterConn = $this->getAnyOpenConnection( 0 );
1002
		if ( !$masterConn ) {
1003
			$serverCount = count( $this->mServers );
1004
			for ( $i = 1; $i < $serverCount; $i++ ) {
1005
				$conn = $this->getAnyOpenConnection( $i );
1006
				if ( $conn ) {
1007
					return $conn->getSlavePos();
0 ignored issues
show
Bug introduced by
The method getSlavePos cannot be called on $conn (of type boolean).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
1008
				}
1009
			}
1010
		} else {
1011
			return $masterConn->getMasterPos();
1012
		}
1013
1014
		return false;
1015
	}
1016
1017
	/**
1018
	 * Disable this load balancer. All connections are closed, and any attempt to
1019
	 * open a new connection will result in a DBAccessError.
1020
	 *
1021
	 * @since 1.27
1022
	 */
1023
	public function disable() {
1024
		$this->closeAll();
1025
		$this->disabled = true;
1026
	}
1027
1028
	/**
1029
	 * Close all open connections
1030
	 */
1031
	public function closeAll() {
1032
		$this->forEachOpenConnection( function ( DatabaseBase $conn ) {
1033
			$conn->close();
1034
		} );
1035
1036
		$this->mConns = [
0 ignored issues
show
Documentation Bug introduced by
It seems like array('local' => array()...oreignUsed' => array()) of type array<string,array,{"loc..."foreignUsed":"array"}> is incompatible with the declared type array<integer,array> of property $mConns.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
1037
			'local' => [],
1038
			'foreignFree' => [],
1039
			'foreignUsed' => [],
1040
		];
1041
		$this->connsOpened = 0;
1042
	}
1043
1044
	/**
1045
	 * Close a connection
1046
	 * Using this function makes sure the LoadBalancer knows the connection is closed.
1047
	 * If you use $conn->close() directly, the load balancer won't update its state.
1048
	 * @param DatabaseBase $conn
1049
	 */
1050
	public function closeConnection( $conn ) {
1051
		$done = false;
1052
		foreach ( $this->mConns as $i1 => $conns2 ) {
1053
			foreach ( $conns2 as $i2 => $conns3 ) {
1054
				foreach ( $conns3 as $i3 => $candidateConn ) {
1055
					if ( $conn === $candidateConn ) {
1056
						$conn->close();
1057
						unset( $this->mConns[$i1][$i2][$i3] );
1058
						--$this->connsOpened;
1059
						$done = true;
1060
						break;
1061
					}
1062
				}
1063
			}
1064
		}
1065
		if ( !$done ) {
1066
			$conn->close();
1067
		}
1068
	}
1069
1070
	/**
1071
	 * Commit transactions on all open connections
1072
	 * @param string $fname Caller name
1073
	 * @throws DBExpectedError
1074
	 */
1075
	public function commitAll( $fname = __METHOD__ ) {
1076
		$failures = [];
1077
1078
		$restore = ( $this->trxRoundId !== false );
1079
		$this->trxRoundId = false;
1080
		$this->forEachOpenConnection(
1081
			function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
1082
				try {
1083
					$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1084
				} catch ( DBError $e ) {
1085
					MWExceptionHandler::logException( $e );
1086
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1087
				}
1088
				if ( $restore && $conn->getLBInfo( 'master' ) ) {
1089
					$this->undoTransactionRoundFlags( $conn );
1090
				}
1091
			}
1092
		);
1093
1094
		if ( $failures ) {
1095
			throw new DBExpectedError(
1096
				null,
1097
				"Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1098
			);
1099
		}
1100
	}
1101
1102
	/**
1103
	 * Perform all pre-commit callbacks that remain part of the atomic transactions
1104
	 * and disable any post-commit callbacks until runMasterPostTrxCallbacks()
1105
	 * @since 1.28
1106
	 */
1107
	public function finalizeMasterChanges() {
1108
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
1109
			// Any error should cause all DB transactions to be rolled back together
1110
			$conn->setTrxEndCallbackSuppression( false );
1111
			$conn->runOnTransactionPreCommitCallbacks();
1112
			// Defer post-commit callbacks until COMMIT finishes for all DBs
1113
			$conn->setTrxEndCallbackSuppression( true );
1114
		} );
1115
	}
1116
1117
	/**
1118
	 * Perform all pre-commit checks for things like replication safety
1119
	 * @param array $options Includes:
1120
	 *   - maxWriteDuration : max write query duration time in seconds
1121
	 * @throws DBTransactionError
1122
	 * @since 1.28
1123
	 */
1124
	public function approveMasterChanges( array $options ) {
1125
		$limit = isset( $options['maxWriteDuration'] ) ? $options['maxWriteDuration'] : 0;
1126
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $limit ) {
1127
			// If atomic sections or explicit transactions are still open, some caller must have
1128
			// caught an exception but failed to properly rollback any changes. Detect that and
1129
			// throw and error (causing rollback).
1130
			if ( $conn->explicitTrxActive() ) {
1131
				throw new DBTransactionError(
1132
					$conn,
1133
					"Explicit transaction still active. A caller may have caught an error."
1134
				);
1135
			}
1136
			// Assert that the time to replicate the transaction will be sane.
1137
			// If this fails, then all DB transactions will be rollback back together.
1138
			$time = $conn->pendingWriteQueryDuration( $conn::ESTIMATE_DB_APPLY );
1139
			if ( $limit > 0 && $time > $limit ) {
1140
				throw new DBTransactionError(
1141
					$conn,
1142
					wfMessage( 'transaction-duration-limit-exceeded', $time, $limit )->text()
1143
				);
1144
			}
1145
			// If a connection sits idle while slow queries execute on another, that connection
1146
			// may end up dropped before the commit round is reached. Ping servers to detect this.
1147
			if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
1148
				throw new DBTransactionError(
1149
					$conn,
1150
					"A connection to the {$conn->getDBname()} database was lost before commit."
1151
				);
1152
			}
1153
		} );
1154
	}
1155
1156
	/**
1157
	 * Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
1158
	 *
1159
	 * The DBO_TRX setting will be reverted to the default in each of these methods:
1160
	 *   - commitMasterChanges()
1161
	 *   - rollbackMasterChanges()
1162
	 *   - commitAll()
1163
	 * This allows for custom transaction rounds from any outer transaction scope.
1164
	 *
1165
	 * @param string $fname
1166
	 * @throws DBExpectedError
1167
	 * @since 1.28
1168
	 */
1169
	public function beginMasterChanges( $fname = __METHOD__ ) {
1170
		if ( $this->trxRoundId !== false ) {
1171
			throw new DBTransactionError(
1172
				null,
1173
				"$fname: Transaction round '{$this->trxRoundId}' already started."
1174
			);
1175
		}
1176
		$this->trxRoundId = $fname;
1177
1178
		$failures = [];
1179
		$this->forEachOpenMasterConnection(
1180
			function ( DatabaseBase $conn ) use ( $fname, &$failures ) {
1181
				$conn->setTrxEndCallbackSuppression( true );
1182
				try {
1183
					$conn->clearSnapshot( $fname );
1184
				} catch ( DBError $e ) {
1185
					MWExceptionHandler::logException( $e );
1186
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1187
				}
1188
				$conn->setTrxEndCallbackSuppression( false );
1189
				$this->applyTransactionRoundFlags( $conn );
1190
			}
1191
		);
1192
1193 View Code Duplication
		if ( $failures ) {
1194
			throw new DBExpectedError(
1195
				null,
1196
				"$fname: Flush failed on server(s) " . implode( "\n", array_unique( $failures ) )
1197
			);
1198
		}
1199
	}
1200
1201
	/**
1202
	 * Issue COMMIT on all master connections where writes where done
1203
	 * @param string $fname Caller name
1204
	 * @throws DBExpectedError
1205
	 */
1206
	public function commitMasterChanges( $fname = __METHOD__ ) {
1207
		$failures = [];
1208
1209
		$restore = ( $this->trxRoundId !== false );
1210
		$this->trxRoundId = false;
1211
		$this->forEachOpenMasterConnection(
1212
			function ( DatabaseBase $conn ) use ( $fname, $restore, &$failures ) {
1213
				try {
1214
					if ( $conn->writesOrCallbacksPending() ) {
1215
						$conn->commit( $fname, $conn::FLUSHING_ALL_PEERS );
1216
					} elseif ( $restore ) {
1217
						$conn->clearSnapshot( $fname );
1218
					}
1219
				} catch ( DBError $e ) {
1220
					MWExceptionHandler::logException( $e );
1221
					$failures[] = "{$conn->getServer()}: {$e->getMessage()}";
1222
				}
1223
				if ( $restore ) {
1224
					$this->undoTransactionRoundFlags( $conn );
1225
				}
1226
			}
1227
		);
1228
1229 View Code Duplication
		if ( $failures ) {
1230
			throw new DBExpectedError(
1231
				null,
1232
				"$fname: Commit failed on server(s) " . implode( "\n", array_unique( $failures ) )
1233
			);
1234
		}
1235
	}
1236
1237
	/**
1238
	 * Issue all pending post-COMMIT/ROLLBACK callbacks
1239
	 * @param integer $type IDatabase::TRIGGER_* constant
1240
	 * @return Exception|null The first exception or null if there were none
1241
	 * @since 1.28
1242
	 */
1243
	public function runMasterPostTrxCallbacks( $type ) {
1244
		$e = null; // first exception
1245
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $type, &$e ) {
1246
			$conn->clearSnapshot( __METHOD__ ); // clear no-op transactions
1247
1248
			$conn->setTrxEndCallbackSuppression( false );
1249
			try {
1250
				$conn->runOnTransactionIdleCallbacks( $type );
1251
			} catch ( Exception $ex ) {
1252
				$e = $e ?: $ex;
1253
			}
1254
			try {
1255
				$conn->runTransactionListenerCallbacks( $type );
1256
			} catch ( Exception $ex ) {
1257
				$e = $e ?: $ex;
1258
			}
1259
		} );
1260
1261
		return $e;
1262
	}
1263
1264
	/**
1265
	 * Issue ROLLBACK only on master, only if queries were done on connection
1266
	 * @param string $fname Caller name
1267
	 * @throws DBExpectedError
1268
	 * @since 1.23
1269
	 */
1270
	public function rollbackMasterChanges( $fname = __METHOD__ ) {
1271
		$restore = ( $this->trxRoundId !== false );
1272
		$this->trxRoundId = false;
1273
		$this->forEachOpenMasterConnection(
1274
			function ( DatabaseBase $conn ) use ( $fname, $restore ) {
1275
				if ( $conn->writesOrCallbacksPending() ) {
1276
					$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
1277
				}
1278
				if ( $restore ) {
1279
					$this->undoTransactionRoundFlags( $conn );
1280
				}
1281
			}
1282
		);
1283
	}
1284
1285
	/**
1286
	 * Suppress all pending post-COMMIT/ROLLBACK callbacks
1287
	 * @return Exception|null The first exception or null if there were none
1288
	 * @since 1.28
1289
	 */
1290
	public function suppressTransactionEndCallbacks() {
1291
		$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) {
1292
			$conn->setTrxEndCallbackSuppression( true );
1293
		} );
1294
	}
1295
1296
	/**
1297
	 * @param DatabaseBase $conn
1298
	 */
1299
	private function applyTransactionRoundFlags( DatabaseBase $conn ) {
1300
		if ( $conn->getFlag( DBO_DEFAULT ) ) {
1301
			// DBO_TRX is controlled entirely by CLI mode presence with DBO_DEFAULT.
1302
			// Force DBO_TRX even in CLI mode since a commit round is expected soon.
1303
			$conn->setFlag( DBO_TRX, $conn::REMEMBER_PRIOR );
1304
			// If config has explicitly requested DBO_TRX be either on or off by not
1305
			// setting DBO_DEFAULT, then respect that. Forcing no transactions is useful
1306
			// for things like blob stores (ExternalStore) which want auto-commit mode.
1307
		}
1308
	}
1309
1310
	/**
1311
	 * @param DatabaseBase $conn
1312
	 */
1313
	private function undoTransactionRoundFlags( DatabaseBase $conn ) {
1314
		if ( $conn->getFlag( DBO_DEFAULT ) ) {
1315
			$conn->restoreFlags( $conn::RESTORE_PRIOR );
1316
		}
1317
	}
1318
1319
	/**
1320
	 * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
1321
	 *
1322
	 * @param string $fname Caller name
1323
	 * @since 1.28
1324
	 */
1325
	public function flushReplicaSnapshots( $fname = __METHOD__ ) {
1326
		$this->forEachOpenReplicaConnection( function ( DatabaseBase $conn ) {
1327
			$conn->clearSnapshot( __METHOD__ );
1328
		} );
1329
	}
1330
1331
	/**
1332
	 * @return bool Whether a master connection is already open
1333
	 * @since 1.24
1334
	 */
1335
	public function hasMasterConnection() {
1336
		return $this->isOpen( $this->getWriterIndex() );
1337
	}
1338
1339
	/**
1340
	 * Determine if there are pending changes in a transaction by this thread
1341
	 * @since 1.23
1342
	 * @return bool
1343
	 */
1344
	public function hasMasterChanges() {
1345
		$masterIndex = $this->getWriterIndex();
1346
		foreach ( $this->mConns as $conns2 ) {
1347
			if ( empty( $conns2[$masterIndex] ) ) {
1348
				continue;
1349
			}
1350
			/** @var DatabaseBase $conn */
1351
			foreach ( $conns2[$masterIndex] as $conn ) {
1352
				if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
1353
					return true;
1354
				}
1355
			}
1356
		}
1357
		return false;
1358
	}
1359
1360
	/**
1361
	 * Get the timestamp of the latest write query done by this thread
1362
	 * @since 1.25
1363
	 * @return float|bool UNIX timestamp or false
1364
	 */
1365 View Code Duplication
	public function lastMasterChangeTimestamp() {
1366
		$lastTime = false;
1367
		$masterIndex = $this->getWriterIndex();
1368
		foreach ( $this->mConns as $conns2 ) {
1369
			if ( empty( $conns2[$masterIndex] ) ) {
1370
				continue;
1371
			}
1372
			/** @var DatabaseBase $conn */
1373
			foreach ( $conns2[$masterIndex] as $conn ) {
1374
				$lastTime = max( $lastTime, $conn->lastDoneWrites() );
1375
			}
1376
		}
1377
		return $lastTime;
1378
	}
1379
1380
	/**
1381
	 * Check if this load balancer object had any recent or still
1382
	 * pending writes issued against it by this PHP thread
1383
	 *
1384
	 * @param float $age How many seconds ago is "recent" [defaults to mWaitTimeout]
1385
	 * @return bool
1386
	 * @since 1.25
1387
	 */
1388
	public function hasOrMadeRecentMasterChanges( $age = null ) {
1389
		$age = ( $age === null ) ? $this->mWaitTimeout : $age;
1390
1391
		return ( $this->hasMasterChanges()
1392
			|| $this->lastMasterChangeTimestamp() > microtime( true ) - $age );
1393
	}
1394
1395
	/**
1396
	 * Get the list of callers that have pending master changes
1397
	 *
1398
	 * @return array
1399
	 * @since 1.27
1400
	 */
1401 View Code Duplication
	public function pendingMasterChangeCallers() {
1402
		$fnames = [];
1403
1404
		$masterIndex = $this->getWriterIndex();
1405
		foreach ( $this->mConns as $conns2 ) {
1406
			if ( empty( $conns2[$masterIndex] ) ) {
1407
				continue;
1408
			}
1409
			/** @var DatabaseBase $conn */
1410
			foreach ( $conns2[$masterIndex] as $conn ) {
1411
				$fnames = array_merge( $fnames, $conn->pendingWriteCallers() );
1412
			}
1413
		}
1414
1415
		return $fnames;
1416
	}
1417
1418
	/**
1419
	 * @param mixed $value
1420
	 * @return mixed
1421
	 */
1422
	public function waitTimeout( $value = null ) {
1423
		return wfSetVar( $this->mWaitTimeout, $value );
1424
	}
1425
1426
	/**
1427
	 * @note This method will trigger a DB connection if not yet done
1428
	 *
1429
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1430
	 * @return bool Whether the generic connection for reads is highly "lagged"
1431
	 */
1432
	public function getLaggedSlaveMode( $wiki = false ) {
1433
		// No-op if there is only one DB (also avoids recursion)
1434
		if ( !$this->laggedSlaveMode && $this->getServerCount() > 1 ) {
1435
			try {
1436
				// See if laggedSlaveMode gets set
1437
				$conn = $this->getConnection( DB_SLAVE, false, $wiki );
1438
				$this->reuseConnection( $conn );
1439
			} catch ( DBConnectionError $e ) {
1440
				// Avoid expensive re-connect attempts and failures
1441
				$this->slavesDownMode = true;
1442
				$this->laggedSlaveMode = true;
1443
			}
1444
		}
1445
1446
		return $this->laggedSlaveMode;
1447
	}
1448
1449
	/**
1450
	 * @note This method will never cause a new DB connection
1451
	 * @return bool Whether any generic connection used for reads was highly "lagged"
1452
	 * @since 1.27
1453
	 */
1454
	public function laggedSlaveUsed() {
1455
		return $this->laggedSlaveMode;
1456
	}
1457
1458
	/**
1459
	 * @note This method may trigger a DB connection if not yet done
1460
	 * @param string|bool $wiki Wiki ID, or false for the current wiki
1461
	 * @param DatabaseBase|null DB master connection; used to avoid loops [optional]
1462
	 * @return string|bool Reason the master is read-only or false if it is not
1463
	 * @since 1.27
1464
	 */
1465
	public function getReadOnlyReason( $wiki = false, DatabaseBase $conn = null ) {
1466
		if ( $this->readOnlyReason !== false ) {
1467
			return $this->readOnlyReason;
1468
		} elseif ( $this->getLaggedSlaveMode( $wiki ) ) {
1469
			if ( $this->slavesDownMode ) {
1470
				return 'The database has been automatically locked ' .
1471
					'until the replica database servers become available';
1472
			} else {
1473
				return 'The database has been automatically locked ' .
1474
					'while the replica database servers catch up to the master.';
1475
			}
1476
		} elseif ( $this->masterRunningReadOnly( $wiki, $conn ) ) {
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 1465 can also be of type boolean; however, LoadBalancer::masterRunningReadOnly() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
1477
			return 'The database master is running in read-only mode.';
1478
		}
1479
1480
		return false;
1481
	}
1482
1483
	/**
1484
	 * @param string $wiki Wiki ID, or false for the current wiki
1485
	 * @param DatabaseBase|null DB master connectionl used to avoid loops [optional]
1486
	 * @return bool
1487
	 */
1488
	private function masterRunningReadOnly( $wiki, DatabaseBase $conn = null ) {
1489
		$cache = $this->wanCache;
1490
		$masterServer = $this->getServerName( $this->getWriterIndex() );
1491
1492
		return (bool)$cache->getWithSetCallback(
1493
			$cache->makeGlobalKey( __CLASS__, 'server-read-only', $masterServer ),
1494
			self::TTL_CACHE_READONLY,
1495
			function () use ( $wiki, $conn ) {
1496
				$this->trxProfiler->setSilenced( true );
1497
				try {
1498
					$dbw = $conn ?: $this->getConnection( DB_MASTER, [], $wiki );
1499
					$readOnly = (int)$dbw->serverIsReadOnly();
1500
				} catch ( DBError $e ) {
1501
					$readOnly = 0;
1502
				}
1503
				$this->trxProfiler->setSilenced( false );
1504
				return $readOnly;
1505
			},
1506
			[ 'pcTTL' => $cache::TTL_PROC_LONG, 'busyValue' => 0 ]
1507
		);
1508
	}
1509
1510
	/**
1511
	 * Disables/enables lag checks
1512
	 * @param null|bool $mode
1513
	 * @return bool
1514
	 */
1515
	public function allowLagged( $mode = null ) {
1516
		if ( $mode === null ) {
1517
			return $this->mAllowLagged;
1518
		}
1519
		$this->mAllowLagged = $mode;
1520
1521
		return $this->mAllowLagged;
1522
	}
1523
1524
	/**
1525
	 * @return bool
1526
	 */
1527
	public function pingAll() {
1528
		$success = true;
1529
		$this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( &$success ) {
1530
			if ( !$conn->ping() ) {
1531
				$success = false;
1532
			}
1533
		} );
1534
1535
		return $success;
1536
	}
1537
1538
	/**
1539
	 * Call a function with each open connection object
1540
	 * @param callable $callback
1541
	 * @param array $params
1542
	 */
1543 View Code Duplication
	public function forEachOpenConnection( $callback, array $params = [] ) {
1544
		foreach ( $this->mConns as $connsByServer ) {
1545
			foreach ( $connsByServer as $serverConns ) {
1546
				foreach ( $serverConns as $conn ) {
1547
					$mergedParams = array_merge( [ $conn ], $params );
1548
					call_user_func_array( $callback, $mergedParams );
1549
				}
1550
			}
1551
		}
1552
	}
1553
1554
	/**
1555
	 * Call a function with each open connection object to a master
1556
	 * @param callable $callback
1557
	 * @param array $params
1558
	 * @since 1.28
1559
	 */
1560
	public function forEachOpenMasterConnection( $callback, array $params = [] ) {
1561
		$masterIndex = $this->getWriterIndex();
1562
		foreach ( $this->mConns as $connsByServer ) {
1563
			if ( isset( $connsByServer[$masterIndex] ) ) {
1564
				/** @var DatabaseBase $conn */
1565
				foreach ( $connsByServer[$masterIndex] as $conn ) {
1566
					$mergedParams = array_merge( [ $conn ], $params );
1567
					call_user_func_array( $callback, $mergedParams );
1568
				}
1569
			}
1570
		}
1571
	}
1572
1573
	/**
1574
	 * Call a function with each open replica DB connection object
1575
	 * @param callable $callback
1576
	 * @param array $params
1577
	 * @since 1.28
1578
	 */
1579 View Code Duplication
	public function forEachOpenReplicaConnection( $callback, array $params = [] ) {
1580
		foreach ( $this->mConns as $connsByServer ) {
1581
			foreach ( $connsByServer as $i => $serverConns ) {
1582
				if ( $i === $this->getWriterIndex() ) {
1583
					continue; // skip master
1584
				}
1585
				foreach ( $serverConns as $conn ) {
1586
					$mergedParams = array_merge( [ $conn ], $params );
1587
					call_user_func_array( $callback, $mergedParams );
1588
				}
1589
			}
1590
		}
1591
	}
1592
1593
	/**
1594
	 * Get the hostname and lag time of the most-lagged replica DB
1595
	 *
1596
	 * This is useful for maintenance scripts that need to throttle their updates.
1597
	 * May attempt to open connections to replica DBs on the default DB. If there is
1598
	 * no lag, the maximum lag will be reported as -1.
1599
	 *
1600
	 * @param bool|string $wiki Wiki ID, or false for the default database
1601
	 * @return array ( host, max lag, index of max lagged host )
1602
	 */
1603
	public function getMaxLag( $wiki = false ) {
1604
		$maxLag = -1;
1605
		$host = '';
1606
		$maxIndex = 0;
1607
1608
		if ( $this->getServerCount() <= 1 ) {
1609
			return [ $host, $maxLag, $maxIndex ]; // no replication = no lag
1610
		}
1611
1612
		$lagTimes = $this->getLagTimes( $wiki );
1613
		foreach ( $lagTimes as $i => $lag ) {
1614
			if ( $this->mLoads[$i] > 0 && $lag > $maxLag ) {
1615
				$maxLag = $lag;
1616
				$host = $this->mServers[$i]['host'];
1617
				$maxIndex = $i;
1618
			}
1619
		}
1620
1621
		return [ $host, $maxLag, $maxIndex ];
1622
	}
1623
1624
	/**
1625
	 * Get an estimate of replication lag (in seconds) for each server
1626
	 *
1627
	 * Results are cached for a short time in memcached/process cache
1628
	 *
1629
	 * Values may be "false" if replication is too broken to estimate
1630
	 *
1631
	 * @param string|bool $wiki
1632
	 * @return int[] Map of (server index => float|int|bool)
1633
	 */
1634
	public function getLagTimes( $wiki = false ) {
1635
		if ( $this->getServerCount() <= 1 ) {
1636
			return [ 0 => 0 ]; // no replication = no lag
1637
		}
1638
1639
		# Send the request to the load monitor
1640
		return $this->getLoadMonitor()->getLagTimes( array_keys( $this->mServers ), $wiki );
0 ignored issues
show
Bug introduced by
It seems like $wiki defined by parameter $wiki on line 1634 can also be of type boolean; however, LoadMonitor::getLagTimes() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
1641
	}
1642
1643
	/**
1644
	 * Get the lag in seconds for a given connection, or zero if this load
1645
	 * balancer does not have replication enabled.
1646
	 *
1647
	 * This should be used in preference to Database::getLag() in cases where
1648
	 * replication may not be in use, since there is no way to determine if
1649
	 * replication is in use at the connection level without running
1650
	 * potentially restricted queries such as SHOW SLAVE STATUS. Using this
1651
	 * function instead of Database::getLag() avoids a fatal error in this
1652
	 * case on many installations.
1653
	 *
1654
	 * @param IDatabase $conn
1655
	 * @return int|bool Returns false on error
1656
	 */
1657
	public function safeGetLag( IDatabase $conn ) {
1658
		if ( $this->getServerCount() == 1 ) {
1659
			return 0;
1660
		} else {
1661
			return $conn->getLag();
1662
		}
1663
	}
1664
1665
	/**
1666
	 * Wait for a replica DB to reach a specified master position
1667
	 *
1668
	 * This will connect to the master to get an accurate position if $pos is not given
1669
	 *
1670
	 * @param IDatabase $conn Replica DB
1671
	 * @param DBMasterPos|bool $pos Master position; default: current position
1672
	 * @param integer $timeout Timeout in seconds
1673
	 * @return bool Success
1674
	 * @since 1.27
1675
	 */
1676
	public function safeWaitForMasterPos( IDatabase $conn, $pos = false, $timeout = 10 ) {
1677
		if ( $this->getServerCount() == 1 || !$conn->getLBInfo( 'slave' ) ) {
1678
			return true; // server is not a replica DB
1679
		}
1680
1681
		$pos = $pos ?: $this->getConnection( DB_MASTER )->getMasterPos();
1682
		if ( !( $pos instanceof DBMasterPos ) ) {
1683
			return false; // something is misconfigured
1684
		}
1685
1686
		$result = $conn->masterPosWait( $pos, $timeout );
1687
		if ( $result == -1 || is_null( $result ) ) {
1688
			$msg = __METHOD__ . ": Timed out waiting on {$conn->getServer()} pos {$pos}";
1689
			wfDebugLog( 'replication', "$msg\n" );
1690
			wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
1691
			$ok = false;
1692
		} else {
1693
			wfDebugLog( 'replication', __METHOD__ . ": Done\n" );
1694
			$ok = true;
1695
		}
1696
1697
		return $ok;
1698
	}
1699
1700
	/**
1701
	 * Clear the cache for slag lag delay times
1702
	 *
1703
	 * This is only used for testing
1704
	 */
1705
	public function clearLagTimeCache() {
1706
		$this->getLoadMonitor()->clearCaches();
1707
	}
1708
1709
	/**
1710
	 * Set a callback via DatabaseBase::setTransactionListener() on
1711
	 * all current and future master connections of this load balancer
1712
	 *
1713
	 * @param string $name Callback name
1714
	 * @param callable|null $callback
1715
	 * @since 1.28
1716
	 */
1717
	public function setTransactionListener( $name, callable $callback = null ) {
1718
		if ( $callback ) {
1719
			$this->trxRecurringCallbacks[$name] = $callback;
1720
		} else {
1721
			unset( $this->trxRecurringCallbacks[$name] );
1722
		}
1723
		$this->forEachOpenMasterConnection(
1724
			function ( DatabaseBase $conn ) use ( $name, $callback ) {
1725
				$conn->setTransactionListener( $name, $callback );
1726
			}
1727
		);
1728
	}
1729
}
1730