Completed
Branch master (5cbada)
by
unknown
28:59
created

JobRunner   F

Complexity

Total Complexity 87

Size/Duplication

Total Lines 515
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 16

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 515
rs 1.5789
wmc 87
lcom 1
cbo 16

12 Methods

Rating   Name   Duplication   Size   Complexity  
C executeJob() 0 70 8
A setDebugHandler() 0 3 1
A setLogger() 0 3 1
A __construct() 0 6 2
F run() 0 152 33
A getMaxRssKb() 0 5 3
B getBackoffTimeToWait() 0 25 6
C loadBackoffs() 0 25 7
D syncBackoffDeltas() 0 34 10
B checkMemoryOK() 0 23 5
A debugCallback() 0 5 2
B commitMasterChanges() 0 54 9

How to fix   Complexity   

Complex Class

Complex classes like JobRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobRunner, and based on these observations, apply Extract Interface, too.

1
<?php
2
/**
3
 * Job queue runner utility methods
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup JobQueue
22
 */
23
24
use MediaWiki\Logger\LoggerFactory;
25
use Psr\Log\LoggerAwareInterface;
26
use Psr\Log\LoggerInterface;
27
28
/**
29
 * Job queue runner utility methods
30
 *
31
 * @ingroup JobQueue
32
 * @since 1.24
33
 */
34
class JobRunner implements LoggerAwareInterface {
35
	/** @var callable|null Debug output handler */
36
	protected $debug;
37
38
	/**
39
	 * @var LoggerInterface $logger
40
	 */
41
	protected $logger;
42
43
	const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
44
	const LAG_CHECK_PERIOD = 1.0; // check slave lag this many seconds
45
	const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
46
47
	/**
48
	 * @param callable $debug Optional debug output handler
49
	 */
50
	public function setDebugHandler( $debug ) {
51
		$this->debug = $debug;
52
	}
53
54
	/**
55
	 * @param LoggerInterface $logger
56
	 * @return void
57
	 */
58
	public function setLogger( LoggerInterface $logger ) {
59
		$this->logger = $logger;
60
	}
61
62
	/**
63
	 * @param LoggerInterface $logger
64
	 */
65
	public function __construct( LoggerInterface $logger = null ) {
66
		if ( $logger === null ) {
67
			$logger = LoggerFactory::getInstance( 'runJobs' );
68
		}
69
		$this->setLogger( $logger );
70
	}
71
72
	/**
73
	 * Run jobs of the specified number/type for the specified time
74
	 *
75
	 * The response map has a 'job' field that lists status of each job, including:
76
	 *   - type   : the job type
77
	 *   - status : ok/failed
78
	 *   - error  : any error message string
79
	 *   - time   : the job run time in ms
80
	 * The response map also has:
81
	 *   - backoffs : the (job type => seconds) map of backoff times
82
	 *   - elapsed  : the total time spent running tasks in ms
83
	 *   - reached  : the reason the script finished, one of (none-ready, job-limit, time-limit,
84
	 *  memory-limit)
85
	 *
86
	 * This method outputs status information only if a debug handler was set.
87
	 * Any exceptions are caught and logged, but are not reported as output.
88
	 *
89
	 * @param array $options Map of parameters:
90
	 *    - type     : the job type (or false for the default types)
91
	 *    - maxJobs  : maximum number of jobs to run
92
	 *    - maxTime  : maximum time in seconds before stopping
93
	 *    - throttle : whether to respect job backoff configuration
94
	 * @return array Summary response that can easily be JSON serialized
95
	 */
96
	public function run( array $options ) {
97
		global $wgJobClasses, $wgTrxProfilerLimits;
98
99
		$response = [ 'jobs' => [], 'reached' => 'none-ready' ];
100
101
		$type = isset( $options['type'] ) ? $options['type'] : false;
102
		$maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false;
103
		$maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
104
		$noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
105
106
		// Bail if job type is invalid
107
		if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
108
			$response['reached'] = 'none-possible';
109
			return $response;
110
		}
111
		// Bail out if DB is in read-only mode
112
		if ( wfReadOnly() ) {
113
			$response['reached'] = 'read-only';
114
			return $response;
115
		}
116
		// Bail out if there is too much DB lag.
117
		// This check should not block as we want to try other wiki queues.
118
		list( , $maxLag ) = wfGetLB( wfWikiID() )->getMaxLag();
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLB() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancer() or MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
119
		if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
120
			$response['reached'] = 'slave-lag-limit';
121
			return $response;
122
		}
