DispatchChanges   A
last analyzed

Complexity

Total Complexity 32

Size/Duplication

Total Lines 364
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 13

Importance

Changes 0
Metric Value
wmc 32
lcom 1
cbo 13
dl 0
loc 364
rs 9.84
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 27 1
B getClientWikis() 0 29 6
A filterClientWikis() 0 22 4
B newChangeDispatcher() 0 71 1
D execute() 0 118 15
A getCoordinator() 0 24 2
A trace() 0 5 2
A log() 0 4 1
1
<?php
2
3
namespace Wikibase\Repo\Maintenance;
4
5
use Exception;
6
use ExtensionRegistry;
7
use Maintenance;
8
use MediaWiki\MediaWikiServices;
9
use MWException;
10
use MWExceptionHandler;
11
use Onoi\MessageReporter\ObservableMessageReporter;
12
use Psr\Log\LoggerInterface;
13
use Wikibase\Lib\Reporting\ReportingExceptionHandler;
14
use Wikibase\Lib\SettingsArray;
15
use Wikibase\Lib\Store\ChunkCache;
16
use Wikibase\Lib\Store\Sql\EntityChangeLookup;
17
use Wikibase\Lib\WikibaseSettings;
18
use Wikibase\Repo\ChangeDispatcher;
19
use Wikibase\Repo\Notifications\JobQueueChangeNotificationSender;
20
use Wikibase\Repo\Store\Sql\LockManagerSqlChangeDispatchCoordinator;
21
use Wikibase\Repo\Store\Sql\SqlChangeDispatchCoordinator;
22
use Wikibase\Repo\Store\Sql\SqlSubscriptionLookup;
23
use Wikibase\Repo\WikibaseRepo;
24
use Wikimedia\Assert\Assert;
25
26
// Locate the MediaWiki core installation: an explicit MW_INSTALL_PATH wins,
// otherwise assume core lives four directory levels above this file.
$basePath = getenv( 'MW_INSTALL_PATH' );
if ( $basePath === false ) {
	$basePath = __DIR__ . '/../../../..';
}

require_once $basePath . '/maintenance/Maintenance.php';
29
30
/**
 * Maintenance script that polls for Wikibase changes in the shared wb_changes table
 * and dispatches the relevant changes to any client wikis' job queues.
 *
 * Each pass picks one client wiki via the dispatch coordinator and hands the pending
 * changes for that wiki to a JobQueueChangeNotificationSender.
 *
 * @license GPL-2.0-or-later
 * @author Daniel Kinzler
 */
class DispatchChanges extends Maintenance {

	/**
	 * @var bool Whether trace() output is enabled. Set from the --verbose option
	 *  in newChangeDispatcher().
	 */
	private $verbose = false;

	public function __construct() {
		parent::__construct();

		$this->addDescription(
			"Maintenance script that polls for Wikibase changes in the shared wb_changes table\n" .
			"and dispatches them to any client wikis using their job queue.\n" .
			"See docs/topics/change-propagation.md for an overview of the change propagation mechanism."
		);

		$this->addOption( 'verbose', "Report activity." );
		$this->addOption( 'idle-delay', "Seconds to sleep when idle. Default: 10", false, true );
		$this->addOption( 'dispatch-interval', "How often to dispatch to each target wiki. "
					. "Default: every 60 seconds", false, true );
		$this->addOption( 'randomness', "Number of least current target wikis to pick from at random. "
					. "Default: 15.", false, true );
		$this->addOption( 'max-passes', "The number of passes to perform. "
					. "Default: 1 if --max-time is not set, infinite if it is.", false, true );
		$this->addOption( 'max-time', "The number of seconds to run before exiting, "
					. "if --max-passes is not reached. Default: Set in LocalSettings ('dispatchMaxTime').", false, true );
		$this->addOption( 'max-chunks', 'Maximum number of chunks or passes per wiki when '
			. 'selecting pending changes. Default: 15', false, true );
		$this->addOption( 'batch-size', 'Maximum number of changes to pass to a client at a time. '
			. 'Default: 1000', false, true );
		// --client takes a value and may be repeated (last flag: multiple occurrences allowed).
		$this->addOption( 'client', 'Only dispatch to the client with this site IDs. '
			. 'May be specified multiple times to select several clients.',
			false, true, false, true );
	}

