Completed
Branch master (90e9fc)
by
unknown
29:23
created

SqlBagOStuff::waitForReplication()   B

Complexity

Conditions 4
Paths 5

Size

Total Lines 26
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
eloc 14
nc 5
nop 0
dl 0
loc 26
rs 8.5806
c 1
b 0
f 0
1
<?php
2
/**
3
 * Object caching using a SQL database.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup Cache
22
 */
23
24
use \MediaWiki\MediaWikiServices;
25
26
/**
27
 * Class to store objects in the database
28
 *
29
 * @ingroup Cache
30
 */
31
class SqlBagOStuff extends BagOStuff {
32
	/** @var array[]|bool (server index => server config), or false to use the main wiki DB */
	protected $serverInfos;
	/** @var string[] (server index => tag/host name); filled in when "servers" is configured */
	protected $serverTags;
	/** @var int Number of database servers keys are distributed over */
	protected $numServers;
	/** @var int UNIX timestamp of the last expireAll() run started by garbageCollect() */
	protected $lastExpireAll = 0;
	/** @var int Average number of requests between garbage collections; 0 disables purging */
	protected $purgePeriod = 100;
	/** @var int Number of tables used for data storage on each server */
	protected $shards = 1;
	/** @var string Base name of the cache table(s) */
	protected $tableName = 'objectcache';
	/** @var bool Whether to only use slave DBs and skip garbage collection */
	protected $slaveOnly = false;
	/** @var int Max seconds to wait for slaves to catch up for WRITE_SYNC */
	protected $syncTimeout = 3;

