1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* Interface and manager for deferred updates. |
4
|
|
|
* |
5
|
|
|
* This program is free software; you can redistribute it and/or modify |
6
|
|
|
* it under the terms of the GNU General Public License as published by |
7
|
|
|
* the Free Software Foundation; either version 2 of the License, or |
8
|
|
|
* (at your option) any later version. |
9
|
|
|
* |
10
|
|
|
* This program is distributed in the hope that it will be useful, |
11
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
12
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13
|
|
|
* GNU General Public License for more details. |
14
|
|
|
* |
15
|
|
|
* You should have received a copy of the GNU General Public License along |
16
|
|
|
* with this program; if not, write to the Free Software Foundation, Inc., |
17
|
|
|
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
18
|
|
|
* http://www.gnu.org/copyleft/gpl.html |
19
|
|
|
* |
20
|
|
|
* @file |
21
|
|
|
*/ |
22
|
|
|
use MediaWiki\MediaWikiServices; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* Class for managing the deferred updates |
26
|
|
|
* |
27
|
|
|
* In web request mode, deferred updates can be run at the end of the request, either before or |
28
|
|
|
* after the HTTP response has been sent. In either case, they run after the DB commit step. If |
29
|
|
|
* an update runs after the response is sent, it will not block clients. If sent before, it will |
30
|
|
|
* run synchronously. If such an update works via queueing, it will be more likely to complete by |
31
|
|
|
* the time the client makes their next request after this one. |
32
|
|
|
* |
33
|
|
|
* In CLI mode, updates are only deferred until the current wiki has no DB write transaction |
34
|
|
|
* active within this request. |
35
|
|
|
* |
36
|
|
|
* When updates are deferred, they use a FIFO queue (one for pre-send and one for post-send). |
37
|
|
|
* |
38
|
|
|
* @since 1.19 |
39
|
|
|
*/ |
40
|
|
|
class DeferredUpdates { |
41
|
|
|
/** @var DeferrableUpdate[] Updates to be deferred until before request end */ |
42
|
|
|
private static $preSendUpdates = []; |
43
|
|
|
/** @var DeferrableUpdate[] Updates to be deferred until after request end */ |
44
|
|
|
private static $postSendUpdates = []; |
45
|
|
|
|
46
|
|
|
const ALL = 0; // all updates |
47
|
|
|
const PRESEND = 1; // for updates that should run before flushing output buffer |
48
|
|
|
const POSTSEND = 2; // for updates that should run after flushing output buffer |
49
|
|
|
|
50
|
|
|
const BIG_QUEUE_SIZE = 100; |
51
|
|
|
|
52
|
|
|
/** |
53
|
|
|
* Add an update to the deferred list |
54
|
|
|
* |
55
|
|
|
* @param DeferrableUpdate $update Some object that implements doUpdate() |
56
|
|
|
* @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) |
57
|
|
|
*/ |
58
|
|
|
public static function addUpdate( DeferrableUpdate $update, $type = self::POSTSEND ) { |
59
|
|
|
if ( $type === self::PRESEND ) { |
60
|
|
|
self::push( self::$preSendUpdates, $update ); |
61
|
|
|
} else { |
62
|
|
|
self::push( self::$postSendUpdates, $update ); |
63
|
|
|
} |
64
|
|
|
} |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* Add a callable update. In a lot of cases, we just need a callback/closure, |
68
|
|
|
* defining a new DeferrableUpdate object is not necessary |
69
|
|
|
* |
70
|
|
|
* @see MWCallableUpdate::__construct() |
71
|
|
|
* |
72
|
|
|
* @param callable $callable |
73
|
|
|
* @param integer $type DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) |
74
|
|
|
* @param IDatabase|null $dbw Abort if this DB is rolled back [optional] (since 1.28) |
75
|
|
|
*/ |
76
|
|
|
public static function addCallableUpdate( |
77
|
|
|
$callable, $type = self::POSTSEND, IDatabase $dbw = null |
78
|
|
|
) { |
79
|
|
|
self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $type ); |
80
|
|
|
} |
81
|
|
|
|
82
|
|
|
/** |
83
|
|
|
* Do any deferred updates and clear the list |
84
|
|
|
* |
85
|
|
|
* @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"] |
86
|
|
|
* @param integer $type DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27) |
87
|
|
|
*/ |
88
|
|
|
public static function doUpdates( $mode = 'run', $type = self::ALL ) { |
89
|
|
|
if ( $type === self::ALL || $type == self::PRESEND ) { |
90
|
|
|
self::execute( self::$preSendUpdates, $mode ); |
91
|
|
|
} |
92
|
|
|
|
93
|
|
|
if ( $type === self::ALL || $type == self::POSTSEND ) { |
94
|
|
|
self::execute( self::$postSendUpdates, $mode ); |
95
|
|
|
} |
96
|
|
|
} |
97
|
|
|
|
98
|
|
|
private static function push( array &$queue, DeferrableUpdate $update ) { |
99
|
|
|
global $wgCommandLineMode; |
100
|
|
|
|
101
|
|
|
if ( $update instanceof MergeableUpdate ) { |
102
|
|
|
$class = get_class( $update ); // fully-qualified class |
103
|
|
|
if ( isset( $queue[$class] ) ) { |
104
|
|
|
/** @var $existingUpdate MergeableUpdate */ |
105
|
|
|
$existingUpdate = $queue[$class]; |
106
|
|
|
$existingUpdate->merge( $update ); |
107
|
|
|
} else { |
108
|
|
|
$queue[$class] = $update; |
109
|
|
|
} |
110
|
|
|
} else { |
111
|
|
|
$queue[] = $update; |
112
|
|
|
} |
113
|
|
|
|
114
|
|
|
// CLI scripts may forget to periodically flush these updates, |
115
|
|
|
// so try to handle that rather than OOMing and losing them entirely. |
116
|
|
|
// Try to run the updates as soon as there is no current wiki transaction. |
117
|
|
|
static $waitingOnTrx = false; // de-duplicate callback |
118
|
|
|
if ( $wgCommandLineMode && !$waitingOnTrx ) { |
119
|
|
|
$lb = wfGetLB(); |
|
|
|
|
120
|
|
|
$dbw = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); |
121
|
|
|
// Do the update as soon as there is no transaction |
122
|
|
|
if ( $dbw && $dbw->trxLevel() ) { |
123
|
|
|
$waitingOnTrx = true; |
124
|
|
|
$dbw->onTransactionIdle( function() use ( &$waitingOnTrx ) { |
125
|
|
|
DeferredUpdates::doUpdates(); |
126
|
|
|
$waitingOnTrx = false; |
127
|
|
|
} ); |
128
|
|
|
} else { |
129
|
|
|
self::doUpdates(); |
130
|
|
|
} |
131
|
|
|
} |
132
|
|
|
} |
133
|
|
|
|
134
|
|
|
public static function execute( array &$queue, $mode ) { |
135
|
|
|
$stats = \MediaWiki\MediaWikiServices::getInstance()->getStatsdDataFactory(); |
136
|
|
|
$method = RequestContext::getMain()->getRequest()->getMethod(); |
137
|
|
|
|
138
|
|
|
$updates = $queue; // snapshot of queue |
139
|
|
|
// Keep doing rounds of updates until none get enqueued |
140
|
|
|
while ( count( $updates ) ) { |
141
|
|
|
$queue = []; // clear the queue |
142
|
|
|
/** @var DataUpdate[] $dataUpdates */ |
143
|
|
|
$dataUpdates = []; |
144
|
|
|
/** @var DeferrableUpdate[] $otherUpdates */ |
145
|
|
|
$otherUpdates = []; |
146
|
|
|
foreach ( $updates as $update ) { |
147
|
|
|
if ( $update instanceof DataUpdate ) { |
148
|
|
|
$dataUpdates[] = $update; |
149
|
|
|
} else { |
150
|
|
|
$otherUpdates[] = $update; |
151
|
|
|
} |
152
|
|
|
|
153
|
|
|
$name = $update instanceof DeferrableCallback |
154
|
|
|
? get_class( $update ) . '-' . $update->getOrigin() |
155
|
|
|
: get_class( $update ); |
156
|
|
|
$stats->increment( 'deferred_updates.' . $method . '.' . $name ); |
157
|
|
|
} |
158
|
|
|
|
159
|
|
|
// Delegate DataUpdate execution to the DataUpdate class |
160
|
|
|
try { |
161
|
|
|
DataUpdate::runUpdates( $dataUpdates, $mode ); |
162
|
|
|
} catch ( Exception $e ) { |
163
|
|
|
// Let the other updates occur if these had to rollback |
164
|
|
|
MWExceptionHandler::logException( $e ); |
165
|
|
|
} |
166
|
|
|
// Execute the non-DataUpdate tasks |
167
|
|
|
foreach ( $otherUpdates as $update ) { |
168
|
|
|
try { |
169
|
|
|
$update->doUpdate(); |
170
|
|
|
wfGetLBFactory()->commitMasterChanges( __METHOD__ ); |
|
|
|
|
171
|
|
|
} catch ( Exception $e ) { |
172
|
|
|
// We don't want exceptions thrown during deferred updates to |
173
|
|
|
// be reported to the user since the output is already sent |
174
|
|
|
if ( !$e instanceof ErrorPageError ) { |
175
|
|
|
MWExceptionHandler::logException( $e ); |
176
|
|
|
} |
177
|
|
|
// Make sure incomplete transactions are not committed and end any |
178
|
|
|
// open atomic sections so that other DB updates have a chance to run |
179
|
|
|
wfGetLBFactory()->rollbackMasterChanges( __METHOD__ ); |
|
|
|
|
180
|
|
|
} |
181
|
|
|
} |
182
|
|
|
|
183
|
|
|
$updates = $queue; // new snapshot of queue (check for new entries) |
184
|
|
|
} |
185
|
|
|
} |
186
|
|
|
|
187
|
|
|
/** |
188
|
|
|
* Run all deferred updates immediately if there are no DB writes active |
189
|
|
|
* |
190
|
|
|
* If $mode is 'run' but there are busy databates, EnqueueableDataUpdate |
191
|
|
|
* tasks will be enqueued anyway for the sake of progress. |
192
|
|
|
* |
193
|
|
|
* @param string $mode Use "enqueue" to use the job queue when possible |
194
|
|
|
* @return bool Whether updates were allowed to run |
195
|
|
|
* @since 1.28 |
196
|
|
|
*/ |
197
|
|
|
public static function tryOpportunisticExecute( $mode = 'run' ) { |
198
|
|
|
static $recursionGuard = false; |
199
|
|
|
if ( $recursionGuard ) { |
200
|
|
|
return false; // COMMITs trigger inside update loop and inside some updates |
201
|
|
|
} |
202
|
|
|
|
203
|
|
|
try { |
204
|
|
|
$recursionGuard = true; |
205
|
|
|
if ( !self::getBusyDbConnections() ) { |
206
|
|
|
self::doUpdates( $mode ); |
207
|
|
|
return true; |
208
|
|
|
} |
209
|
|
|
|
210
|
|
|
if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) { |
211
|
|
|
// If we cannot run the updates with outer transaction context, try to |
212
|
|
|
// at least enqueue all the updates that support queueing to job queue |
213
|
|
|
self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates ); |
214
|
|
|
self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates ); |
215
|
|
|
} |
216
|
|
|
|
217
|
|
|
return !self::pendingUpdatesCount(); |
218
|
|
|
} finally { |
219
|
|
|
$recursionGuard = false; |
220
|
|
|
} |
221
|
|
|
} |
222
|
|
|
|
223
|
|
|
/** |
224
|
|
|
* Enqueue a job for each EnqueueableDataUpdate item and return the other items |
225
|
|
|
* |
226
|
|
|
* @param DeferrableUpdate[] $updates A list of deferred update instances |
227
|
|
|
* @return DeferrableUpdate[] Remaining updates that do not support being queued |
228
|
|
|
*/ |
229
|
|
View Code Duplication |
private static function enqueueUpdates( array $updates ) { |
230
|
|
|
$remaining = []; |
231
|
|
|
|
232
|
|
|
foreach ( $updates as $update ) { |
233
|
|
|
if ( $update instanceof EnqueueableDataUpdate ) { |
234
|
|
|
$spec = $update->getAsJobSpecification(); |
235
|
|
|
JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] ); |
236
|
|
|
} else { |
237
|
|
|
$remaining[] = $update; |
238
|
|
|
} |
239
|
|
|
} |
240
|
|
|
|
241
|
|
|
return $remaining; |
242
|
|
|
} |
243
|
|
|
|
244
|
|
|
/** |
245
|
|
|
* @return integer Number of enqueued updates |
246
|
|
|
* @since 1.28 |
247
|
|
|
*/ |
248
|
|
|
public static function pendingUpdatesCount() { |
249
|
|
|
return count( self::$preSendUpdates ) + count( self::$postSendUpdates ); |
250
|
|
|
} |
251
|
|
|
|
252
|
|
|
/** |
253
|
|
|
* Clear all pending updates without performing them. Generally, you don't |
254
|
|
|
* want or need to call this. Unit tests need it though. |
255
|
|
|
*/ |
256
|
|
|
public static function clearPendingUpdates() { |
257
|
|
|
self::$preSendUpdates = []; |
258
|
|
|
self::$postSendUpdates = []; |
259
|
|
|
} |
260
|
|
|
|
261
|
|
|
/** |
262
|
|
|
* Set the rollback/commit watcher on a DB to trigger update runs when safe |
263
|
|
|
* |
264
|
|
|
* @TODO: use this to replace DB logic in push() |
265
|
|
|
* @param LoadBalancer $lb |
266
|
|
|
* @since 1.28 |
267
|
|
|
*/ |
268
|
|
|
public static function installDBListener( LoadBalancer $lb ) { |
269
|
|
|
static $triggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ]; |
270
|
|
|
// Hook into active master connections to find a moment where no writes are pending |
271
|
|
|
$lb->setTransactionListener( |
272
|
|
|
__METHOD__, |
273
|
|
|
function ( $trigger, IDatabase $conn ) use ( $triggers ) { |
274
|
|
|
global $wgCommandLineMode; |
275
|
|
|
|
276
|
|
|
if ( $wgCommandLineMode && in_array( $trigger, $triggers ) ) { |
277
|
|
|
DeferredUpdates::tryOpportunisticExecute(); |
278
|
|
|
} |
279
|
|
|
} |
280
|
|
|
); |
281
|
|
|
} |
282
|
|
|
|
283
|
|
|
/** |
284
|
|
|
* @return IDatabase[] Connection where commit() cannot be called yet |
285
|
|
|
*/ |
286
|
|
|
private static function getBusyDbConnections() { |
287
|
|
|
$connsBusy = []; |
288
|
|
|
|
289
|
|
|
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); |
290
|
|
|
$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) { |
291
|
|
|
$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) { |
292
|
|
|
if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) { |
293
|
|
|
$connsBusy[] = $conn; |
294
|
|
|
} |
295
|
|
|
} ); |
296
|
|
|
} ); |
297
|
|
|
|
298
|
|
|
return $connsBusy; |
299
|
|
|
} |
300
|
|
|
} |
301
|
|
|
|
This function has been deprecated. The supplier of the file has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.