	/**
	 * Normalizes the localClientDatabases setting into a site ID => database name map.
	 *
	 * Entries given in list form (integer keys) use the database name as the site ID.
	 * If the Wikibase client is also enabled on this wiki, the repo itself is added as a
	 * client (unless already configured), so that it dispatches to itself as well.
	 *
	 * @param string[] $clientWikis as defined in the localClientDatabases config setting.
	 *
	 * @return string[] A mapping of client wiki site IDs to logical database names.
	 */
	private function getClientWikis( array $clientWikis ): array {
		Assert::parameterElementType( 'string', $clientWikis, '$clientWikis' );

		// Make sure we have a mapping from siteId to database name in $clientWikis.
		// foreach iterates over the original value, so modifying $clientWikis here is safe.
		foreach ( $clientWikis as $siteID => $dbName ) {
			if ( is_int( $siteID ) ) {
				unset( $clientWikis[$siteID] );
				$siteID = $dbName;
			}
			$clientWikis[$siteID] = $dbName;
		}

		// If this repo is also a client, make sure it dispatches also to itself.
		if ( WikibaseSettings::isClientEnabled() ) {
			$clientSettings = WikibaseSettings::getClientSettings();
			$repoName = $clientSettings->getSetting( 'repoSiteId' );
			$repoDb = $clientSettings->getSetting( 'repoDatabase' );

			// false means "the local database".
			if ( $repoDb === false ) {
				$repoDb = MediaWikiServices::getInstance()->getMainConfig()->get( 'DBname' );
			}

			if ( !isset( $clientWikis[$repoName] ) ) {
				$clientWikis[$repoName] = $repoDb;
			}
		}

		return $clientWikis;
	}

	/**
	 * Restricts the client wikis to the ones selected on the command line.
	 *
	 * @param string[] $allClientWikis as returned by getClientWikis().
	 * @param string[]|null $selectedSiteIDs site IDs to select, or null to disable filtering.
	 *
	 * @throws MWException if a selected site ID is not a configured client wiki.
	 * @return string[] A mapping of client wiki site IDs to logical database names.
	 */
	private function filterClientWikis( array $allClientWikis, ?array $selectedSiteIDs = null ): array {
		Assert::parameterElementType( 'string', $allClientWikis, '$allClientWikis' );

		if ( $selectedSiteIDs === null ) {
			return $allClientWikis;
		}
		Assert::parameterElementType( 'string', $selectedSiteIDs, '$selectedSiteIDs' );

		$clientWikis = [];
		foreach ( $selectedSiteIDs as $siteID ) {
			if ( !array_key_exists( $siteID, $allClientWikis ) ) {
				throw new MWException(
					"No client wiki with site ID $siteID configured! " .
					"Please check \$wgWBRepoSettings['localClientDatabases']."
				);
			}
			$clientWikis[$siteID] = $allClientWikis[$siteID];
		}

		return $clientWikis;
	}

	/**
	 * Builds the ChangeDispatcher, together with its coordinator, change cache and
	 * notification sender, from command line options. Repo settings supply the defaults.
	 *
	 * Side effect: sets $this->verbose from the --verbose option.
	 *
	 * Cache sizing: each cached chunk holds batchSize * dispatchBatchChunkFactor changes,
	 * and the cache holds dispatchBatchCacheFactor such chunks.
	 *
	 * @param string[] $clientWikis A mapping of client wiki site IDs to logical database names.
	 * @param EntityChangeLookup $changeLookup
	 * @param SettingsArray $settings
	 * @param LoggerInterface $logger
	 *
	 * @return ChangeDispatcher
	 */
	private function newChangeDispatcher(
		array $clientWikis,
		EntityChangeLookup $changeLookup,
		SettingsArray $settings,
		LoggerInterface $logger
	): ChangeDispatcher {
		$repoDB = $settings->getSetting( 'changesDatabase' );
		$batchChunkFactor = $settings->getSetting( 'dispatchBatchChunkFactor' );
		$batchCacheFactor = $settings->getSetting( 'dispatchBatchCacheFactor' );

		$batchSize = (int)$this->getOption(
			'batch-size',
			$settings->getSetting( 'dispatchDefaultBatchSize' )
		);
		$maxChunks = (int)$this->getOption(
			'max-chunks',
			$settings->getSetting( 'dispatchDefaultMaxChunks' )
		);
		$dispatchInterval = (int)$this->getOption(
			'dispatch-interval',
			$settings->getSetting( 'dispatchDefaultDispatchInterval' )
		);
		$randomness = (int)$this->getOption(
			'randomness',
			$settings->getSetting( 'dispatchDefaultDispatchRandomness' )
		);

		$this->verbose = (bool)$this->getOption( 'verbose', false );

		$cacheChunkSize = $batchSize * $batchChunkFactor;
		$cacheSize = $cacheChunkSize * $batchCacheFactor;
		$changesCache = new ChunkCache( $changeLookup, $cacheChunkSize, $cacheSize );

		// Route progress messages from the coordinator and dispatcher to our own log().
		$reporter = new ObservableMessageReporter();
		$reporter->registerReporterCallback(
			function ( $message ) {
				$this->log( $message );
			}
		);

		$coordinator = $this->getCoordinator( $settings, $logger );
		$coordinator->setMessageReporter( $reporter );
		$coordinator->setBatchSize( $batchSize );
		$coordinator->setDispatchInterval( $dispatchInterval );
		$coordinator->setRandomness( $randomness );

		$notificationSender = new JobQueueChangeNotificationSender(
			$repoDB,
			$logger,
			$clientWikis
		);
		$subscriptionLookup = new SqlSubscriptionLookup(
			MediaWikiServices::getInstance()->getDBLoadBalancer()
		);

		$dispatcher = new ChangeDispatcher(
			$coordinator,
			$notificationSender,
			$changesCache,
			$subscriptionLookup
		);

		$dispatcher->setMessageReporter( $reporter );
		$dispatcher->setExceptionHandler( new ReportingExceptionHandler( $reporter ) );
		$dispatcher->setBatchSize( $batchSize );
		$dispatcher->setMaxChunks( $maxChunks );
		$dispatcher->setBatchChunkFactor( $batchChunkFactor );
		$dispatcher->setVerbose( $this->verbose );

		return $dispatcher;
	}