	/** @var LoadBalancer|null Created on demand by getSeparateMainLB() */
	protected $separateMainLB;
	/** @var IDatabase[] (server index => connection handle) */
	protected $conns = [];
	/** @var array UNIX timestamps */
	protected $connFailureTimes = [];
	/** @var array Exceptions */
	protected $connFailureErrors = [];
59
60
	/**
	 * Set up the cache from a configuration array. Recognized keys:
	 *   - server:      A single server info structure, in the format used by each
	 *                  element of $wgDBServers.
	 *
	 *   - servers:     A list of server info structures describing the database servers
	 *                  that keys are distributed over. When given, "server" is ignored.
	 *                  String array keys are used for consistent hashing *instead* of the
	 *                  host name from the server config, which helps when a cluster is
	 *                  replicated to another site (with different host names) but every
	 *                  server has a corresponding replica in the other cluster.
	 *
	 *   - purgePeriod: Average number of object cache requests between garbage
	 *                  collection runs that remove expired entries from the database,
	 *                  i.e. the reciprocal of the probability of purging on any given
	 *                  request. Zero means purging is never done.
	 *
	 *   - tableName:   Table name to use; defaults to "objectcache".
	 *
	 *   - shards:      Number of tables used for data storage on each server. With more
	 *                  than one, tables are named objectcacheNNN, where NNN is the shard
	 *                  index (0 to shards-1) padded to the number of digits of the largest
	 *                  shard index. Data is spread over the tables by key hash. This is
	 *                  for MySQL bugs 61735 and 61736.
	 *
	 *   - slaveOnly:   Whether to only use slave DBs and avoid triggering the garbage
	 *                  collection of expired items. This only makes sense if the primary
	 *                  DB is used and only if get() calls will be used. This is used by
	 *                  ReplicatedBagOStuff.
	 *
	 *   - syncTimeout: Max seconds to wait for slaves to catch up for WRITE_SYNC.
	 *
	 * If neither "servers" nor "server" is given, the main wiki's database servers
	 * are used.
	 *
	 * @param array $params
	 */
	public function __construct( $params ) {
		parent::__construct( $params );

		$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;

		$this->slaveOnly = !empty( $params['slaveOnly'] );
		if ( isset( $params['tableName'] ) ) {
			$this->tableName = $params['tableName'];
		}
		if ( isset( $params['syncTimeout'] ) ) {
			$this->syncTimeout = $params['syncTimeout'];
		}
		if ( isset( $params['purgePeriod'] ) ) {
			$this->purgePeriod = intval( $params['purgePeriod'] );
		}
		if ( isset( $params['shards'] ) ) {
			$this->shards = intval( $params['shards'] );
		}

		if ( isset( $params['servers'] ) ) {
			$this->serverInfos = [];
			$this->serverTags = [];
			$this->numServers = count( $params['servers'] );
			$position = 0;
			foreach ( $params['servers'] as $label => $serverConfig ) {
				// Tag preference: explicit string key, then host name, then "#<index>"
				if ( is_string( $label ) ) {
					$hashTag = $label;
				} elseif ( isset( $serverConfig['host'] ) ) {
					$hashTag = $serverConfig['host'];
				} else {
					$hashTag = "#$position";
				}
				$this->serverInfos[$position] = $serverConfig;
				$this->serverTags[$position] = $hashTag;
				$position++;
			}

			return;
		}

		if ( isset( $params['server'] ) ) {
			$this->serverInfos = [ $params['server'] ];
			$this->numServers = 1;

			return;
		}

		// Default to using the main wiki's database servers
		$this->serverInfos = false;
		$this->numServers = 1;
	}
137
138
	/**
	 * Get the dedicated load balancer used for cache queries on the main DB, if one applies.
	 *
	 * A balancer is only created (once, on first use) when $wgDBtype is "mysql" and the
	 * main wiki DB is used for the cache.
	 *
	 * @return LoadBalancer|null
	 */
	protected function getSeparateMainLB() {
		global $wgDBtype;

		if ( $wgDBtype !== 'mysql' || !$this->usesMainDB() ) {
			// SQLite has an opposite behavior. And PostgreSQL needs to know
			// if we are in transaction or not (@TODO: find some PostgreSQL work-around).
			return null;
		}

		if ( !$this->separateMainLB ) {
			// We must keep a separate connection to MySQL in order to avoid deadlocks
			$this->separateMainLB = MediaWikiServices::getInstance()
				->getDBLoadBalancerFactory()
				->newMainLB();
		}

		return $this->separateMainLB;
	}
154
155
	/**
	 * Get a connection to the specified database, opening and caching it on first use.
	 *
	 * @param int $serverIndex
	 * @return IDatabase
	 * @throws MWException If the server index is not below the number of servers
	 * @throws DBError The remembered connection error, if that server was marked down
	 *   less than 60 seconds ago
	 */
	protected function getDB( $serverIndex ) {
		if ( isset( $this->conns[$serverIndex] ) ) {
			return $this->conns[$serverIndex];
		}

		if ( $serverIndex >= $this->numServers ) {
			throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" );
		}

		# Don't keep timing out trying to connect for each call if the DB is down
		if ( isset( $this->connFailureErrors[$serverIndex] ) ) {
			$secondsSinceFailure = time() - $this->connFailureTimes[$serverIndex];
			if ( $secondsSinceFailure < 60 ) {
				throw $this->connFailureErrors[$serverIndex];
			}
		}

		if ( $this->serverInfos ) {
			# Server connection info was given, so use that
			$info = $this->serverInfos[$serverIndex];
			$type = isset( $info['type'] ) ? $info['type'] : 'mysql';
			$host = isset( $info['host'] ) ? $info['host'] : '[unknown]';
			$this->logger->debug( __CLASS__ . ": connecting to $host" );
			// Use a blank trx profiler to ignore expectations as this is a cache
			$info['trxProfiler'] = new TransactionProfiler();
			$db = DatabaseBase::factory( $type, $info );
			$db->clearFlag( DBO_TRX );
		} else {
			$index = $this->slaveOnly ? DB_SLAVE : DB_MASTER;
			$separateLB = $this->getSeparateMainLB();
			if ( $separateLB ) {
				$db = $separateLB->getConnection( $index );
				$db->clearFlag( DBO_TRX ); // auto-commit mode
			} else {
				// Can't mess with transaction rounds (DBO_TRX) :(
				$db = wfGetDB( $index );
			}
		}

		$this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $db ) );
		$this->conns[$serverIndex] = $db;

		return $db;
	}