123
124
		// Flush any pending DB writes for sanity
125
		wfGetLBFactory()->commitAll( __METHOD__ );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
126
127
		// Catch huge single updates that lead to slave lag
128
		$trxProfiler = Profiler::instance()->getTransactionProfiler();
129
		$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
130
		$trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
131
132
		// Some jobs types should not run until a certain timestamp
133
		$backoffs = []; // map of (type => UNIX expiry)
134
		$backoffDeltas = []; // map of (type => seconds)
135
		$wait = 'wait'; // block to read backoffs the first time
136
137
		$group = JobQueueGroup::singleton();
138
		$stats = RequestContext::getMain()->getStats();
0 ignored issues
show
Deprecated Code introduced by
The method RequestContext::getStats() has been deprecated with message: since 1.27 use a StatsdDataFactory from MediaWikiServices (preferably injected)

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
139
		$jobsPopped = 0;
140
		$timeMsTotal = 0;
141
		$startTime = microtime( true ); // time since jobs started running
142
		$lastCheckTime = 1; // timestamp of last slave check
143
		do {
144
			// Sync the persistent backoffs with concurrent runners
145
			$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
146
			$blacklist = $noThrottle ? [] : array_keys( $backoffs );
147
			$wait = 'nowait'; // less important now
148
149
			if ( $type === false ) {
150
				$job = $group->pop(
151
					JobQueueGroup::TYPE_DEFAULT,
152
					JobQueueGroup::USE_CACHE,
153
					$blacklist
154
				);
155
			} elseif ( in_array( $type, $blacklist ) ) {
156
				$job = false; // requested queue in backoff state
157
			} else {
158
				$job = $group->pop( $type ); // job from a single queue
159
			}
160
161
			if ( $job ) { // found a job
162
				++$jobsPopped;
163
				$popTime = time();
164
				$jType = $job->getType();
165
166
				WebRequest::overrideRequestId( $job->getRequestId() );
167
168
				// Back off of certain jobs for a while (for throttling and for errors)
169
				$ttw = $this->getBackoffTimeToWait( $job );
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type boolean; however, JobRunner::getBackoffTimeToWait() does only seem to accept object<Job>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
170
				if ( $ttw > 0 ) {
171
					// Always add the delta for other runners in case the time running the
172
					// job negated the backoff for each individually but not collectively.
173
					$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
174
						? $backoffDeltas[$jType] + $ttw
175
						: $ttw;
176
					$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
177
				}
178
179
				$info = $this->executeJob( $job, $stats, $popTime );
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type boolean; however, JobRunner::executeJob() does only seem to accept object<Job>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
Compatibility introduced by
$stats of type object<Liuggio\StatsdCli...tory\StatsdDataFactory> is not a sub-type of object<BufferingStatsdDataFactory>. It seems like you assume a child class of the class Liuggio\StatsdClient\Factory\StatsdDataFactory to be always present.

This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.

Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.

Loading history...
180
				if ( $info['status'] !== false || !$job->allowRetries() ) {
181
					$group->ack( $job ); // succeeded or job cannot be retried
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type boolean; however, JobQueueGroup::ack() does only seem to accept object<Job>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
182
				}
183
184
				// Back off of certain jobs for a while (for throttling and for errors)
185
				if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
186
					$ttw = max( $ttw, self::ERROR_BACKOFF_TTL ); // too many errors
187
					$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
188
						? $backoffDeltas[$jType] + $ttw
189
						: $ttw;
190
				}
191
192
				$response['jobs'][] = [
193
					'type'   => $jType,
194
					'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
195
					'error'  => $info['error'],
196
					'time'   => $info['timeMs']
197
				];
198
				$timeMsTotal += $info['timeMs'];
199
200
				// Break out if we hit the job count or wall time limits...
201
				if ( $maxJobs && $jobsPopped >= $maxJobs ) {
202
					$response['reached'] = 'job-limit';
203
					break;
204
				} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
205
					$response['reached'] = 'time-limit';
206
					break;
207
				}