	/**
	 * Maintenance script entry point.
	 *
	 * Resolves the set of client wikis, builds a ChangeDispatcher and runs dispatch passes
	 * in a loop. Each pass selects one client wiki and dispatches pending changes to it.
	 * If no client needs dispatching, or the pass fails, the script sleeps for the idle
	 * delay before the next pass (not after the final one). The loop stops after
	 * --max-passes passes or once more than --max-time seconds have elapsed, whichever
	 * comes first. If the dispatchMaxTime setting is 0, nothing is dispatched at all.
	 *
	 * @throws MWException if WikibaseRepository is not loaded, or if no client wikis are
	 *  configured or selected.
	 */
	public function execute() {
		if ( !ExtensionRegistry::getInstance()->isLoaded( 'WikibaseRepository' ) ) {
			// Since people might waste time debugging odd errors when they forget to enable the extension. BTDT.
			throw new MWException( "WikibaseRepository has not been loaded." );
		}

		$wikibaseRepo = WikibaseRepo::getDefaultInstance();
		$settings = $wikibaseRepo->getSettings();
		$defaultMaxTime = $settings->getSetting( 'dispatchMaxTime' );

		// A dispatchMaxTime of 0 disables dispatching, regardless of --max-time.
		if ( $defaultMaxTime == 0 ) {
			$this->log( 'dispatchMaxTime 0, so exiting early and not performing dispatch operations.' );
			return;
		}

		$maxTime = (int)$this->getOption( 'max-time', $defaultMaxTime );
		// Unlimited passes when the run is time-bounded, a single pass otherwise.
		$maxPasses = (int)$this->getOption( 'max-passes', $maxTime < PHP_INT_MAX ? PHP_INT_MAX : 1 );
		// sleep() rejects negative durations, so never let the idle delay drop below zero.
		$delay = max( 0, (int)$this->getOption( 'idle-delay', $settings->getSetting( 'dispatchIdleDelay' ) ) );
		$selectedClients = $this->getOption( 'client' );

		$clientWikis = $this->getClientWikis(
			$settings->getSetting( 'localClientDatabases' )
		);

		if ( $clientWikis === [] ) {
			throw new MWException( "No client wikis configured! Please set \$wgWBRepoSettings['localClientDatabases']." );
		}

		$clientWikis = $this->filterClientWikis( $clientWikis, $selectedClients );

		if ( $clientWikis === [] ) {
			throw new MWException( 'No client wikis selected!' );
		}

		$dispatcher = $this->newChangeDispatcher(
			$clientWikis,
			$wikibaseRepo->getStore()->getEntityChangeLookup(),
			$settings,
			$wikibaseRepo->getLogger()
		);

		$dispatcher->getDispatchCoordinator()->initState( $clientWikis );

		$stats = MediaWikiServices::getInstance()->getPerDbNameStatsdDataFactory();
		$stats->increment( 'wikibase.repo.dispatchChanges.start' );

		$passes = $maxPasses === PHP_INT_MAX ? "unlimited" : $maxPasses;
		$time = $maxTime === PHP_INT_MAX ? "unlimited" : $maxTime;

		$this->log( "Starting loop for $passes passes or $time seconds" );

		$startTime = microtime( true );
		$t = 0;

		// Run passes in a loop, sleeping when idle.
		// Note that idle passes need to be counted to avoid processes staying alive
		// for an indefinite time, potentially leading to a pile up when used with cron.
		for ( $c = 0; $c < $maxPasses; ) {
			if ( $t > $maxTime ) {
				$this->trace( "Reached max time after $t seconds." );
				// timed out
				break;
			}

			$wikiState = null;
			$passStartTime = microtime( true );
			$c++;

			try {
				$this->trace( "Picking a client wiki..." );
				$selectClientStartTime = microtime( true );
				$wikiState = $dispatcher->selectClient();
				$stats->timing(
					'wikibase.repo.dispatchChanges.selectClient-time',
					( microtime( true ) - $selectClientStartTime ) * 1000
				);

				if ( $wikiState ) {
					$dispatchedChanges = $dispatcher->dispatchTo( $wikiState );
					$stats->updateCount( 'wikibase.repo.dispatchChanges.changes', $dispatchedChanges );
					$stats->updateCount(
						'wikibase.repo.dispatchChanges.changes-per-client.' . $wikiState['chd_site'],
						$dispatchedChanges
					);
				} else {
					$stats->increment( 'wikibase.repo.dispatchChanges.noclient' );
					// Try again later, unless we have already reached the limit.
					if ( $c < $maxPasses ) {
						$this->trace( "Idle: No client wiki found in need of dispatching. "
							. "Sleeping for {$delay} seconds." );

						sleep( $delay );
					} else {
						$this->trace( "Idle: No client wiki found in need of dispatching. " );
					}
				}
			} catch ( Exception $ex ) {
				$stats->increment( 'wikibase.repo.dispatchChanges.exception' );

				MWExceptionHandler::logException( $ex );

				$willRetry = $c < $maxPasses;
				$this->log( $willRetry ? "ERROR: $ex; sleeping for {$delay} seconds" : "ERROR: $ex" );

				// Release the selected client *before* sleeping, so it is not kept reserved
				// by this process (presumably blocking other dispatchers) for the whole delay.
				if ( $wikiState ) {
					$dispatcher->getDispatchCoordinator()->releaseClient( $wikiState );
				}

				if ( $willRetry ) {
					sleep( $delay );
				}
			}

			$t = ( microtime( true ) - $startTime );
			$stats->timing( 'wikibase.repo.dispatchChanges.pass-time', ( microtime( true ) - $passStartTime ) * 1000 );
		}

		$stats->timing( 'wikibase.repo.dispatchChanges.execute-time', $t * 1000 );
		$stats->updateCount( 'wikibase.repo.dispatchChanges.passes', $c );

		$this->log( "Done, exiting after $c passes and $t seconds." );
	}