201
202
	/**
	 * Work out which server and which table hold a given key.
	 *
	 * The table is picked from an md5-based hash of the key when several shards are
	 * configured; the server is picked by consistent hashing over the server tags when
	 * several servers are configured. Otherwise index 0 is used for either.
	 *
	 * @param string $key
	 * @return array Server index and table name
	 */
	protected function getTableByKey( $key ) {
		$tableIndex = 0;
		if ( $this->shards > 1 ) {
			// First 32 bits of the md5, masked down to a non-negative 31-bit integer
			$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
			$tableIndex = $hash % $this->shards;
		}

		$serverIndex = 0;
		if ( $this->numServers > 1 ) {
			$candidates = $this->serverTags;
			ArrayUtils::consistentHashSort( $candidates, $key );
			// The first entry after sorting designates the server for this key
			reset( $candidates );
			$serverIndex = key( $candidates );
		}

		return [ $serverIndex, $this->getTableNameByShard( $tableIndex ) ];
	}
224
225
	/**
	 * Build the name of the table for a given shard index.
	 *
	 * With a single shard this is just the base table name; otherwise the shard index
	 * is appended, zero-padded to the number of digits of the largest shard index.
	 *
	 * @param int $index
	 * @return string
	 */
	protected function getTableNameByShard( $index ) {
		if ( $this->shards <= 1 ) {
			return $this->tableName;
		}

		$width = strlen( $this->shards - 1 );
		$suffix = sprintf( "%0{$width}d", $index );

		return $this->tableName . $suffix;
	}
239
240
	/**
	 * Fetch the value of a key, discarding the CAS token.
	 *
	 * @param string $key
	 * @param int $flags Passed through to getWithToken()
	 * @return mixed The stored value, or false if there is none
	 */
	protected function doGet( $key, $flags = 0 ) {
		$ignoredToken = null;
		$value = $this->getWithToken( $key, $ignoredToken, $flags );

		return $value;
	}
245
246
	/**
	 * Fetch the value of a key and hand back that same value as its CAS token.
	 *
	 * @param string $key
	 * @param mixed &$casToken Set to the stored value when the key is found
	 * @param int $flags Not used by this method
	 * @return mixed The stored value, or false if the key was not found
	 */
	protected function getWithToken( $key, &$casToken, $flags = 0 ) {
		$found = $this->getMulti( [ $key ] );
		if ( !array_key_exists( $key, $found ) ) {
			return false;
		}

		$value = $found[$key];
		$casToken = $value;

		return $value;
	}
254
255
	/**
	 * Fetch the values of several keys at once.
	 *
	 * Keys are grouped by server and table so that one SELECT is issued per table.
	 * Rows whose expiry has passed count as misses. A DBError raised while reading from
	 * a server is handed to handleReadError() and that server's keys are left out.
	 *
	 * @param array $keys List of keys
	 * @param int $flags Not used by this method
	 * @return array Map of (key => value) for the keys that were found
	 */
	public function getMulti( array $keys, $flags = 0 ) {
		// Map of (server index => (table name => list of keys))
		$keysByTable = [];
		foreach ( $keys as $key ) {
			list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
			$keysByTable[$serverIndex][$tableName][] = $key;
		}

		$this->garbageCollect(); // expire old entries if any

		// Map of (key => row), each row tagged with the server/table it came from
		$dataRows = [];
		foreach ( $keysByTable as $serverIndex => $serverKeys ) {
			try {
				$db = $this->getDB( $serverIndex );
				foreach ( $serverKeys as $tableName => $tableKeys ) {
					// Approximate write-on-the-fly BagOStuff API via blocking.
					// This approximation fails if a ROLLBACK happens (which is rare).
					// We do not want to flush the TRX as that can break callers.
					$options = $db->trxLevel() ? [ 'LOCK IN SHARE MODE' ] : [];
					$res = $db->select(
						$tableName,
						[ 'keyname', 'value', 'exptime' ],
						[ 'keyname' => $tableKeys ],
						__METHOD__,
						$options
					);
					if ( $res !== false ) {
						foreach ( $res as $row ) {
							$row->serverIndex = $serverIndex;
							$row->tableName = $tableName;
							$dataRows[$row->keyname] = $row;
						}
					}
				}
			} catch ( DBError $e ) {
				$this->handleReadError( $e, $serverIndex );
			}
		}

		$values = []; // array of (key => value)
		foreach ( $keys as $key ) {
			if ( !isset( $dataRows[$key] ) ) { // MISS
				$this->debug( 'get: no matching rows' );
				continue;
			}

			$row = $dataRows[$key];
			$this->debug( "get: retrieved data; expiry time is " . $row->exptime );
			$db = null;
			try {
				$db = $this->getDB( $row->serverIndex );
				if ( !$this->isExpired( $db, $row->exptime ) ) { // HIT
					$values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) );
				} else { // MISS
					$this->debug( "get: key has expired" );
				}
			} catch ( DBQueryError $e ) {
				$this->handleWriteError( $e, $db, $row->serverIndex );
			}
		}

		return $values;
	}