208
209
				// Don't let any of the main DB slaves get backed up.
210
				// This only waits for so long before exiting and letting
211
				// other wikis in the farm (on different masters) get a chance.
212
				$timePassed = microtime( true ) - $lastCheckTime;
213
				if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
214
					try {
215
						wfGetLBFactory()->waitForReplication( [
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
216
							'ifWritesSince' => $lastCheckTime,
217
							'timeout' => self::MAX_ALLOWED_LAG
218
						] );
219
					} catch ( DBReplicationWaitError $e ) {
220
						$response['reached'] = 'slave-lag-limit';
221
						break;
222
					}
223
					$lastCheckTime = microtime( true );
224
				}
225
				// Don't let any queue slaves/backups fall behind
226
				if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
227
					$group->waitForBackups();
228
				}
229
230
				// Bail if near-OOM instead of in a job
231
				if ( !$this->checkMemoryOK() ) {
232
					$response['reached'] = 'memory-limit';
233
					break;
234
				}
235
			}
236
		} while ( $job ); // stop when there are no jobs
237
238
		// Sync the persistent backoffs for the next runJobs.php pass
239
		if ( $backoffDeltas ) {
240
			$this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
241
		}
242
243
		$response['backoffs'] = $backoffs;
244
		$response['elapsed'] = $timeMsTotal;
245
246
		return $response;
247
	}
248
249
	/**
250
	 * @param Job $job
251
	 * @param BufferingStatsdDataFactory $stats
252
	 * @param float $popTime
253
	 * @return array Map of status/error/timeMs
254
	 */
255
	private function executeJob( Job $job, $stats, $popTime ) {
256
		$jType = $job->getType();
257
		$msg = $job->toString() . " STARTING";
258
		$this->logger->debug( $msg );
259
		$this->debugCallback( $msg );
260
261
		// Run the job...
262
		$rssStart = $this->getMaxRssKb();
263
		$jobStartTime = microtime( true );
264
		try {
265
			$status = $job->run();
266
			$error = $job->getLastError();
267
			$this->commitMasterChanges( $job );
268
269
			DeferredUpdates::doUpdates();
270
			$this->commitMasterChanges( $job );
271
		} catch ( Exception $e ) {
272
			MWExceptionHandler::rollbackMasterChangesAndLog( $e );
273
			$status = false;
274
			$error = get_class( $e ) . ': ' . $e->getMessage();
275
			MWExceptionHandler::logException( $e );
276
		}
277
		// Always attempt to call teardown() even if Job throws exception.
278
		try {
279
			$job->teardown();
280
		} catch ( Exception $e ) {
281
			MWExceptionHandler::logException( $e );
282
		}
283
284
		// Commit all outstanding connections that are in a transaction
285
		// to get a fresh repeatable read snapshot on every connection.
286
		// Note that jobs are still responsible for handling slave lag.
287
		wfGetLBFactory()->commitAll( __METHOD__ );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
288
		// Clear out title cache data from prior snapshots
289
		LinkCache::singleton()->clear();
0 ignored issues
show
Deprecated Code introduced by
The method LinkCache::singleton() has been deprecated with message: since 1.28, use MediaWikiServices instead

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
290
		$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
291
		$rssEnd = $this->getMaxRssKb();
292
293
		// Record how long jobs wait before getting popped
294
		$readyTs = $job->getReadyTimestamp();
295
		if ( $readyTs ) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $readyTs of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
296
			$pickupDelay = max( 0, $popTime - $readyTs );
297
			$stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
298
			$stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
299
		}
300
		// Record root job age for jobs being run
301
		$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
302
		if ( $rootTimestamp ) {
303
			$age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
304
			$stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
305
		}
306
		// Track the execution time for jobs
307
		$stats->timing( "jobqueue.run.$jType", $timeMs );
308
		// Track RSS increases for jobs (in case of memory leaks)
309
		if ( $rssStart && $rssEnd ) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $rssStart of type integer|null is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
Bug Best Practice introduced by
The expression $rssEnd of type integer|null is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
310
			$stats->increment( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
311
		}
312
313
		if ( $status === false ) {
314
			$msg = $job->toString() . " t=$timeMs error={$error}";
315
			$this->logger->error( $msg );
316
			$this->debugCallback( $msg );
317
		} else {
318
			$msg = $job->toString() . " t=$timeMs good";
319
			$this->logger->info( $msg );
320
			$this->debugCallback( $msg );
321
		}