	/**
	 * Creates the dispatch coordinator for this repo.
	 *
	 * A LockManagerSqlChangeDispatchCoordinator is used when the dispatchingLockManager
	 * setting names a lock manager; otherwise a plain SqlChangeDispatchCoordinator is used.
	 *
	 * @param SettingsArray $settings
	 * @param LoggerInterface $logger
	 *
	 * @return SqlChangeDispatchCoordinator|LockManagerSqlChangeDispatchCoordinator
	 */
	private function getCoordinator( SettingsArray $settings, LoggerInterface $logger ) {
		$services = MediaWikiServices::getInstance();
		$repoID = wfWikiID();
		$changesDatabase = $settings->getSetting( 'changesDatabase' );
		$lbFactory = $services->getDBLoadBalancerFactory();
		$lockManagerName = $settings->getSetting( 'dispatchingLockManager' );

		if ( $lockManagerName === null ) {
			return new SqlChangeDispatchCoordinator(
				$changesDatabase,
				$repoID,
				$lbFactory,
				$logger
			);
		}

		$lockManager = $services->getLockManagerGroupFactory()
			->getLockManagerGroup( $repoID )
			->get( $lockManagerName );

		return new LockManagerSqlChangeDispatchCoordinator(
			$lockManager,
			$lbFactory,
			$logger,
			$changesDatabase,
			$repoID
		);
	}

	/**
	 * Log a message, indented by four spaces, but only if verbose mode is enabled.
	 *
	 * @param string $message
	 */
	public function trace( $message ) {
		if ( $this->verbose ) {
			$this->log( "    " . $message );
		}
	}

	/**
	 * Write a message prefixed with the current time (H:i:s) to the script output,
	 * on the 'dispatchChanges::log' channel, then close that channel.
	 *
	 * @param string $message
	 */
	public function log( $message ) {
		$this->output( date( 'H:i:s' ) . ' ' . $message . "\n", 'dispatchChanges::log' );
		$this->cleanupChanneled();
	}

}
401
402
// Register this script's class with MediaWiki's maintenance runner, then hand over
// to the runner entry point.
$maintClass = \Wikibase\Repo\Maintenance\DispatchChanges::class;
require_once RUN_MAINTENANCE_IF_MAIN;
404