|
1
|
|
|
<?php |
|
2
|
|
|
/**
 * Interface and manager for deferred updates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */
|
22
|
|
|
use MediaWiki\MediaWikiServices; |
|
23
|
|
|
|
|
24
|
|
|
/**
 * Class for managing the deferred updates
 *
 * In web request mode, deferred updates can be run at the end of the request, either before or
 * after the HTTP response has been sent. In either case, they run after the DB commit step. If
 * an update runs after the response is sent, it will not block clients. If sent before, it will
 * run synchronously. If such an update works via queueing, it will be more likely to complete by
 * the time the client makes their next request after this one.
 *
 * In CLI mode, updates run immediately if no DB writes are pending. Otherwise, they run when:
 *   - a) Any waitForReplication() call if no writes are pending on any DB
 *   - b) A commit happens on Maintenance::getDB( DB_MASTER ) if no writes are pending on any DB
 *   - c) EnqueueableDataUpdate tasks may enqueue on commit of Maintenance::getDB( DB_MASTER )
 *   - d) At the completion of Maintenance::execute()
 *
 * When updates are deferred, they use a FIFO queue (one for pre-send and one for post-send).
 *
 * @since 1.19
 */
class DeferredUpdates {
	/**
	 * @var DeferrableUpdate[] Updates to be deferred until before request end.
	 *   MergeableUpdate entries are keyed by their class name (see push()).
	 */
	private static $preSendUpdates = [];
	/**
	 * @var DeferrableUpdate[] Updates to be deferred until after request end.
	 *   MergeableUpdate entries are keyed by their class name (see push()).
	 */
	private static $postSendUpdates = [];

	const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
	const PRESEND = 1; // for updates that should run before flushing output buffer
	const POSTSEND = 2; // for updates that should run after flushing output buffer

	/**
	 * Pending-update count at which tryOpportunisticExecute() pushes enqueueable
	 * updates to the job queue when it cannot run the updates directly.
	 */
	const BIG_QUEUE_SIZE = 100;

	/**
	 * @var array|null Information about the current execute() call or null if not running.
	 *   While set, it has the keys 'update', 'stage' and 'subqueue'.
	 */
	private static $executeContext;

	/**
	 * Add an update to the deferred list to be run later by execute()
	 *
	 * In CLI mode, callback magic will also be used to run updates when safe
	 *
	 * If execute() is currently running an update and its stage is greater than or
	 * equal to $stage, the new update is appended to that update's sub-queue instead
	 * of the regular queues.
	 *
	 * @param DeferrableUpdate $update Some object that implements doUpdate()
	 * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
	 */
	public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
		global $wgCommandLineMode;

