1
|
|
|
<?php |
2
|
|
|
/**
 * Interface and manager for deferred updates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */
22
|
|
|
use MediaWiki\MediaWikiServices; |
23
|
|
|
|
24
|
|
|
/**
 * Class for managing the deferred updates
 *
 * In web request mode, deferred updates can be run at the end of the request, either before or
 * after the HTTP response has been sent. In either case, they run after the DB commit step. If
 * an update runs after the response is sent, it will not block clients. If sent before, it will
 * run synchronously. These two modes are defined via PRESEND and POSTSEND constants, the latter
 * being the default for addUpdate() and addCallableUpdate().
 *
 * Updates that work through this system will be more likely to complete by the time the client
 * makes their next request after this one than with the JobQueue system.
 *
 * In CLI mode, updates run immediately if no DB writes are pending. Otherwise, they run when:
 *   - a) Any waitForReplication() call if no writes are pending on any DB
 *   - b) A commit happens on Maintenance::getDB( DB_MASTER ) if no writes are pending on any DB
 *   - c) EnqueueableDataUpdate tasks may enqueue on commit of Maintenance::getDB( DB_MASTER )
 *   - d) At the completion of Maintenance::execute()
 *
 * When updates are deferred, they go into one of two FIFO "top-queues" (one for pre-send and one
 * for post-send). Updates added *during* doUpdate() of a "top" update go into the "sub-queue"
 * for that update, unless they target a later stage than the one currently running (e.g. a
 * POSTSEND update added while pre-send updates run), in which case they go to that stage's
 * top-queue. After doUpdate() finishes, the sub-queue is run until drained. This continues
 * for each top-queue job until the entire top queue is drained. This happens for the pre-send
 * top-queue, and later on, the post-send top-queue, in execute().
 *
 * @since 1.19
 */
class DeferredUpdates {
	/**
	 * @var DeferrableUpdate[] Updates to be deferred until before request end.
	 *  MergeableUpdate instances are keyed by class name (see push()); others use integer keys.
	 */
	private static $preSendUpdates = [];
	/**
	 * @var DeferrableUpdate[] Updates to be deferred until after request end.
	 *  MergeableUpdate instances are keyed by class name (see push()); others use integer keys.
	 */
	private static $postSendUpdates = [];
	/** @var bool Whether to just run updates in addUpdate() */
	private static $immediateMode = false;

	const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
	const PRESEND = 1; // for updates that should run before flushing output buffer
	const POSTSEND = 2; // for updates that should run after flushing output buffer

	/** @var int Pending-update count at which tryOpportunisticExecute() enqueues jobs anyway */
	const BIG_QUEUE_SIZE = 100;

	/**
	 * @var array|null Information about the current execute() call or null if not running.
	 *  Keys: 'update' (the top-queue update being run), 'stage' (PRESEND or POSTSEND),
	 *  'subqueue' (DeferrableUpdate[] added while that update runs).
	 */
	private static $executeContext;

	/**
	 * Add an update to the deferred list to be run later by execute()
	 *
	 * In CLI mode, callback magic will also be used to run updates when safe
	 *
	 * @param DeferrableUpdate $update Some object that implements doUpdate()
	 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
	 */
	public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
		global $wgCommandLineMode;