316
317
	/**
	 * Store several key/value pairs with a common expiry.
	 *
	 * Keys are grouped by server and table, and each table receives one REPLACE with
	 * all of its rows. A zero or negative expiry is stored as the getMaxDateTime() value.
	 *
	 * @param array $data Map of (key => value)
	 * @param int $expiry Passed through convertExpiry() when non-zero
	 * @return bool False if connecting or writing failed for any server, true otherwise
	 */
	public function setMulti( array $data, $expiry = 0 ) {
		// Map of (server index => (table name => list of keys))
		$keysByTable = [];
		foreach ( array_keys( $data ) as $key ) {
			list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
			$keysByTable[$serverIndex][$tableName][] = $key;
		}

		$this->garbageCollect(); // expire old entries if any

		$success = true;
		$exptime = (int)$expiry;
		foreach ( $keysByTable as $serverIndex => $serverKeys ) {
			$db = null;
			try {
				$db = $this->getDB( $serverIndex );
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );
				$success = false;
				continue;
			}

			if ( $exptime < 0 ) {
				$exptime = 0;
			}

			if ( $exptime != 0 ) {
				// The converted value is kept for the remaining servers
				$exptime = $this->convertExpiry( $exptime );
				$encExpiry = $db->timestamp( $exptime );
			} else {
				$encExpiry = $this->getMaxDateTime( $db );
			}

			foreach ( $serverKeys as $tableName => $tableKeys ) {
				$rows = [];
				foreach ( $tableKeys as $key ) {
					$blob = $db->encodeBlob( $this->serialize( $data[$key] ) );
					$rows[] = [
						'keyname' => $key,
						'value' => $blob,
						'exptime' => $encExpiry,
					];
				}

				try {
					$db->replace( $tableName, [ 'keyname' ], $rows, __METHOD__ );
				} catch ( DBError $e ) {
					$this->handleWriteError( $e, $db, $serverIndex );
					$success = false;
				}
			}
		}

		return $success;
	}
376
377 View Code Duplication
	/**
	 * Store a single value, optionally waiting for replication afterwards.
	 *
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime Passed to setMulti() as the expiry
	 * @param int $flags Bitfield; with WRITE_SYNC set, waitForReplication() is called
	 * @return bool Whether the write (and the replication wait, if any) succeeded
	 */
	public function set( $key, $value, $exptime = 0, $flags = 0 ) {
		$stored = $this->setMulti( [ $key => $value ], $exptime );

		$wantsSync = ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC );
		if ( !$wantsSync ) {
			return $stored;
		}

		// Wait even when the write failed; both outcomes feed into the result
		$replicated = $this->waitForReplication();

		return $replicated && $stored;
	}
