Completed
Branch master (dc3656)
by
unknown
30:14
created

DeferredUpdates   B

Complexity

Total Complexity 39

Size/Duplication

Total Lines 261
Duplicated Lines 5.36 %

Coupling/Cohesion

Components 1
Dependencies 14

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 14
loc 261
rs 8.2857
wmc 39
lcom 1
cbo 14

11 Methods

Rating   Name   Duplication   Size   Complexity  
A addUpdate() 0 7 2
A addCallableUpdate() 0 5 1
B doUpdates() 0 9 5
C push() 0 35 7
C execute() 0 52 9
B tryOpportunisticExecute() 0 25 4
A enqueueUpdates() 14 14 3
A pendingUpdatesCount() 0 3 1
A clearPendingUpdates() 0 4 1
A installDBListener() 0 14 3
A getBusyDbConnections() 0 14 3

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
/**
3
 * Interface and manager for deferred updates.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 */
22
use MediaWiki\MediaWikiServices;
23
24
/**
25
 * Class for managing the deferred updates
26
 *
27
 * In web request mode, deferred updates can be run at the end of the request, either before or
28
 * after the HTTP response has been sent. In either case, they run after the DB commit step. If
29
 * an update runs after the response is sent, it will not block clients. If sent before, it will
30
 * run synchronously. If such an update works via queueing, it will be more likely to complete by
31
 * the time the client makes their next request after this one.
32
 *
33
 * In CLI mode, updates are only deferred until the current wiki has no DB write transaction
34
 * active within this request.
35
 *
36
 * When updates are deferred, they use a FIFO queue (one for pre-send and one for post-send).
37
 *
38
 * @since 1.19
39
 */
40
class DeferredUpdates {
41
	/** @var DeferrableUpdate[] Updates to be deferred until before request end */
42
	private static $preSendUpdates = [];
43
	/** @var DeferrableUpdate[] Updates to be deferred until after request end */
44
	private static $postSendUpdates = [];
45
46
	const ALL = 0; // all updates
47
	const PRESEND = 1; // for updates that should run before flushing output buffer
48
	const POSTSEND = 2; // for updates that should run after flushing output buffer
49
50
	const BIG_QUEUE_SIZE = 100;
51
52
	/**
53
	 * Add an update to the deferred list
54
	 *
55
	 * @param DeferrableUpdate $update Some object that implements doUpdate()
56
	 * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
57
	 */
58
	public static function addUpdate( DeferrableUpdate $update, $type = self::POSTSEND ) {
59
		if ( $type === self::PRESEND ) {
60
			self::push( self::$preSendUpdates, $update );
61
		} else {
62
			self::push( self::$postSendUpdates, $update );
63
		}
64
	}
65
66
	/**
67
	 * Add a callable update. In a lot of cases, we just need a callback/closure,
68
	 * defining a new DeferrableUpdate object is not necessary
69
	 *
70
	 * @see MWCallableUpdate::__construct()
71
	 *
72
	 * @param callable $callable
73
	 * @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
74
	 * @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28)
75
	 */
76
	public static function addCallableUpdate(
77
		$callable, $type = self::POSTSEND, IDatabase $dbw = null
78
	) {
79
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $type );
80
	}
81
82
	/**
83
	 * Do any deferred updates and clear the list
84
	 *
85
	 * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
86
	 * @param integer $type DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
87
	 */
88
	public static function doUpdates( $mode = 'run', $type = self::ALL ) {
89
		if ( $type === self::ALL || $type == self::PRESEND ) {
90
			self::execute( self::$preSendUpdates, $mode );
91
		}
92
93
		if ( $type === self::ALL || $type == self::POSTSEND ) {
94
			self::execute( self::$postSendUpdates, $mode );
95
		}
96
	}
97
98
	private static function push( array &$queue, DeferrableUpdate $update ) {
99
		global $wgCommandLineMode;
100
101
		if ( $update instanceof MergeableUpdate ) {
102
			$class = get_class( $update ); // fully-qualified class
103
			if ( isset( $queue[$class] ) ) {
104
				/** @var $existingUpdate MergeableUpdate */
105
				$existingUpdate = $queue[$class];
106
				$existingUpdate->merge( $update );
107
			} else {
108
				$queue[$class] = $update;
109
			}
110
		} else {
111
			$queue[] = $update;
112
		}
113
114
		// CLI scripts may forget to periodically flush these updates,
115
		// so try to handle that rather than OOMing and losing them entirely.
116
		// Try to run the updates as soon as there is no current wiki transaction.
117
		static $waitingOnTrx = false; // de-duplicate callback
118
		if ( $wgCommandLineMode && !$waitingOnTrx ) {
119
			$lb = wfGetLB();
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLB() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancer() or MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
120
			$dbw = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
121
			// Do the update as soon as there is no transaction
122
			if ( $dbw && $dbw->trxLevel() ) {
123
				$waitingOnTrx = true;
124
				$dbw->onTransactionIdle( function() use ( &$waitingOnTrx ) {
125
					DeferredUpdates::doUpdates();
126
					$waitingOnTrx = false;
127
				} );
128
			} else {
129
				self::doUpdates();
130
			}
131
		}
132
	}