		// This is a sub-DeferredUpdate, run it right after its parent update.
		// An update for a later stage than the running one goes to that stage's queue instead.
		if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) {
			self::$executeContext['subqueue'][] = $update;

			return;
		}

		if ( $stage === self::PRESEND ) {
			self::push( self::$preSendUpdates, $update );
		} else {
			self::push( self::$postSendUpdates, $update );
		}

		if ( self::$immediateMode ) {
			// No more explicit doUpdates() calls will happen, so run this now
			self::doUpdates( 'run' );

			return;
		}

		// Try to run the updates now if in CLI mode and no transaction is active.
		// This covers scripts that don't/barely use the DB but make updates to other stores.
		if ( $wgCommandLineMode ) {
			self::tryOpportunisticExecute( 'run' );
		}
	}

	/**
	 * Add a callable update. In a lot of cases, we just need a callback/closure,
	 * defining a new DeferrableUpdate object is not necessary
	 *
	 * @see MWCallableUpdate::__construct()
	 *
	 * @param callable $callable
	 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
	 * @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28)
	 */
	public static function addCallableUpdate(
		$callable, $stage = self::POSTSEND, IDatabase $dbw = null
	) {
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
	}

	/**
	 * Do any deferred updates and clear the list
	 *
	 * With ALL, the pre-send queue is also run under the POSTSEND stage, so an
	 * ErrorPageError from a pre-send update is logged rather than rethrown.
	 *
	 * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
	 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
	 */
	public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
		$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;

		if ( $stage === self::ALL || $stage === self::PRESEND ) {
			self::execute( self::$preSendUpdates, $mode, $stageEffective );
		}

		// Strict comparison, consistent with the PRESEND check above
		if ( $stage === self::ALL || $stage === self::POSTSEND ) {
			self::execute( self::$postSendUpdates, $mode, $stageEffective );
		}
	}

	/**
	 * @param bool $value Whether to just immediately run updates in addUpdate()
	 * @since 1.28
	 */
	public static function setImmediateMode( $value ) {
		self::$immediateMode = (bool)$value;
	}

	/**
	 * Append an update to a queue, merging MergeableUpdate instances of the same class
	 *
	 * A MergeableUpdate is stored under its fully-qualified class name; a later update
	 * of the same class is merged into the stored one instead of being appended.
	 *
	 * @param DeferrableUpdate[] &$queue
	 * @param DeferrableUpdate $update
	 */
	private static function push( array &$queue, DeferrableUpdate $update ) {
		if ( $update instanceof MergeableUpdate ) {
			$class = get_class( $update ); // fully-qualified class
			if ( isset( $queue[$class] ) ) {
				/** @var $existingUpdate MergeableUpdate */
				$existingUpdate = $queue[$class];
				$existingUpdate->merge( $update );
			} else {
				$queue[$class] = $update;
			}
		} else {
			$queue[] = $update;
		}
	}

	/**
	 * Immediately run/queue a list of updates
	 *
	 * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
	 * @throws ErrorPageError Happens on top-level calls
	 * @throws Exception Happens on second-level calls
	 */
	protected static function execute( array &$queue, $mode, $stage ) {
		$services = MediaWikiServices::getInstance();
		$stats = $services->getStatsdDataFactory();
		$lbFactory = $services->getDBLoadBalancerFactory();
		$method = RequestContext::getMain()->getRequest()->getMethod();

		$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );

		/** @var ErrorPageError $reportableError */
		$reportableError = null;
		/** @var DeferrableUpdate[] $updates Snapshot of queue */
		$updates = $queue;

		// Keep doing rounds of updates until none get enqueued...
		while ( $updates ) {
			$queue = []; // clear the queue

			if ( $mode === 'enqueue' ) {
				try {
					// Push enqueuable updates to the job queue and get the rest
					$updates = self::enqueueUpdates( $updates );
				} catch ( Exception $e ) {
					// Let other updates have a chance to run if this failed
					MWExceptionHandler::rollbackMasterChangesAndLog( $e );
				}
			}

			// Order will be DataUpdate followed by generic DeferrableUpdate tasks
			$updatesByType = [ 'data' => [], 'generic' => [] ];
			foreach ( $updates as $du ) {
				if ( $du instanceof DataUpdate ) {
					$du->setTransactionTicket( $ticket );
					$updatesByType['data'][] = $du;
				} else {
					$updatesByType['generic'][] = $du;
				}

				$name = ( $du instanceof DeferrableCallback )
					? get_class( $du ) . '-' . $du->getOrigin()
					: get_class( $du );
				$stats->increment( 'deferred_updates.' . $method . '.' . $name );
			}

			// Execute all remaining tasks...
			foreach ( $updatesByType as $updatesForType ) {
				foreach ( $updatesForType as $update ) {
					/** @var DeferrableUpdate $update */
					$guiError = self::runUpdateWithSubqueue( $update, $lbFactory, $ticket, $stage );
					$reportableError = $reportableError ?: $guiError;
				}
			}

			$updates = $queue; // new snapshot of queue (check for new entries)
		}

		if ( $reportableError ) {
			throw $reportableError; // throw the first of any GUI errors
		}
	}

	/**
	 * Run one top-queue update, then drain the sub-queue of updates it added
	 *
	 * The execute() context that was active before this call (null at top level) is
	 * restored afterwards, even if something escapes runUpdate(). This keeps a nested
	 * execute() (e.g. from addUpdate() in immediate mode) from discarding an outer
	 * update's sub-queue, and keeps a stale context from capturing later addUpdate() calls.
	 *
	 * @param DeferrableUpdate $update
	 * @param LBFactory $lbFactory
	 * @param mixed $ticket Result of LBFactory::getEmptyTransactionTicket()
	 * @param int $stage Class constant (PRESEND, POSTSEND)
	 * @return ErrorPageError|null The first GUI error raised by $update or its sub-updates
	 */
	private static function runUpdateWithSubqueue(
		DeferrableUpdate $update, LBFactory $lbFactory, $ticket, $stage
	) {
		$outerContext = self::$executeContext;
		self::$executeContext = [
			'update' => $update,
			'stage' => $stage,
			'subqueue' => []
		];

		try {
			$reportableError = self::runUpdate( $update, $lbFactory, $stage );
			// Do the subqueue updates for $update until there are none
			while ( self::$executeContext['subqueue'] ) {
				$subUpdate = reset( self::$executeContext['subqueue'] );
				$firstKey = key( self::$executeContext['subqueue'] );
				unset( self::$executeContext['subqueue'][$firstKey] );

				if ( $subUpdate instanceof DataUpdate ) {
					$subUpdate->setTransactionTicket( $ticket );
				}

				$guiError = self::runUpdate( $subUpdate, $lbFactory, $stage );
				$reportableError = $reportableError ?: $guiError;
			}
		} finally {
			self::$executeContext = $outerContext;
		}

		return $reportableError;
	}

	/**
	 * Run a single update inside its own master-DB transaction round
	 *
	 * Any Exception is logged and the master changes are rolled back. An ErrorPageError
	 * is additionally returned when running in the PRESEND stage, since reporting GUI
	 * errors does not work after the response has been sent.
	 *
	 * @param DeferrableUpdate $update
	 * @param LBFactory $lbFactory
	 * @param int $stage
	 * @return ErrorPageError|null
	 */
	private static function runUpdate( DeferrableUpdate $update, LBFactory $lbFactory, $stage ) {
		$guiError = null;
		try {
			$fnameTrxOwner = get_class( $update ) . '::doUpdate';
			$lbFactory->beginMasterChanges( $fnameTrxOwner );
			$update->doUpdate();
			$lbFactory->commitMasterChanges( $fnameTrxOwner );
		} catch ( Exception $e ) {
			// Reporting GUI exceptions does not work post-send
			if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
				$guiError = $e;
			}
			MWExceptionHandler::rollbackMasterChangesAndLog( $e );
		}

		return $guiError;
	}

	/**
	 * Run all deferred updates immediately if there are no DB writes active
	 *
	 * If there are busy databases and at least BIG_QUEUE_SIZE updates are pending,
	 * EnqueueableDataUpdate tasks are pushed to the job queue anyway (whatever $mode is)
	 * for the sake of progress.
	 *
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @return bool True if the updates were run, or if no updates remain pending after
	 *  enqueueing; false otherwise (including when an execute() loop is already running)
	 * @since 1.28
	 */
	public static function tryOpportunisticExecute( $mode = 'run' ) {
		// execute() loop is already running
		if ( self::$executeContext ) {
			return false;
		}

		// Avoiding running updates without them having outer scope
		if ( !self::getBusyDbConnections() ) {
			self::doUpdates( $mode );
			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// If we cannot run the updates with outer transaction context, try to
			// at least enqueue all the updates that support queueing to job queue
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return !self::pendingUpdatesCount();
	}

	/**
	 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
	 *
	 * Keys of the remaining items are preserved, so that class-name keys of
	 * MergeableUpdate entries (see push()) keep working for later merges when the
	 * result is assigned back to a queue.
	 *
	 * @param DeferrableUpdate[] $updates A list of deferred update instances
	 * @return DeferrableUpdate[] Remaining updates that do not support being queued
	 */
	private static function enqueueUpdates( array $updates ) {
		$remaining = [];

		foreach ( $updates as $key => $update ) {
			if ( $update instanceof EnqueueableDataUpdate ) {
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
			} else {
				$remaining[$key] = $update;
			}
		}

		return $remaining;
	}

	/**
	 * @return int Number of enqueued updates
	 * @since 1.28
	 */
	public static function pendingUpdatesCount() {
		return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
	}

	/**
	 * Clear all pending updates without performing them. Generally, you don't
	 * want or need to call this. Unit tests need it though.
	 */
	public static function clearPendingUpdates() {
		self::$preSendUpdates = [];
		self::$postSendUpdates = [];
	}

	/**
	 * Find open master connections that have pending writes/callbacks or an explicit transaction
	 *
	 * @return IDatabase[] Connection where commit() cannot be called yet
	 */
	private static function getBusyDbConnections() {
		$connsBusy = [];

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
			$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
				if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
					$connsBusy[] = $conn;
				}
			} );
		} );

		return $connsBusy;
	}
}
364
|
|
|
|