385
386
	/**
	 * Check-and-set: replace the value of a key only if the stored value still equals
	 * the one the CAS token was taken from.
	 *
	 * The comparison is made in the database, by matching the encoded, serialized token
	 * against the stored blob in the UPDATE conditions.
	 *
	 * Any DBError, including a failure to connect in getDB(), is passed to
	 * handleWriteError() and reported as false, as in delete(), incr() and changeTTL().
	 *
	 * @param mixed $casToken Value previously obtained for this key (see getWithToken())
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime Zero or negative is stored as the getMaxDateTime() value;
	 *   anything else is passed through convertExpiry()
	 * @return bool Whether a row was updated
	 */
	protected function cas( $casToken, $key, $value, $exptime = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$exptime = intval( $exptime );

			if ( $exptime < 0 ) {
				$exptime = 0;
			}

			if ( $exptime == 0 ) {
				$encExpiry = $this->getMaxDateTime( $db );
			} else {
				$exptime = $this->convertExpiry( $exptime );
				$encExpiry = $db->timestamp( $exptime );
			}
			// Update in place, conditional on the stored value still matching the token
			$db->update(
				$tableName,
				[
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $value ) ),
					'exptime' => $encExpiry
				],
				[
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $casToken ) )
				],
				__METHOD__
			);
		} catch ( DBError $e ) {
			// $db is still null here if the connection itself failed
			$this->handleWriteError( $e, $db, $serverIndex );

			return false;
		}

		return (bool)$db->affectedRows();
	}
426
427
	/**
	 * Remove a key from the cache table it maps to.
	 *
	 * @param string $key
	 * @return bool False if a DBError occurred, true otherwise
	 */
	public function delete( $key ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );

		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$db->delete( $tableName, [ 'keyname' => $key ], __METHOD__ );
			$succeeded = true;
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			$succeeded = false;
		}

		return $succeeded;
	}
443
444
	/**
	 * Add an integer step to the stored value of a key.
	 *
	 * The row is selected FOR UPDATE, deleted, and re-inserted with the new value and
	 * its original expiry. Expired rows are deleted and not re-inserted.
	 *
	 * @param string $key
	 * @param int $step
	 * @return int|null The new value, or null if the key is missing or expired, the
	 *   re-insert affected no rows, or a DBError occurred
	 */
	public function incr( $key, $step = 1 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$keyCond = [ 'keyname' => $key ];
		$step = intval( $step );

		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$row = $db->selectRow(
				$tableName,
				[ 'value', 'exptime' ],
				$keyCond,
				__METHOD__,
				[ 'FOR UPDATE' ]
			);
			if ( $row === false ) {
				// Missing
				return null;
			}

			$db->delete( $tableName, $keyCond, __METHOD__ );
			if ( $this->isExpired( $db, $row->exptime ) ) {
				// Expired, do not reinsert
				return null;
			}

			$newValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) ) + $step;
			$db->insert(
				$tableName,
				[
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
					'exptime' => $row->exptime
				],
				__METHOD__,
				'IGNORE'
			);

			if ( $db->affectedRows() == 0 ) {
				// Race condition. See bug 28611
				$newValue = null;
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );

			return null;
		}

		return $newValue;
	}
488
489 View Code Duplication
	/**
	 * Merge a change into the stored value via mergeViaCas(), optionally waiting for
	 * replication afterwards.
	 *
	 * @param string $key
	 * @param callable $callback Passed to mergeViaCas()
	 * @param int $exptime Passed to mergeViaCas()
	 * @param int $attempts Passed to mergeViaCas()
	 * @param int $flags Bitfield; with WRITE_SYNC set, waitForReplication() is called
	 * @return bool Whether the merge (and the replication wait, if any) succeeded
	 */
	public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
		$merged = $this->mergeViaCas( $key, $callback, $exptime, $attempts );

		$wantsSync = ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC );
		if ( !$wantsSync ) {
			return $merged;
		}

		// Wait even when the merge failed; both outcomes feed into the result
		$replicated = $this->waitForReplication();

		return $replicated && $merged;
	}
497
498
	/**
	 * Change the expiry of a key that exists and has not expired yet.
	 *
	 * An expiry of 0 is stored as the getMaxDateTime() value, the same encoding that
	 * setMulti() and cas() use for it, instead of being passed to $db->timestamp().
	 *
	 * @param string $key
	 * @param int $expiry 0 for the max datetime; otherwise passed through convertExpiry()
	 * @return bool False if no row was updated or a DBError occurred, true otherwise
	 */
	public function changeTTL( $key, $expiry = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			if ( $expiry == 0 ) {
				$encExpiry = $this->getMaxDateTime( $db );
			} else {
				$encExpiry = $db->timestamp( $this->convertExpiry( $expiry ) );
			}
			$db->update(
				$tableName,
				[ 'exptime' => $encExpiry ],
				// Only touch rows that have not expired yet
				[ 'keyname' => $key, 'exptime > ' . $db->addQuotes( $db->timestamp( time() ) ) ],
				__METHOD__
			);
			if ( $db->affectedRows() == 0 ) {
				return false;
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			return false;
		}

		return true;
	}
