Issues (4122)

Security Analysis    not enabled

This project does not seem to handle request data directly as such no vulnerable execution paths were found.

  Cross-Site Scripting
Cross-Site Scripting enables an attacker to inject code into the response of a web-request that is viewed by other users. It can for example be used to bypass access controls, or even to take over other users' accounts.
  File Exposure
File Exposure allows an attacker to gain access to local files that he should not be able to access. These files can for example include database credentials, or other configuration files.
  File Manipulation
File Manipulation enables an attacker to write custom data to files. This potentially leads to injection of arbitrary code on the server.
  Object Injection
Object Injection enables an attacker to inject an object into PHP code, and can lead to arbitrary code execution, file exposure, or file manipulation attacks.
  Code Injection
Code Injection enables an attacker to execute arbitrary code on the server.
  Response Splitting
Response Splitting can be used to send arbitrary responses.
  File Inclusion
File Inclusion enables an attacker to inject custom files into PHP's file loading mechanism, either explicitly passed to include, or for example via PHP's auto-loading mechanism.
  Command Injection
Command Injection enables an attacker to inject a shell command that is execute with the privileges of the web-server. This can be used to expose sensitive data, or gain access of your server.
  SQL Injection
SQL Injection enables an attacker to execute arbitrary SQL code on your database server gaining access to user data, or manipulating user data.
  XPath Injection
XPath Injection enables an attacker to modify the parts of XML document that are read. If that XML document is for example used for authentication, this can lead to further vulnerabilities similar to SQL Injection.
  LDAP Injection
LDAP Injection enables an attacker to inject LDAP statements potentially granting permission to run unauthorized queries, or modify content inside the LDAP tree.
  Header Injection
  Other Vulnerability
This category comprises other attack vectors such as manipulating the PHP runtime, loading custom extensions, freezing the runtime, or similar.
  Regex Injection
Regex Injection enables an attacker to execute arbitrary code in your PHP process.
  XML Injection
XML Injection enables an attacker to read files on your local filesystem including configuration files, or can be abused to freeze your web-server process.
  Variable Injection
Variable Injection enables an attacker to overwrite program variables with custom data, and can lead to further vulnerabilities.
Unfortunately, the security analysis is currently not available for your project. If you are a non-commercial open-source project, please contact support to gain access.