322
323
		return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ];
324
	}
325
326
	/**
327
	 * @return int|null Max memory RSS in kilobytes
328
	 */
329
	private function getMaxRssKb() {
330
		$info = wfGetRusage() ?: [];
331
		// see http://linux.die.net/man/2/getrusage
332
		return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
333
	}
334
335
	/**
336
	 * @param Job $job
337
	 * @return int Seconds for this runner to avoid doing more jobs of this type
338
	 * @see $wgJobBackoffThrottling
339
	 */
340
	private function getBackoffTimeToWait( Job $job ) {
341
		global $wgJobBackoffThrottling;
342
343
		if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
344
			$job instanceof DuplicateJob // no work was done
345
		) {
346
			return 0; // not throttled
347
		}
348
349
		$itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
350
		if ( $itemsPerSecond <= 0 ) {
351
			return 0; // not throttled
352
		}
353
354
		$seconds = 0;
355
		if ( $job->workItemCount() > 0 ) {
356
			$exactSeconds = $job->workItemCount() / $itemsPerSecond;
357
			// use randomized rounding
358
			$seconds = floor( $exactSeconds );
359
			$remainder = $exactSeconds - $seconds;
360
			$seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
361
		}
362
363
		return (int)$seconds;
364
	}
365
366
	/**
367
	 * Get the previous backoff expiries from persistent storage
368
	 * On I/O or lock acquisition failure this returns the original $backoffs.
369
	 *
370
	 * @param array $backoffs Map of (job type => UNIX timestamp)
371
	 * @param string $mode Lock wait mode - "wait" or "nowait"
372
	 * @return array Map of (job type => backoff expiry timestamp)
373
	 */
374
	private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
375
		$file = wfTempDir() . '/mw-runJobs-backoffs.json';
376
		if ( is_file( $file ) ) {
377
			$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
378
			$handle = fopen( $file, 'rb' );
379
			if ( !flock( $handle, LOCK_SH | $noblock ) ) {
380
				fclose( $handle );
381
				return $backoffs; // don't wait on lock
382
			}
383
			$content = stream_get_contents( $handle );
384
			flock( $handle, LOCK_UN );
385
			fclose( $handle );
386
			$ctime = microtime( true );
387
			$cBackoffs = json_decode( $content, true ) ?: [];
388
			foreach ( $cBackoffs as $type => $timestamp ) {
389
				if ( $timestamp < $ctime ) {
390
					unset( $cBackoffs[$type] );
391
				}
392
			}
393
		} else {
394
			$cBackoffs = [];
395
		}
396
397
		return $cBackoffs;
398
	}
399
400
	/**
401
	 * Merge the current backoff expiries from persistent storage
402
	 *
403
	 * The $deltas map is set to an empty array on success.
404
	 * On I/O or lock acquisition failure this returns the original $backoffs.
405
	 *
406
	 * @param array $backoffs Map of (job type => UNIX timestamp)
407
	 * @param array $deltas Map of (job type => seconds)
408
	 * @param string $mode Lock wait mode - "wait" or "nowait"
409
	 * @return array The new backoffs account for $backoffs and the latest file data
410
	 */
411
	private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
412
		if ( !$deltas ) {
413
			return $this->loadBackoffs( $backoffs, $mode );
414
		}
415
416
		$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
417
		$file = wfTempDir() . '/mw-runJobs-backoffs.json';
418
		$handle = fopen( $file, 'wb+' );
419
		if ( !flock( $handle, LOCK_EX | $noblock ) ) {
420
			fclose( $handle );
421
			return $backoffs; // don't wait on lock
422
		}
423
		$ctime = microtime( true );
424
		$content = stream_get_contents( $handle );
425
		$cBackoffs = json_decode( $content, true ) ?: [];
426
		foreach ( $deltas as $type => $seconds ) {
427
			$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
428
				? $cBackoffs[$type] + $seconds
429
				: $ctime + $seconds;
430
		}
431
		foreach ( $cBackoffs as $type => $timestamp ) {
432
			if ( $timestamp < $ctime ) {
433
				unset( $cBackoffs[$type] );
434
			}
435
		}