519
520
	/**
	 * Tell whether a stored expiry lies in the past.
	 *
	 * The getMaxDateTime() value is never considered expired.
	 *
	 * @param IDatabase $db
	 * @param string $exptime
	 * @return bool
	 */
	protected function isExpired( $db, $exptime ) {
		if ( $exptime == $this->getMaxDateTime( $db ) ) {
			return false;
		}

		return wfTimestamp( TS_UNIX, $exptime ) < time();
	}
528
529
	/**
	 * Get the DB-formatted timestamp that is stored for entries with no expiry.
	 *
	 * @param IDatabase $db
	 * @return string
	 */
	protected function getMaxDateTime( $db ) {
		// Use a larger value once the current time itself is past 0x7fffffff
		$maxUnixTime = ( time() > 0x7fffffff ) ? ( 1 << 62 ) : 0x7fffffff;

		return $db->timestamp( $maxUnixTime );
	}
540
541
	/**
	 * Occasionally purge expired entries.
	 *
	 * Does nothing when purging is disabled (purgePeriod of 0) or in slave-only mode.
	 * Otherwise expireAll() is run on roughly one in every $this->purgePeriod calls,
	 * unless a run was already started within the last second or so.
	 */
	protected function garbageCollect() {
		$disabled = ( !$this->purgePeriod || $this->slaveOnly );
		if ( $disabled ) {
			return;
		}

		// Only purge on one in every $this->purgePeriod requests.
		if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
			return;
		}

		// Avoid repeating the delete within a few seconds
		$now = time();
		if ( $now <= ( $this->lastExpireAll + 1 ) ) {
			return;
		}

		$this->lastExpireAll = $now;
		$this->expireAll();
	}
557
558
	/**
	 * Delete the objects that expire before the current time.
	 */
	public function expireAll() {
		$now = wfTimestampNow();
		$this->deleteObjectsExpiringBefore( $now );
	}
561
562
	/**
	 * Delete objects from the database which expire before a certain date.
	 *
	 * Every shard table on every server is scanned in batches of up to 100 rows ordered
	 * by expiry, and each batch is deleted before the next one is selected. If a progress
	 * callback is given, it is called after each batch with an estimated completion
	 * percentage.
	 *
	 * @param string $timestamp
	 * @param bool|callable $progressCallback
	 * @return bool False as soon as a DBError occurs, true otherwise
	 */
	public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			$db = null;
			try {
				$db = $this->getDB( $serverIndex );
				$dbTimestamp = $db->timestamp( $timestamp );
				$cutoffCond = 'exptime < ' . $db->addQuotes( $dbTimestamp );
				// Time span covered by the purge; set from the first batch found
				$totalSeconds = false;

				for ( $shard = 0; $shard < $this->shards; $shard++ ) {
					$shardTable = $this->getTableNameByShard( $shard );
					$maxExpTime = false;

					for ( ; ; ) {
						$conds = [ $cutoffCond ];
						if ( $maxExpTime !== false ) {
							// Continue after the last expiry seen in the previous batch
							$conds[] = 'exptime > ' . $db->addQuotes( $maxExpTime );
						}
						$rows = $db->select(
							$shardTable,
							[ 'keyname', 'exptime' ],
							$conds,
							__METHOD__,
							[ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ]
						);
						if ( $rows === false || !$rows->numRows() ) {
							break;
						}

						$minExpTime = $rows->current()->exptime;
						if ( $totalSeconds === false ) {
							$totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
								- wfTimestamp( TS_UNIX, $minExpTime );
						}

						$keys = [];
						foreach ( $rows as $row ) {
							$keys[] = $row->keyname;
							$maxExpTime = $row->exptime;
						}

						$db->delete(
							$shardTable,
							[
								'exptime >= ' . $db->addQuotes( $minExpTime ),
								'exptime < ' . $db->addQuotes( $dbTimestamp ),
								'keyname' => $keys
							],
							__METHOD__
						);

						if ( !$progressCallback ) {
							continue;
						}

						if ( intval( $totalSeconds ) === 0 ) {
							$percent = 0;
						} else {
							$remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
								- wfTimestamp( TS_UNIX, $maxExpTime );
							if ( $remainingSeconds > $totalSeconds ) {
								$totalSeconds = $remainingSeconds;
							}
							$processedSeconds = $totalSeconds - $remainingSeconds;
							// Progress within this server: finished shards plus the
							// fraction of the current shard's time span
							$percent = ( $shard + $processedSeconds / $totalSeconds )
								/ $this->shards * 100;
						}
						// Scale to one server's share and add the servers already done
						$percent = ( $percent / $this->numServers )
							+ ( $serverIndex / $this->numServers * 100 );
						call_user_func( $progressCallback, $percent );
					}
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );

				return false;
			}
		}

		return true;
	}