includes/jobqueue/JobRunner.php (9 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
/**
3
 * Job queue runner utility methods
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @ingroup JobQueue
22
 */
23
24
use MediaWiki\MediaWikiServices;
25
use MediaWiki\Logger\LoggerFactory;
26
use Liuggio\StatsdClient\Factory\StatsdDataFactory;
27
use Psr\Log\LoggerAwareInterface;
28
use Psr\Log\LoggerInterface;
29
use Wikimedia\ScopedCallback;
0 ignored issues
show
This use statement conflicts with another class in this namespace, ScopedCallback.

Let’s assume that you have a directory layout like this:

.
|-- OtherDir
|   |-- Bar.php
|   `-- Foo.php
`-- SomeDir
    `-- Foo.php

and let’s assume the following content of Bar.php:

// Bar.php
namespace OtherDir;

use SomeDir\Foo; // This now conflicts the class OtherDir\Foo

If both files OtherDir/Foo.php and SomeDir/Foo.php are loaded in the same runtime, you will see a PHP error such as the following:

PHP Fatal error:  Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php

However, as OtherDir/Foo.php does not necessarily have to be loaded and the error is only triggered if it is loaded before OtherDir/Bar.php, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias:

// Bar.php
namespace OtherDir;

use SomeDir\Foo as SomeDirFoo; // There is no conflict anymore.
Loading history...
30
31
/**
32
 * Job queue runner utility methods
33
 *
34
 * @ingroup JobQueue
35
 * @since 1.24
36
 */
37
class JobRunner implements LoggerAwareInterface {
38
	/** @var callable|null Debug output handler */
39
	protected $debug;
40
41
	/**
42
	 * @var LoggerInterface $logger
43
	 */
44
	protected $logger;
45
46
	const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
47
	const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds
48
	const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
49
50
	/**
51
	 * @param callable $debug Optional debug output handler
52
	 */
53
	public function setDebugHandler( $debug ) {
54
		$this->debug = $debug;
55
	}
56
57
	/**
58
	 * @param LoggerInterface $logger
59
	 * @return void
60
	 */
61
	public function setLogger( LoggerInterface $logger ) {
62
		$this->logger = $logger;
63
	}
64
65
	/**
66
	 * @param LoggerInterface $logger
67
	 */
68
	public function __construct( LoggerInterface $logger = null ) {
69
		if ( $logger === null ) {
70
			$logger = LoggerFactory::getInstance( 'runJobs' );
71
		}
72
		$this->setLogger( $logger );
73
	}
74
75
	/**
76
	 * Run jobs of the specified number/type for the specified time
77
	 *
78
	 * The response map has a 'job' field that lists status of each job, including:
79
	 *   - type   : the job type
80
	 *   - status : ok/failed
81
	 *   - error  : any error message string
82
	 *   - time   : the job run time in ms
83
	 * The response map also has:
84
	 *   - backoffs : the (job type => seconds) map of backoff times
85
	 *   - elapsed  : the total time spent running tasks in ms
86
	 *   - reached  : the reason the script finished, one of (none-ready, job-limit, time-limit,
87
	 *  memory-limit)
88
	 *
89
	 * This method outputs status information only if a debug handler was set.
90
	 * Any exceptions are caught and logged, but are not reported as output.
91
	 *
92
	 * @param array $options Map of parameters:
93
	 *    - type     : the job type (or false for the default types)
94
	 *    - maxJobs  : maximum number of jobs to run
95
	 *    - maxTime  : maximum time in seconds before stopping
96
	 *    - throttle : whether to respect job backoff configuration
97
	 * @return array Summary response that can easily be JSON serialized
98
	 */
99
	public function run( array $options ) {
100
		global $wgJobClasses, $wgTrxProfilerLimits;
101
102
		$response = [ 'jobs' => [], 'reached' => 'none-ready' ];
103
104
		$type = isset( $options['type'] ) ? $options['type'] : false;
105
		$maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false;
106
		$maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
107
		$noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
108
109
		// Bail if job type is invalid
110
		if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
111
			$response['reached'] = 'none-possible';
112
			return $response;
113
		}
114
		// Bail out if DB is in read-only mode
115
		if ( wfReadOnly() ) {
116
			$response['reached'] = 'read-only';
117
			return $response;
118
		}
119
120
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
121
		// Bail out if there is too much DB lag.
122
		// This check should not block as we want to try other wiki queues.
123
		list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
124
		if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
125
			$response['reached'] = 'replica-lag-limit';
126
			return $response;
127
		}
128
129
		// Flush any pending DB writes for sanity
130
		$lbFactory->commitAll( __METHOD__ );
131
132
		// Catch huge single updates that lead to replica DB lag
133
		$trxProfiler = Profiler::instance()->getTransactionProfiler();
134
		$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
135
		$trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
136
137
		// Some jobs types should not run until a certain timestamp
138
		$backoffs = []; // map of (type => UNIX expiry)
139
		$backoffDeltas = []; // map of (type => seconds)
140
		$wait = 'wait'; // block to read backoffs the first time
141
142
		$group = JobQueueGroup::singleton();
143
		$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
144
		$jobsPopped = 0;
145
		$timeMsTotal = 0;
146
		$startTime = microtime( true ); // time since jobs started running
147
		$lastCheckTime = 1; // timestamp of last replica DB check
148
		do {
149
			// Sync the persistent backoffs with concurrent runners
150
			$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
151
			$blacklist = $noThrottle ? [] : array_keys( $backoffs );
152
			$wait = 'nowait'; // less important now
153
154
			if ( $type === false ) {
155
				$job = $group->pop(
156
					JobQueueGroup::TYPE_DEFAULT,
157
					JobQueueGroup::USE_CACHE,
158
					$blacklist
159
				);
160
			} elseif ( in_array( $type, $blacklist ) ) {
161
				$job = false; // requested queue in backoff state
162
			} else {
163
				$job = $group->pop( $type ); // job from a single queue
164
			}
165
			$lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
166
167
			if ( $job ) { // found a job
168
				++$jobsPopped;
169
				$popTime = time();
170
				$jType = $job->getType();
171
172
				WebRequest::overrideRequestId( $job->getRequestId() );
173
174
				// Back off of certain jobs for a while (for throttling and for errors)
175
				$ttw = $this->getBackoffTimeToWait( $job );
0 ignored issues
show
It seems like $job can also be of type boolean; however, JobRunner::getBackoffTimeToWait() does only seem to accept object<Job>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
176
				if ( $ttw > 0 ) {
177
					// Always add the delta for other runners in case the time running the
178
					// job negated the backoff for each individually but not collectively.
179
					$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
180
						? $backoffDeltas[$jType] + $ttw
181
						: $ttw;
182
					$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
183
				}
184
185
				$info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
0 ignored issues
show
It seems like $job can also be of type boolean; however, JobRunner::executeJob() does only seem to accept object<Job>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
186
				if ( $info['status'] !== false || !$job->allowRetries() ) {
187
					$group->ack( $job ); // succeeded or job cannot be retried
0 ignored issues
show
It seems like $job can also be of type boolean; however, JobQueueGroup::ack() does only seem to accept object<Job>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
188
					$lbFactory->commitMasterChanges( __METHOD__ ); // flush any JobQueueDB writes
189
				}
190
191
				// Back off of certain jobs for a while (for throttling and for errors)
192
				if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
193
					$ttw = max( $ttw, self::ERROR_BACKOFF_TTL ); // too many errors
194
					$backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
195
						? $backoffDeltas[$jType] + $ttw
196
						: $ttw;
197
				}
198
199
				$response['jobs'][] = [
200
					'type'   => $jType,
201
					'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
202
					'error'  => $info['error'],
203
					'time'   => $info['timeMs']
204
				];
205
				$timeMsTotal += $info['timeMs'];
206
207
				// Break out if we hit the job count or wall time limits...
208
				if ( $maxJobs && $jobsPopped >= $maxJobs ) {
209
					$response['reached'] = 'job-limit';
210
					break;
211
				} elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
212
					$response['reached'] = 'time-limit';
213
					break;
214
				}
215
216
				// Don't let any of the main DB replica DBs get backed up.
217
				// This only waits for so long before exiting and letting
218
				// other wikis in the farm (on different masters) get a chance.
219
				$timePassed = microtime( true ) - $lastCheckTime;
220
				if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
221
					try {
222
						$lbFactory->waitForReplication( [
223
							'ifWritesSince' => $lastCheckTime,
224
							'timeout' => self::MAX_ALLOWED_LAG
225
						] );
226
					} catch ( DBReplicationWaitError $e ) {
227
						$response['reached'] = 'replica-lag-limit';
228
						break;
229
					}
230
					$lastCheckTime = microtime( true );
231
				}
232
				// Don't let any queue replica DBs/backups fall behind
233
				if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
234
					$group->waitForBackups();
235
				}
236
237
				// Bail if near-OOM instead of in a job
238
				if ( !$this->checkMemoryOK() ) {
239
					$response['reached'] = 'memory-limit';
240
					break;
241
				}
242
			}
243
		} while ( $job ); // stop when there are no jobs
244
245
		// Sync the persistent backoffs for the next runJobs.php pass
246
		if ( $backoffDeltas ) {
247
			$this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
248
		}
249
250
		$response['backoffs'] = $backoffs;
251
		$response['elapsed'] = $timeMsTotal;
252
253
		return $response;
254
	}
255
256
	/**
257
	 * @param Job $job
258
	 * @param LBFactory $lbFactory
259
	 * @param StatsdDataFactory $stats
260
	 * @param float $popTime
261
	 * @return array Map of status/error/timeMs
262
	 */
263
	private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
264
		$jType = $job->getType();
265
		$msg = $job->toString() . " STARTING";
266
		$this->logger->debug( $msg );
267
		$this->debugCallback( $msg );
268
269
		// Run the job...
270
		$rssStart = $this->getMaxRssKb();
271
		$jobStartTime = microtime( true );
272
		try {
273
			$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
274
			$lbFactory->beginMasterChanges( $fnameTrxOwner );
275
			$status = $job->run();
276
			$error = $job->getLastError();
277
			$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
278
			// Run any deferred update tasks; doUpdates() manages transactions itself
279
			DeferredUpdates::doUpdates();
280
		} catch ( Exception $e ) {
281
			MWExceptionHandler::rollbackMasterChangesAndLog( $e );
282
			$status = false;
283
			$error = get_class( $e ) . ': ' . $e->getMessage();
284
		}
285
		// Always attempt to call teardown() even if Job throws exception.
286
		try {
287
			$job->teardown( $status );
288
		} catch ( Exception $e ) {
289
			MWExceptionHandler::logException( $e );
290
		}
291
292
		// Commit all outstanding connections that are in a transaction
293
		// to get a fresh repeatable read snapshot on every connection.
294
		// Note that jobs are still responsible for handling replica DB lag.
295
		$lbFactory->flushReplicaSnapshots( __METHOD__ );
296
		// Clear out title cache data from prior snapshots
297
		MediaWikiServices::getInstance()->getLinkCache()->clear();
298
		$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
299
		$rssEnd = $this->getMaxRssKb();
300
301
		// Record how long jobs wait before getting popped
302
		$readyTs = $job->getReadyTimestamp();
303
		if ( $readyTs ) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $readyTs of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
304
			$pickupDelay = max( 0, $popTime - $readyTs );
305
			$stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
306
			$stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
307
		}
308
		// Record root job age for jobs being run
309
		$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
310
		if ( $rootTimestamp ) {
311
			$age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
312
			$stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
313
		}
314
		// Track the execution time for jobs
315
		$stats->timing( "jobqueue.run.$jType", $timeMs );
316
		// Track RSS increases for jobs (in case of memory leaks)
317
		if ( $rssStart && $rssEnd ) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $rssStart of type integer|null is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
Bug Best Practice introduced by
The expression $rssEnd of type integer|null is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
318
			$stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
319
		}
320
321
		if ( $status === false ) {
322
			$msg = $job->toString() . " t=$timeMs error={$error}";
323
			$this->logger->error( $msg );
324
			$this->debugCallback( $msg );
325
		} else {
326
			$msg = $job->toString() . " t=$timeMs good";
327
			$this->logger->info( $msg );
328
			$this->debugCallback( $msg );
329
		}
330
331
		return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ];
332
	}
333
334
	/**
335
	 * @return int|null Max memory RSS in kilobytes
336
	 */
337
	private function getMaxRssKb() {
338
		$info = wfGetRusage() ?: [];
339
		// see https://linux.die.net/man/2/getrusage
340
		return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
341
	}
342
343
	/**
344
	 * @param Job $job
345
	 * @return int Seconds for this runner to avoid doing more jobs of this type
346
	 * @see $wgJobBackoffThrottling
347
	 */
348
	private function getBackoffTimeToWait( Job $job ) {
349
		global $wgJobBackoffThrottling;
350
351
		if ( !isset( $wgJobBackoffThrottling[$job->getType()] ) ||
352
			$job instanceof DuplicateJob // no work was done
353
		) {
354
			return 0; // not throttled
355
		}
356
357
		$itemsPerSecond = $wgJobBackoffThrottling[$job->getType()];
358
		if ( $itemsPerSecond <= 0 ) {
359
			return 0; // not throttled
360
		}
361
362
		$seconds = 0;
363
		if ( $job->workItemCount() > 0 ) {
364
			$exactSeconds = $job->workItemCount() / $itemsPerSecond;
365
			// use randomized rounding
366
			$seconds = floor( $exactSeconds );
367
			$remainder = $exactSeconds - $seconds;
368
			$seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
369
		}
370
371
		return (int)$seconds;
372
	}
373
374
	/**
375
	 * Get the previous backoff expiries from persistent storage
376
	 * On I/O or lock acquisition failure this returns the original $backoffs.
377
	 *
378
	 * @param array $backoffs Map of (job type => UNIX timestamp)
379
	 * @param string $mode Lock wait mode - "wait" or "nowait"
380
	 * @return array Map of (job type => backoff expiry timestamp)
381
	 */
382
	private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
383
		$file = wfTempDir() . '/mw-runJobs-backoffs.json';
384
		if ( is_file( $file ) ) {
385
			$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
386
			$handle = fopen( $file, 'rb' );
387
			if ( !flock( $handle, LOCK_SH | $noblock ) ) {
388
				fclose( $handle );
389
				return $backoffs; // don't wait on lock
390
			}
391
			$content = stream_get_contents( $handle );
392
			flock( $handle, LOCK_UN );
393
			fclose( $handle );
394
			$ctime = microtime( true );
395
			$cBackoffs = json_decode( $content, true ) ?: [];
396
			foreach ( $cBackoffs as $type => $timestamp ) {
397
				if ( $timestamp < $ctime ) {
398
					unset( $cBackoffs[$type] );
399
				}
400
			}
401
		} else {
402
			$cBackoffs = [];
403
		}
404
405
		return $cBackoffs;
406
	}
407
408
	/**
409
	 * Merge the current backoff expiries from persistent storage
410
	 *
411
	 * The $deltas map is set to an empty array on success.
412
	 * On I/O or lock acquisition failure this returns the original $backoffs.
413
	 *
414
	 * @param array $backoffs Map of (job type => UNIX timestamp)
415
	 * @param array $deltas Map of (job type => seconds)
416
	 * @param string $mode Lock wait mode - "wait" or "nowait"
417
	 * @return array The new backoffs account for $backoffs and the latest file data
418
	 */
419
	private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
420
		if ( !$deltas ) {
421
			return $this->loadBackoffs( $backoffs, $mode );
422
		}
423
424
		$noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
425
		$file = wfTempDir() . '/mw-runJobs-backoffs.json';
426
		$handle = fopen( $file, 'wb+' );
427
		if ( !flock( $handle, LOCK_EX | $noblock ) ) {
428
			fclose( $handle );
429
			return $backoffs; // don't wait on lock
430
		}
431
		$ctime = microtime( true );
432
		$content = stream_get_contents( $handle );
433
		$cBackoffs = json_decode( $content, true ) ?: [];
434
		foreach ( $deltas as $type => $seconds ) {
435
			$cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
436
				? $cBackoffs[$type] + $seconds
437
				: $ctime + $seconds;
438
		}
439
		foreach ( $cBackoffs as $type => $timestamp ) {
440
			if ( $timestamp < $ctime ) {
441
				unset( $cBackoffs[$type] );
442
			}
443
		}
444
		ftruncate( $handle, 0 );
445
		fwrite( $handle, json_encode( $cBackoffs ) );
446
		flock( $handle, LOCK_UN );
447
		fclose( $handle );
448
449
		$deltas = [];
450
451
		return $cBackoffs;
452
	}
453
454
	/**
455
	 * Make sure that this script is not too close to the memory usage limit.
456
	 * It is better to die in between jobs than OOM right in the middle of one.
457
	 * @return bool
458
	 */
459
	private function checkMemoryOK() {
460
		static $maxBytes = null;
461
		if ( $maxBytes === null ) {
462
			$m = [];
463
			if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
464
				list( , $num, $unit ) = $m;
465
				$conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
466
				$maxBytes = $num * $conv[strtolower( $unit )];
467
			} else {
468
				$maxBytes = 0;
469
			}
470
		}
471
		$usedBytes = memory_get_usage();
472
		if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
473
			$msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
474
			$this->debugCallback( $msg );
475
			$this->logger->error( $msg );
476
477
			return false;
478
		}
479
480
		return true;
481
	}
482
483
	/**
484
	 * Log the job message
485
	 * @param string $msg The message to log
486
	 */
487
	private function debugCallback( $msg ) {
488
		if ( $this->debug ) {
489
			call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] );
490
		}
491
	}
492
493
	/**
494
	 * Issue a commit on all masters who are currently in a transaction and have
495
	 * made changes to the database. It also supports sometimes waiting for the
496
	 * local wiki's replica DBs to catch up. See the documentation for
497
	 * $wgJobSerialCommitThreshold for more.
498
	 *
499
	 * @param LBFactory $lbFactory
500
	 * @param Job $job
501
	 * @param string $fnameTrxOwner
502
	 * @throws DBError
503
	 */
504
	private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
505
		global $wgJobSerialCommitThreshold;
506
507
		$time = false;
508
		$lb = $lbFactory->getMainLB( wfWikiID() );
509
		if ( $wgJobSerialCommitThreshold !== false && $lb->getServerCount() > 1 ) {
510
			// Generally, there is one master connection to the local DB
511
			$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
512
			// We need natively blocking fast locks
513
			if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
514
				$time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
515
				if ( $time < $wgJobSerialCommitThreshold ) {
516
					$dbwSerial = false;
517
				}
518
			} else {
519
				$dbwSerial = false;
520
			}
521
		} else {
522
			// There are no replica DBs or writes are all to foreign DB (we don't handle that)
523
			$dbwSerial = false;
524
		}
525
526
		if ( !$dbwSerial ) {
527
			$lbFactory->commitMasterChanges( $fnameTrxOwner );
528
			return;
529
		}
530
531
		$ms = intval( 1000 * $time );
532
		$msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
533
		$this->logger->info( $msg );
534
		$this->debugCallback( $msg );
535
536
		// Wait for an exclusive lock to commit
537
		if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
538
			// This will trigger a rollback in the main loop
539
			throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
0 ignored issues
show
It seems like $dbwSerial can also be of type boolean; however, DBError::__construct() does only seem to accept null|object<IDatabase>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
540
		}
541
		$unlocker = new ScopedCallback( function () use ( $dbwSerial ) {
542
			$dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
543
		} );
544
545
		// Wait for the replica DBs to catch up
546
		$pos = $lb->getMasterPos();
547
		if ( $pos ) {
548
			$lb->waitForAll( $pos );
0 ignored issues
show
It seems like $pos defined by $lb->getMasterPos() on line 546 can also be of type boolean; however, LoadBalancer::waitForAll() does only seem to accept object<DBMasterPos>, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
549
		}
550
551
		// Actually commit the DB master changes
552
		$lbFactory->commitMasterChanges( $fnameTrxOwner );
553
		ScopedCallback::consume( $unlocker );
554
	}
555
}
556