133
134
	public static function execute( array &$queue, $mode ) {
135
		$stats = \MediaWiki\MediaWikiServices::getInstance()->getStatsdDataFactory();
136
		$method = RequestContext::getMain()->getRequest()->getMethod();
137
138
		$updates = $queue; // snapshot of queue
139
		// Keep doing rounds of updates until none get enqueued
140
		while ( count( $updates ) ) {
141
			$queue = []; // clear the queue
142
			/** @var DataUpdate[] $dataUpdates */
143
			$dataUpdates = [];
144
			/** @var DeferrableUpdate[] $otherUpdates */
145
			$otherUpdates = [];
146
			foreach ( $updates as $update ) {
147
				if ( $update instanceof DataUpdate ) {
148
					$dataUpdates[] = $update;
149
				} else {
150
					$otherUpdates[] = $update;
151
				}
152
153
				$name = $update instanceof DeferrableCallback
154
					? get_class( $update ) . '-' . $update->getOrigin()
155
					: get_class( $update );
156
				$stats->increment( 'deferred_updates.' . $method . '.' . $name );
157
			}
158
159
			// Delegate DataUpdate execution to the DataUpdate class
160
			try {
161
				DataUpdate::runUpdates( $dataUpdates, $mode );
162
			} catch ( Exception $e ) {
163
				// Let the other updates occur if these had to rollback
164
				MWExceptionHandler::logException( $e );
165
			}
166
			// Execute the non-DataUpdate tasks
167
			foreach ( $otherUpdates as $update ) {
168
				try {
169
					$update->doUpdate();
170
					wfGetLBFactory()->commitMasterChanges( __METHOD__ );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
171
				} catch ( Exception $e ) {
172
					// We don't want exceptions thrown during deferred updates to
173
					// be reported to the user since the output is already sent
174
					if ( !$e instanceof ErrorPageError ) {
175
						MWExceptionHandler::logException( $e );
176
					}
177
					// Make sure incomplete transactions are not committed and end any
178
					// open atomic sections so that other DB updates have a chance to run
179
					wfGetLBFactory()->rollbackMasterChanges( __METHOD__ );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
180
				}
181
			}
182
183
			$updates = $queue; // new snapshot of queue (check for new entries)
184
		}
185
	}
186
187
	/**
188
	 * Run all deferred updates immediately if there are no DB writes active
189
	 *
190
	 * If $mode is 'run' but there are busy databates, EnqueueableDataUpdate
191
	 * tasks will be enqueued anyway for the sake of progress.
192
	 *
193
	 * @param string $mode Use "enqueue" to use the job queue when possible
194
	 * @return bool Whether updates were allowed to run
195
	 * @since 1.28
196
	 */
197
	public static function tryOpportunisticExecute( $mode = 'run' ) {
198
		static $recursionGuard = false;
199
		if ( $recursionGuard ) {
200
			return false; // COMMITs trigger inside update loop and inside some updates
201
		}
202
203
		try {
204
			$recursionGuard = true;
205
			if ( !self::getBusyDbConnections() ) {
206
				self::doUpdates( $mode );
207
				return true;
208
			}
209
210
			if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
211
				// If we cannot run the updates with outer transaction context, try to
212
				// at least enqueue all the updates that support queueing to job queue
213
				self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
214
				self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
215
			}
216
217
			return !self::pendingUpdatesCount();
218
		} finally {
219
			$recursionGuard = false;
220
		}
221
	}
222
223
	/**
224
	 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
225
	 *
226
	 * @param DeferrableUpdate[] $updates A list of deferred update instances
227
	 * @return DeferrableUpdate[] Remaining updates that do not support being queued
228
	 */
229 View Code Duplication
	private static function enqueueUpdates( array $updates ) {
230
		$remaining = [];
231
232
		foreach ( $updates as $update ) {
233
			if ( $update instanceof EnqueueableDataUpdate ) {
234
				$spec = $update->getAsJobSpecification();
235
				JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
236
			} else {
237
				$remaining[] = $update;
238
			}
239
		}
240
241
		return $remaining;
242
	}
243
244
	/**
245
	 * @return integer Number of enqueued updates
246
	 * @since 1.28
247
	 */
248
	public static function pendingUpdatesCount() {
249
		return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
250
	}
251
252
	/**
253
	 * Clear all pending updates without performing them. Generally, you don't
254
	 * want or need to call this. Unit tests need it though.
255
	 */
256
	public static function clearPendingUpdates() {
257
		self::$preSendUpdates = [];
258
		self::$postSendUpdates = [];
259
	}
260
261
	/**
262
	 * Set the rollback/commit watcher on a DB to trigger update runs when safe
263
	 *
264
	 * @TODO: use this to replace DB logic in push()
265
	 * @param LoadBalancer $lb
266
	 * @since 1.28
267
	 */
268
	public static function installDBListener( LoadBalancer $lb ) {
269
		static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ];
270
		// Hook into active master connections to find a moment where no writes are pending
271
		$lb->setTransactionListener(
272
			__METHOD__,
273
			function ( $trigger, IDatabase $conn ) use ( $triggers ) {
274
				global $wgCommandLineMode;
275
276
				if ( $wgCommandLineMode && in_array( $trigger, $triggers ) ) {
277
					DeferredUpdates::tryOpportunisticExecute();
278
				}
279
			}
280
		);
281
	}
282
283
	/**
284
	 * @return IDatabase[] Connection where commit() cannot be called yet
285
	 */
286
	private static function getBusyDbConnections() {
287
		$connsBusy = [];
288
289
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
290
		$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
291
			$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
292
				if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
293
					$connsBusy[] = $conn;
294
				}
295
			} );
296
		} );
297
298
		return $connsBusy;
299
	}
300
}
301