639
640
	/**
	 * Empty every shard table on every server.
	 *
	 * Stops at the first DBError, which is passed to handleWriteError().
	 *
	 * @return bool True if the operation is successful, false otherwise
	 */
	public function deleteAll() {
		$serverIndex = 0;
		while ( $serverIndex < $this->numServers ) {
			$db = null;
			try {
				$db = $this->getDB( $serverIndex );
				$shard = 0;
				while ( $shard < $this->shards ) {
					$db->delete( $this->getTableNameByShard( $shard ), '*', __METHOD__ );
					$shard++;
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );

				return false;
			}
			$serverIndex++;
		}

		return true;
	}
660
661
	/**
	 * Serialize a value and, when gzdeflate() is available, compress the result.
	 * On typical message and page data, this can provide a 3X decrease
	 * in storage requirements.
	 *
	 * @param mixed &$data
	 * @return string
	 */
	protected function serialize( &$data ) {
		$serial = serialize( $data );

		return function_exists( 'gzdeflate' ) ? gzdeflate( $serial ) : $serial;
	}
678
679
	/**
	 * Decompress (if needed) and unserialize a stored value.
	 *
	 * When gzinflate() is available it is tried first with warnings suppressed; if it
	 * fails, the input is unserialized as given.
	 *
	 * @param string $serial
	 * @return mixed
	 */
	protected function unserialize( $serial ) {
		if ( function_exists( 'gzinflate' ) ) {
			MediaWiki\suppressWarnings();
			$inflated = gzinflate( $serial );
			MediaWiki\restoreWarnings();

			if ( $inflated !== false ) {
				$serial = $inflated;
			}
		}

		return unserialize( $serial );
	}
699
700
	/**
	 * Handle a DBError which occurred during a read operation.
	 *
	 * Connection errors mark the server as down. The error is logged and recorded
	 * via setLastError(); it is not re-thrown.
	 *
	 * @param DBError $exception
	 * @param int $serverIndex
	 */
	protected function handleReadError( DBError $exception, $serverIndex ) {
		$isConnectionError = ( $exception instanceof DBConnectionError );

		if ( $isConnectionError ) {
			$this->markServerDown( $exception, $serverIndex );
		}

		$this->logger->error( "DBError: {$exception->getMessage()}" );

		$this->setLastError(
			$isConnectionError ? BagOStuff::ERR_UNREACHABLE : BagOStuff::ERR_UNEXPECTED
		);
		$this->logger->debug(
			__METHOD__ .
			( $isConnectionError ? ": ignoring connection error" : ": ignoring query error" )
		);
	}