436
		ftruncate( $handle, 0 );
437
		fwrite( $handle, json_encode( $cBackoffs ) );
438
		flock( $handle, LOCK_UN );
439
		fclose( $handle );
440
441
		$deltas = [];
442
443
		return $cBackoffs;
444
	}
445
446
	/**
447
	 * Make sure that this script is not too close to the memory usage limit.
448
	 * It is better to die in between jobs than OOM right in the middle of one.
449
	 * @return bool
450
	 */
451
	private function checkMemoryOK() {
452
		static $maxBytes = null;
453
		if ( $maxBytes === null ) {
454
			$m = [];
455
			if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
456
				list( , $num, $unit ) = $m;
457
				$conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
458
				$maxBytes = $num * $conv[strtolower( $unit )];
459
			} else {
460
				$maxBytes = 0;
461
			}
462
		}
463
		$usedBytes = memory_get_usage();
464
		if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
465
			$msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
466
			$this->debugCallback( $msg );
467
			$this->logger->error( $msg );
468
469
			return false;
470
		}
471
472
		return true;
473
	}
474
475
	/**
476
	 * Log the job message
477
	 * @param string $msg The message to log
478
	 */
479
	private function debugCallback( $msg ) {
480
		if ( $this->debug ) {
481
			call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] );
482
		}
483
	}
484
485
	/**
486
	 * Issue a commit on all masters who are currently in a transaction and have
487
	 * made changes to the database. It also supports sometimes waiting for the
488
	 * local wiki's slaves to catch up. See the documentation for
489
	 * $wgJobSerialCommitThreshold for more.
490
	 *
491
	 * @param Job $job
492
	 * @throws DBError
493
	 */
494
	private function commitMasterChanges( Job $job ) {
495
		global $wgJobSerialCommitThreshold;
496
497
		$lb = wfGetLB( wfWikiID() );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLB() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancer() or MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
498
		if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
499
			// Generally, there is one master connection to the local DB
500
			$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
501
		} else {
502
			$dbwSerial = false;
503
		}
504
505
		if ( !$dbwSerial
506
			|| !$dbwSerial->namedLocksEnqueue()
507
			|| $dbwSerial->pendingWriteQueryDuration() < $wgJobSerialCommitThreshold
508
		) {
509
			// Writes are all to foreign DBs, named locks don't form queues,
510
			// or $wgJobSerialCommitThreshold is not reached; commit changes now
511
			wfGetLBFactory()->commitMasterChanges( __METHOD__ );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
512
			return;
513
		}
514
515
		$ms = intval( 1000 * $dbwSerial->pendingWriteQueryDuration() );
516
		$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
517
		$this->logger->info( $msg );
518
		$this->debugCallback( $msg );
519
520
		// Wait for an exclusive lock to commit
521
		if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
522
			// This will trigger a rollback in the main loop
523
			throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
0 ignored issues
show
Bug introduced by
It seems like $dbwSerial can also be of type boolean; however, DBError::__construct() does only seem to accept null|object<DatabaseBase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
524
		}
525
		// Wait for the generic slave to catch up
526
		$pos = $lb->getMasterPos();
527
		if ( $pos ) {
528
			$lb->waitForOne( $pos );
529
		}
530
531
		$fname = __METHOD__;
532
		// Re-ping all masters with transactions. This throws DBError if some
533
		// connection died while waiting on locks/slaves, triggering a rollback.
534
		wfGetLBFactory()->forEachLB( function( LoadBalancer $lb ) use ( $fname ) {
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
535
			$lb->forEachOpenConnection( function( IDatabase $conn ) use ( $fname ) {
536
				if ( $conn->writesOrCallbacksPending() ) {
537
					$conn->query( "SELECT 1", $fname );
538
				}
539
			} );
540
		} );
541
542
		// Actually commit the DB master changes
543
		wfGetLBFactory()->commitMasterChanges( __METHOD__ );
0 ignored issues
show
Deprecated Code introduced by
The function wfGetLBFactory() has been deprecated with message: since 1.27, use MediaWikiServices::getDBLoadBalancerFactory() instead.

This function has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the function will be removed from the class and what other function to use instead.

Loading history...
544
545
		// Release the lock
546
		$dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
547
	}
548
}
549