		// This is a sub-DeferredUpdate, run it right after its parent update
		if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) {
			self::$executeContext['subqueue'][] = $update;
			return;
		}

		if ( $stage === self::PRESEND ) {
			self::push( self::$preSendUpdates, $update );
		} else {
			// Anything that is not exactly PRESEND lands in the post-send queue
			self::push( self::$postSendUpdates, $update );
		}

		// Try to run the updates now if in CLI mode and no transaction is active.
		// This covers scripts that don't/barely use the DB but make updates to other stores.
		if ( $wgCommandLineMode ) {
			self::tryOpportunisticExecute( 'run' );
		}
	}

	/**
	 * Add a callable update. In a lot of cases, we just need a callback/closure,
	 * defining a new DeferrableUpdate object is not necessary
	 *
	 * The callable is wrapped in a MWCallableUpdate, tagged with wfGetCaller(),
	 * and passed to addUpdate().
	 *
	 * @see MWCallableUpdate::__construct()
	 *
	 * @param callable $callable
	 * @param integer $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
	 * @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28)
	 */
	public static function addCallableUpdate(
		$callable, $stage = self::POSTSEND, IDatabase $dbw = null
	) {
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
	}

	/**
	 * Do any deferred updates and clear the list
	 *
	 * With ALL, the pre-send queue is executed first and then the post-send queue,
	 * both using POSTSEND as the effective stage handed to execute().
	 *
	 * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
	 * @param integer $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
	 */
	public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
		$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;

		if ( $stage === self::ALL || $stage === self::PRESEND ) {
			self::execute( self::$preSendUpdates, $mode, $stageEffective );
		}

		if ( $stage === self::ALL || $stage == self::POSTSEND ) {
			self::execute( self::$postSendUpdates, $mode, $stageEffective );
		}
	}

	/**
	 * Append an update to a queue, merging MergeableUpdate instances by class
	 *
	 * A MergeableUpdate is stored under its class name; if the queue already has an
	 * entry for that class, the new update is merged into it rather than appended.
	 * Any other update is appended with the next integer key.
	 *
	 * @param DeferrableUpdate[] $queue
	 * @param DeferrableUpdate $update
	 */
	private static function push( array &$queue, DeferrableUpdate $update ) {
		if ( $update instanceof MergeableUpdate ) {
			$class = get_class( $update ); // fully-qualified class
			if ( isset( $queue[$class] ) ) {
				/** @var $existingUpdate MergeableUpdate */
				$existingUpdate = $queue[$class];
				$existingUpdate->merge( $update );
			} else {
				$queue[$class] = $update;
			}
		} else {
			$queue[] = $update;
		}
	}

	/**
	 * Run (or enqueue) every update in a queue until the queue stays empty
	 *
	 * Each round takes a snapshot of the queue, clears it, runs all DataUpdate
	 * items followed by the other items, and then checks whether new updates were
	 * added to the queue in the meantime. Updates added via addUpdate() while an
	 * update is running may instead end up in that update's sub-queue, which is
	 * drained right after the update itself.
	 *
	 * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @param integer $stage Class constant (PRESEND, POSTSEND) (since 1.28)
	 * @throws ErrorPageError Happens on top-level calls
	 * @throws Exception Happens on second-level calls
	 */
	public static function execute( array &$queue, $mode, $stage ) {
		$services = MediaWikiServices::getInstance();
		$stats = $services->getStatsdDataFactory();
		$lbFactory = $services->getDBLoadBalancerFactory();
		// Request method of the main request; only used as part of the stats key
		$method = RequestContext::getMain()->getRequest()->getMethod();

		/** @var ErrorPageError $reportableError */
		$reportableError = null;
		/** @var DeferrableUpdate[] $updates Snapshot of queue */
		$updates = $queue;

		// Keep doing rounds of updates until none get enqueued...
		while ( $updates ) {
			$queue = []; // clear the queue

			if ( $mode === 'enqueue' ) {
				try {
					// Push enqueuable updates to the job queue and get the rest
					$updates = self::enqueueUpdates( $updates );
				} catch ( Exception $e ) {
					// Let other updates have a chance to run if this failed
					MWExceptionHandler::rollbackMasterChangesAndLog( $e );
				}
			}

			// Order will be DataUpdate followed by generic DeferrableUpdate tasks
			$updatesByType = [ 'data' => [], 'generic' => [] ];
			foreach ( $updates as $du ) {
				$updatesByType[$du instanceof DataUpdate ? 'data' : 'generic'][] = $du;
				// Callback-style updates are additionally tagged with their origin
				$name = ( $du instanceof DeferrableCallback )
					? get_class( $du ) . '-' . $du->getOrigin()
					: get_class( $du );
				$stats->increment( 'deferred_updates.' . $method . '.' . $name );
			}

			// Execute all remaining tasks...
			foreach ( $updatesByType as $updatesForType ) {
				foreach ( $updatesForType as $update ) {
					// NOTE(review): this context is only reset after the sub-queue loop below;
					// if something other than an Exception escapes runUpdate() it would stay
					// set - confirm whether that needs guarding.
					self::$executeContext = [
						'update' => $update,
						'stage' => $stage,
						'subqueue' => []
					];
					/** @var DeferrableUpdate $update */
					$guiError = self::runUpdate( $update, $lbFactory, $stage );
					$reportableError = $reportableError ?: $guiError;
					// Do the subqueue updates for $update until there are none
					while ( self::$executeContext['subqueue'] ) {
						// Take the first sub-update off the front of the sub-queue
						$subUpdate = reset( self::$executeContext['subqueue'] );
						$firstKey = key( self::$executeContext['subqueue'] );
						unset( self::$executeContext['subqueue'][$firstKey] );

						$guiError = self::runUpdate( $subUpdate, $lbFactory, $stage );
						$reportableError = $reportableError ?: $guiError;
					}
					self::$executeContext = null;
				}
			}

			$updates = $queue; // new snapshot of queue (check for new entries)
		}

		if ( $reportableError ) {
			throw $reportableError; // throw the first of any GUI errors
		}
	}

	/**
	 * Run a single update inside a begin/commit master changes round
	 *
	 * Any Exception is passed to MWExceptionHandler::rollbackMasterChangesAndLog().
	 * An ErrorPageError is additionally returned to the caller, but only when
	 * $stage is PRESEND.
	 *
	 * @param DeferrableUpdate $update
	 * @param LBFactory $lbFactory
	 * @param integer $stage
	 * @return ErrorPageError|null
	 */
	private static function runUpdate( DeferrableUpdate $update, LBFactory $lbFactory, $stage ) {
		$guiError = null;
		try {
			$lbFactory->beginMasterChanges( __METHOD__ );
			$update->doUpdate();
			$lbFactory->commitMasterChanges( __METHOD__ );
		} catch ( Exception $e ) {
			// Reporting GUI exceptions does not work post-send
			if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
				$guiError = $e;
			}
			MWExceptionHandler::rollbackMasterChangesAndLog( $e );
		}

		return $guiError;
	}

	/**
	 * Run all deferred updates immediately if there are no DB writes active
	 *
	 * If $mode is 'run' but there are busy databases, EnqueueableDataUpdate
	 * tasks will be enqueued anyway for the sake of progress. That only happens
	 * once at least BIG_QUEUE_SIZE updates are pending.
	 *
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @return bool Whether updates were allowed to run
	 * @since 1.28
	 */
	public static function tryOpportunisticExecute( $mode = 'run' ) {
		// execute() loop is already running
		if ( self::$executeContext ) {
			return false;
		}

		// Avoiding running updates without them having outer scope
		if ( !self::getBusyDbConnections() ) {
			self::doUpdates( $mode );
			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// If we cannot run the updates with outer transaction context, try to
			// at least enqueue all the updates that support queueing to job queue
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		// True only if the enqueueing above left nothing pending
		return !self::pendingUpdatesCount();
	}

	/**
	 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
	 *
	 * The returned items keep their original array keys and relative order. This
	 * matters because the result may be stored back as a pending queue, where
	 * push() finds an already queued MergeableUpdate through its class-name key.
	 *
	 * @param DeferrableUpdate[] $updates A list of deferred update instances
	 * @return DeferrableUpdate[] Remaining updates that do not support being queued
	 */
	private static function enqueueUpdates( array $updates ) {
		$remaining = [];

		foreach ( $updates as $key => $update ) {
			if ( $update instanceof EnqueueableDataUpdate ) {
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
			} else {
				// Keep the key; re-indexing would break later merging in push()
				$remaining[$key] = $update;
			}
		}

		return $remaining;
	}

	/**
	 * @return integer Number of updates in the pre-send and post-send queues combined
	 * @since 1.28
	 */
	public static function pendingUpdatesCount() {
		return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
	}

	/**
	 * Clear all pending updates without performing them. Generally, you don't
	 * want or need to call this. Unit tests need it though.
	 */
	public static function clearPendingUpdates() {
		self::$preSendUpdates = [];
		self::$postSendUpdates = [];
	}

	/**
	 * Collect the open master connections that have pending writes or callbacks,
	 * or an explicit transaction active
	 *
	 * @return IDatabase[] Connection where commit() cannot be called yet
	 */
	private static function getBusyDbConnections() {
		$connsBusy = [];

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
			$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
				if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
					$connsBusy[] = $conn;
				}
			} );
		} );

		return $connsBusy;
	}
}
|
326
|
|
|
|