719
720
	/**
	 * Handle a DBError which occurred during a write operation.
	 *
	 * Without a DB handle the server is marked as down. A read-only error inside a
	 * transaction on the main DB is re-thrown; everything else is logged and recorded
	 * via setLastError().
	 *
	 * @param DBError $exception
	 * @param IDatabase|null $db DB handle or null if connection failed
	 * @param int $serverIndex
	 * @throws DBError
	 */
	protected function handleWriteError( DBError $exception, IDatabase $db = null, $serverIndex ) {
		if ( !$db ) {
			$this->markServerDown( $exception, $serverIndex );
		} elseif ( $db->wasReadOnlyError() && $db->trxLevel() && $this->usesMainDB() ) {
			// Errors like deadlocks and connection drops already cause rollback.
			// For consistency, we have no choice but to throw an error and trigger
			// complete rollback if the main DB is also being used as the cache DB.
			throw $exception;
		}

		$this->logger->error( "DBError: {$exception->getMessage()}" );

		$isConnectionError = ( $exception instanceof DBConnectionError );
		$this->setLastError(
			$isConnectionError ? BagOStuff::ERR_UNREACHABLE : BagOStuff::ERR_UNEXPECTED
		);
		$this->logger->debug(
			__METHOD__ .
			( $isConnectionError ? ": ignoring connection error" : ": ignoring query error" )
		);
	}
749
750
	/**
	 * Mark a server down due to a DBConnectionError exception.
	 *
	 * The cached connection is dropped, and the failure time and exception are recorded
	 * so that getDB() can re-throw the exception for the next 60 seconds. A server that
	 * was already marked down less than 60 seconds ago keeps its existing record.
	 *
	 * @param DBError $exception
	 * @param int $serverIndex
	 */
	protected function markServerDown( DBError $exception, $serverIndex ) {
		unset( $this->conns[$serverIndex] ); // bug T103435

		if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
			$recentlyFailed = ( time() - $this->connFailureTimes[$serverIndex] ) < 60;
			if ( $recentlyFailed ) {
				$this->logger->debug( __METHOD__ . ": Server #$serverIndex already down" );

				return;
			}
			// The old record is stale; replace it below
			unset( $this->connFailureTimes[$serverIndex] );
			unset( $this->connFailureErrors[$serverIndex] );
		}

		$now = time();
		$downUntil = $now + 60;
		$this->logger->info( __METHOD__ . ": Server #$serverIndex down until " . $downUntil );
		$this->connFailureTimes[$serverIndex] = $now;
		$this->connFailureErrors[$serverIndex] = $exception;
	}
773
774
	/**
	 * Create shard tables. For use from eval.php.
	 *
	 * Each shard table is created with CREATE TABLE ... LIKE, using the "objectcache"
	 * table as the template.
	 *
	 * @throws MWException If a server's DB type is not "mysql"
	 */
	public function createTables() {
		$serverIndex = 0;
		while ( $serverIndex < $this->numServers ) {
			$db = $this->getDB( $serverIndex );
			if ( $db->getType() !== 'mysql' ) {
				throw new MWException( __METHOD__ . ' is not supported on this DB server' );
			}

			$shard = 0;
			while ( $shard < $this->shards ) {
				$sql = 'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $shard ) ) .
					' LIKE ' . $db->tableName( 'objectcache' );
				$db->query( $sql, __METHOD__ );
				$shard++;
			}

			$serverIndex++;
		}
	}
792
793
	/**
	 * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER ); this is the
	 *   case when no custom server list is set
	 */
	protected function usesMainDB() {
		$hasCustomServers = (bool)$this->serverInfos;

		return !$hasCustomServers;
	}
799
800
	/**
	 * Wait for the slaves of the main DB to catch up with the current master position.
	 *
	 * Returns immediately when a custom server list is used or when the load balancer
	 * has at most one server. Otherwise the wait is bounded by $this->syncTimeout.
	 *
	 * @return bool True if no wait was needed or the wait condition was reached
	 */
	protected function waitForReplication() {
		if ( !$this->usesMainDB() ) {
			// Custom DB server list; probably doesn't use replication
			return true;
		}

		$lb = $this->getSeparateMainLB();
		if ( !$lb ) {
			$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
		}

		if ( $lb->getServerCount() <= 1 ) {
			return true; // no slaves
		}

		// Main LB is used; wait for any slaves to catch up
		$masterPos = $lb->getMasterPos();
		$slavesCaughtUp = function () use ( $lb, $masterPos ) {
			return $lb->waitForAll( $masterPos, 1 );
		};

		$loop = new WaitConditionLoop( $slavesCaughtUp, $this->syncTimeout, $this->busyCallbacks );
		$outcome = $loop->invoke();

		return ( $outcome === WaitConditionLoop::CONDITION_REACHED );
	}
826
}
827