Completed
Branch master (8ad38d)
by
unknown
40:49
created

JobQueueDB::getReplicaDB()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 0
dl 0
loc 7
rs 9.4285
c 0
b 0
f 0
1
<?php
2
/**
3
 * Database-backed job queue code.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @author Aaron Schulz
22
 */
23
use MediaWiki\MediaWikiServices;
24
use Wikimedia\ScopedCallback;
0 ignored issues
show
Bug introduced by
This use statement conflicts with another class in this namespace, ScopedCallback.

Let’s assume that you have a directory layout like this:

.
|-- OtherDir
|   |-- Bar.php
|   `-- Foo.php
`-- SomeDir
    `-- Foo.php

and let’s assume the following content of Bar.php:

// Bar.php
namespace OtherDir;

use SomeDir\Foo; // This now conflicts with the class OtherDir\Foo

If both files OtherDir/Foo.php and SomeDir/Foo.php are loaded in the same runtime, you will see a PHP error such as the following:

PHP Fatal error:  Cannot use SomeDir\Foo as Foo because the name is already in use in OtherDir/Foo.php

However, as OtherDir/Foo.php does not necessarily have to be loaded and the error is only triggered if it is loaded before OtherDir/Bar.php, this problem might go unnoticed for a while. In order to prevent this error from surfacing, you must import the namespace with a different alias:

// Bar.php
namespace OtherDir;

use SomeDir\Foo as SomeDirFoo; // There is no conflict anymore.
Loading history...
25
26
/**
27
 * Class to handle job queues stored in the DB
28
 *
29
 * @ingroup JobQueue
30
 * @since 1.21
31
 */
32
class JobQueueDB extends JobQueue {
33
	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
34
	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
35
	const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
36
	const MAX_OFFSET = 255; // integer; maximum number of rows to skip
37
38
	/** @var WANObjectCache */
39
	protected $cache;
40
41
	/** @var bool|string Name of an external DB cluster. False if not set */
42
	protected $cluster = false;
43
44
	/**
45
	 * Additional parameters include:
46
	 *   - cluster : The name of an external cluster registered via LBFactory.
47
	 *               If not specified, the primary DB cluster for the wiki will be used.
48
	 *               This can be overridden with a custom cluster so that DB handles will
49
	 *               be retrieved via LBFactory::getExternalLB() and getConnection().
50
	 * @param array $params
51
	 */
52
	protected function __construct( array $params ) {
		parent::__construct( $params );

		// Remember the external cluster only when the caller named one; false
		// means the primary DB cluster for the wiki is used instead.
		if ( isset( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		} else {
			$this->cluster = false;
		}
		$this->cache = ObjectCache::getMainWANInstance();
	}
58
59
	/**
	 * List the job ordering modes that this queue class accepts.
	 *
	 * @return string[]
	 */
	protected function supportedOrders() {
		$orders = [ 'random', 'timestamp', 'fifo' ];

		return $orders;
	}
62
63
	/**
	 * Name the ordering mode this queue class prefers.
	 *
	 * @return string One of the values listed by supportedOrders()
	 */
	protected function optimalOrder() {
		$order = 'random';

		return $order;
	}
66
67
	/**
68
	 * @see JobQueue::doIsEmpty()
69
	 * @return bool
70
	 */
71
	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		// Only unclaimed rows (empty job_token) of this queue's type are considered
		$conds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
		try {
			$found = $dbr->selectField( 'job', '1', $conds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		// The queue is empty when no matching row was found
		return !$found;
	}
83
84
	/**
85
	 * @see JobQueue::doGetSize()
86
	 * @return int
87
	 */
88
	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		// Serve the recently cached count when there is one
		$size = $this->cache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		try {
			// Count only unclaimed rows (empty job_token) of this queue's type
			$conds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
			$size = (int)$this->getReplicaDB()->selectField(
				'job', 'COUNT(*)', $conds, __METHOD__
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		// Keep the fresh count for a short time to avoid re-querying
		$this->cache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}
109
110
	/**
111
	 * @see JobQueue::doGetAcquiredCount()
112
	 * @return int
113
	 */
114 View Code Duplication
	protected function doGetAcquiredCount() {
		// With no claim TTL there are no acknowledgements to count
		if ( $this->claimTTL <= 0 ) {
			return 0;
		}

		$key = $this->getCacheKey( 'acquiredcount' );
		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count; // recently cached value
		}

		$dbr = $this->getReplicaDB();
		try {
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbr->addQuotes( '' )}" // rows that carry a claim token
			];
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}
139
140
	/**
141
	 * @see JobQueue::doGetAbandonedCount()
142
	 * @return int
143
	 * @throws MWException
144
	 */
145 View Code Duplication
	protected function doGetAbandonedCount() {
		// With no claim TTL there are no acknowledgements to count
		if ( $this->claimTTL <= 0 ) {
			return 0;
		}

		$key = $this->getCacheKey( 'abandonedcount' );
		$count = $this->cache->get( $key );
		if ( is_int( $count ) ) {
			return $count; // recently cached value
		}

		$dbr = $this->getReplicaDB();
		try {
			$conds = [
				'job_cmd' => $this->type,
				// rows that carry a claim token...
				"job_token != {$dbr->addQuotes( '' )}",
				// ...and have reached the maximum number of attempts
				"job_attempts >= " . $dbr->addQuotes( $this->maxTries )
			];
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		$this->cache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}
175
176
	/**
177
	 * @see JobQueue::doBatchPush()
178
	 * @param IJobSpecification[] $jobs
179
	 * @param int $flags
180
	 * @throws DBError|Exception
181
	 * @return void
182
	 */
183
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		// Capture the method name here: inside the closure __METHOD__ would
		// no longer refer to doBatchPush().
		$fname = __METHOD__;

		$pushJobs = function () use ( $dbw, $jobs, $flags, $fname ) {
			$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
		};

		// Hand the actual insertion to the "transaction idle" callback queue
		$dbw->onTransactionIdle( $pushJobs, __METHOD__ );
	}
194
195
	/**
196
	 * This function should *not* be called outside of JobQueueDB
197
	 *
198
	 * @param IDatabase $dbw
199
	 * @param IJobSpecification[] $jobs
200
	 * @param int $flags
201
	 * @param string $method
202
	 * @throws DBError
203
	 * @return void
204
	 */
205
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
206
		if ( !count( $jobs ) ) {
207
			return;
208
		}
209
210
		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
211
		$rowList = []; // list of jobs for jobs that are not de-duplicated
212
		foreach ( $jobs as $job ) {
213
			$row = $this->insertFields( $job );
214
			if ( $job->ignoreDuplicates() ) {
215
				$rowSet[$row['job_sha1']] = $row;
216
			} else {
217
				$rowList[] = $row;
218
			}
219
		}
220
221
		if ( $flags & self::QOS_ATOMIC ) {
222
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
223
		}
224
		try {
225
			// Strip out any duplicate jobs that are already in the queue...
226
			if ( count( $rowSet ) ) {
227
				$res = $dbw->select( 'job', 'job_sha1',
228
					[
229
						// No job_type condition since it's part of the job_sha1 hash
230
						'job_sha1' => array_keys( $rowSet ),
231
						'job_token' => '' // unclaimed
232
					],
233
					$method
234
				);
235
				foreach ( $res as $row ) {
0 ignored issues
show
Bug introduced by
The expression $res of type object<ResultWrapper>|boolean is not guaranteed to be traversable. How about adding an additional type check?

There are different options of fixing this problem.

  1. If you want to be on the safe side, you can add an additional type-check:

    $collection = json_decode($data, true);
    if ( ! is_array($collection)) {
        throw new \RuntimeException('$collection must be an array.');
    }
    
    foreach ($collection as $item) { /** ... */ }
    
  2. If you are sure that the expression is traversable, you might want to add a doc comment cast to improve IDE auto-completion and static analysis:

    /** @var array $collection */
    $collection = json_decode($data, true);
    
    foreach ($collection as $item) { /** .. */ }
    
  3. Mark the issue as a false-positive: Just hover the remove button, in the top-right corner of this issue for more options.

Loading history...
236
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
237
					unset( $rowSet[$row->job_sha1] ); // already enqueued
238
				}
239
			}
240
			// Build the full list of job rows to insert
241
			$rows = array_merge( $rowList, array_values( $rowSet ) );
242
			// Insert the job rows in chunks to avoid replica DB lag...
243
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
244
				$dbw->insert( 'job', $rowBatch, $method );
245
			}
246
			JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
247
			JobQueue::incrStats( 'dupe_inserts', $this->type,
248
				count( $rowSet ) + count( $rowList ) - count( $rows )
249
			);
250
		} catch ( DBError $e ) {
251
			$this->throwDBException( $e );
252
		}
253
		if ( $flags & self::QOS_ATOMIC ) {
254
			$dbw->endAtomic( $method );
255
		}
256
257
		return;
258
	}
259
260
	/**
261
	 * @see JobQueue::doPop()
262
	 * @return Job|bool
263
	 */
264
	protected function doPop() {
265
		$dbw = $this->getMasterDB();
266
		try {
267
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
268
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
269
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
0 ignored issues
show
Unused Code introduced by
$scopedReset is not used, you could remove the assignment.

This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten on every possible execution path.

Loading history...
270
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
271
			} );
272
273
			$uuid = wfRandomString( 32 ); // pop attempt
274
			$job = false; // job popped off
275
			do { // retry when our row is invalid or deleted as a duplicate
0 ignored issues
show
Unused Code introduced by
// job popped off do { ... break; } while (true); does not seem to be reachable.

This check looks for unreachable code. It uses sophisticated control flow analysis techniques to find statements which will never be executed.

Unreachable code is most often the result of return, die or exit statements that have been added for debug purposes.

function fx() {
    try {
        doSomething();
        return true;
    }
    catch (\Exception $e) {
        return false;
    }

    return false;
}

In the above example, the last return false will never be executed, because a return statement has already been met in every possible execution path.

Loading history...
276
				// Try to reserve a row in the DB...
277
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
278
					$row = $this->claimOldest( $uuid );
279
				} else { // random first
280
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
281
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
282
					$row = $this->claimRandom( $uuid, $rand, $gte );
283
				}
284
				// Check if we found a row to reserve...
285
				if ( !$row ) {
286
					break; // nothing to do
287
				}
288
				JobQueue::incrStats( 'pops', $this->type );
289
				// Get the job object from the row...
290
				$title = Title::makeTitle( $row->job_namespace, $row->job_title );
291
				$job = Job::factory( $row->job_cmd, $title,
292
					self::extractBlob( $row->job_params ), $row->job_id );
293
				$job->metadata['id'] = $row->job_id;
294
				$job->metadata['timestamp'] = $row->job_timestamp;
295
				break; // done
296
			} while ( true );
297
298
			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
299
				// Handled jobs that need to be recycled/deleted;
300
				// any recycled jobs will be picked up next attempt
301
				$this->recycleAndDeleteStaleJobs();
302
			}
303
		} catch ( DBError $e ) {
304
			$this->throwDBException( $e );
305
		}
306
307
		return $job;
0 ignored issues
show
Bug introduced by
The variable $job does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
308
	}
309
310
	/**
311
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
312
	 *
313
	 * @param string $uuid 32 char hex string
314
	 * @param int $rand Random unsigned integer (31 bits)
315
	 * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
316
	 * @return stdClass|bool Row|false
317
	 */
318
	protected function claimRandom( $uuid, $rand, $gte ) {
319
		$dbw = $this->getMasterDB();
320
		// Check cache to see if the queue has <= OFFSET items
321
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
322
323
		$row = false; // the row acquired
0 ignored issues
show
Unused Code introduced by
$row is not used, you could remove the assignment.

This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten on every possible execution path.

Loading history...
324
		$invertedDirection = false; // whether one job_random direction was already scanned
325
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
326
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
327
		// not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
328
		// be used here with MySQL.
329
		do {
330
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
331
				// For small queues, using OFFSET will overshoot and return no rows more often.
332
				// Instead, this uses job_random to pick a row (possibly checking both directions).
333
				$ineq = $gte ? '>=' : '<=';
334
				$dir = $gte ? 'ASC' : 'DESC';
335
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
336
					[
337
						'job_cmd' => $this->type,
338
						'job_token' => '', // unclaimed
339
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
340
					__METHOD__,
341
					[ 'ORDER BY' => "job_random {$dir}" ]
342
				);
343
				if ( !$row && !$invertedDirection ) {
344
					$gte = !$gte;
345
					$invertedDirection = true;
346
					continue; // try the other direction
347
				}
348
			} else { // table *may* have >= MAX_OFFSET rows
349
				// Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
350
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
351
				// instead of job_random for reducing excess claim retries.
352
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
353
					[
354
						'job_cmd' => $this->type,
355
						'job_token' => '', // unclaimed
356
					],
357
					__METHOD__,
358
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
359
				);
360
				if ( !$row ) {
361
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
362
					$this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
363
					continue; // use job_random
364
				}
365
			}
366
367
			if ( $row ) { // claim the job
368
				$dbw->update( 'job', // update by PK
369
					[
370
						'job_token' => $uuid,
371
						'job_token_timestamp' => $dbw->timestamp(),
372
						'job_attempts = job_attempts+1' ],
373
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
374
					__METHOD__
375
				);
376
				// This might get raced out by another runner when claiming the previously
377
				// selected row. The use of job_random should minimize this problem, however.
378
				if ( !$dbw->affectedRows() ) {
379
					$row = false; // raced out
380
				}
381
			} else {
382
				break; // nothing to do
383
			}
384
		} while ( !$row );
385
386
		return $row;
387
	}
388
389
	/**
390
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
391
	 *
392
	 * @param string $uuid 32 char hex string
393
	 * @return stdClass|bool Row|false
394
	 */
395
	protected function claimOldest( $uuid ) {
396
		$dbw = $this->getMasterDB();
397
398
		$row = false; // the row acquired
399
		do {
400
			if ( $dbw->getType() === 'mysql' ) {
401
				// Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
402
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
403
				// Oracle and Postgre have no such limitation. However, MySQL offers an
404
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
405
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
0 ignored issues
show
Documentation Bug introduced by
The method tableName does not exist on object<DBConnRef>? Since you implemented __call, maybe consider adding a @method annotation.

If you implement __call and you know which methods are available, you can improve IDE auto-completion and static analysis by adding a @method annotation to the class.

This is often the case, when __call is implemented by a parent class and only the child class knows which methods exist:

class ParentClass {
    private $data = array();

    public function __call($method, array $args) {
        if (0 === strpos($method, 'get')) {
            return $this->data[strtolower(substr($method, 3))];
        }

        throw new \LogicException(sprintf('Unsupported method: %s', $method));
    }
}

/**
 * If this class knows which fields exist, you can specify the methods here:
 *
 * @method string getName()
 */
class SomeClass extends ParentClass { }
Loading history...
406
					"SET " .
407
						"job_token = {$dbw->addQuotes( $uuid ) }, " .
408
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
409
						"job_attempts = job_attempts+1 " .
410
					"WHERE ( " .
411
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
412
						"AND job_token = {$dbw->addQuotes( '' )} " .
413
					") ORDER BY job_id ASC LIMIT 1",
414
					__METHOD__
415
				);
416
			} else {
417
				// Use a subquery to find the job, within an UPDATE to claim it.
418
				// This uses as much of the DB wrapper functions as possible.
419
				$dbw->update( 'job',
420
					[
421
						'job_token' => $uuid,
422
						'job_token_timestamp' => $dbw->timestamp(),
423
						'job_attempts = job_attempts+1' ],
424
					[ 'job_id = (' .
425
						$dbw->selectSQLText( 'job', 'job_id',
426
							[ 'job_cmd' => $this->type, 'job_token' => '' ],
427
							__METHOD__,
428
							[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
429
						')'
430
					],
431
					__METHOD__
432
				);
433
			}
434
			// Fetch any row that we just reserved...
435
			if ( $dbw->affectedRows() ) {
436
				$row = $dbw->selectRow( 'job', self::selectFields(),
437
					[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
438
				);
439
				if ( !$row ) { // raced out by duplicate job removal
440
					wfDebug( "Row deleted as duplicate by another process.\n" );
441
				}
442
			} else {
443
				break; // nothing to do
444
			}
445
		} while ( !$row );
446
447
		return $row;
448
	}
449
450
	/**
451
	 * @see JobQueue::doAck()
452
	 * @param Job $job
453
	 * @throws MWException
454
	 */
455
	protected function doAck( Job $job ) {
456
		if ( !isset( $job->metadata['id'] ) ) {
457
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
458
		}
459
460
		$dbw = $this->getMasterDB();
461
		try {
462
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
463
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
464
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
0 ignored issues
show
Unused Code introduced by
$scopedReset is not used, you could remove the assignment.

This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten on every possible execution path.

Loading history...
465
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
466
			} );
467
468
			// Delete a row with a single DELETE without holding row locks over RTTs...
469
			$dbw->delete( 'job',
470
				[ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
471
472
			JobQueue::incrStats( 'acks', $this->type );
473
		} catch ( DBError $e ) {
474
			$this->throwDBException( $e );
475
		}
476
	}
477
478
	/**
479
	 * @see JobQueue::doDeduplicateRootJob()
480
	 * @param IJobSpecification $job
481
	 * @throws MWException
482
	 * @return bool
483
	 */
484
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->getParams();
		// Both root job parameters are mandatory for registration
		if ( !isset( $params['rootJobSignature'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
		}
		if ( !isset( $params['rootJobTimestamp'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		$rootTimestamp = $params['rootJobTimestamp'];
		$cache = $this->dupCache;

		// Registers the root job timestamp unless an equal or newer one is already stored
		$register = function () use ( $cache, $key, $rootTimestamp ) {
			$lastTimestamp = $cache->get( $key ); // current last timestamp of this job
			if ( $lastTimestamp && $lastTimestamp >= $rootTimestamp ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $rootTimestamp, JobQueueDB::ROOTJOB_TTL );
		};

		// Callers should call batchInsert() and then this function so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getMasterDB();
		$dbw->onTransactionIdle( $register, __METHOD__ );

		return true;
	}
514
515
	/**
516
	 * @see JobQueue::doDelete()
517
	 * @return bool
518
	 */
519
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		try {
			// Remove every row of this queue's type, claimed or not. The caller
			// name is passed explicitly, as for every other query in this class,
			// so that the DELETE is attributed to this method.
			$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return true;
	}
529
530
	/**
531
	 * @see JobQueue::doWaitForBackups()
532
	 * @return void
533
	 */
534
	protected function doWaitForBackups() {
		// Limit the wait to the wiki and cluster this queue is configured with
		$scope = [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ];

		MediaWikiServices::getInstance()
			->getDBLoadBalancerFactory()
			->waitForReplication( $scope );
	}
538
539
	/**
540
	 * @return void
541
	 */
542
	protected function doFlushCaches() {
		// Drop every cached counter this class writes: doGetSize() stores 'size',
		// doGetAcquiredCount() stores 'acquiredcount' and doGetAbandonedCount()
		// stores 'abandonedcount'. Leaving one out would keep serving a stale
		// count until its TTL expires.
		foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
	}
547
548
	/**
549
	 * @see JobQueue::getAllQueuedJobs()
550
	 * @return Iterator
551
	 */
552
	public function getAllQueuedJobs() {
		// Unclaimed jobs of this type: job_token is empty
		$conds = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

		return $this->getJobIterator( $conds );
	}
555
556
	/**
557
	 * @see JobQueue::getAllAcquiredJobs()
558
	 * @return Iterator
559
	 */
560
	public function getAllAcquiredJobs() {
		// Claimed jobs of this type: job_token is non-empty
		$conds = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

		return $this->getJobIterator( $conds );
	}
563
564
	/**
565
	 * @param array $conds Query conditions
566
	 * @return Iterator
567
	 */
568
	protected function getJobIterator( array $conds ) {
569
		$dbr = $this->getReplicaDB();
570
		try {
571
			return new MappedIterator(
572
				$dbr->select( 'job', self::selectFields(), $conds ),
0 ignored issues
show
Bug introduced by
It seems like $dbr->select('job', self::selectFields(), $conds) targeting DBConnRef::select() can also be of type boolean; however, MappedIterator::__construct() only seems to accept object<Iterator>|array. Maybe add an additional type check?

This check looks at variables that are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
573
				function ( $row ) {
574
					$job = Job::factory(
575
						$row->job_cmd,
576
						Title::makeTitle( $row->job_namespace, $row->job_title ),
577
						strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
578
					);
579
					$job->metadata['id'] = $row->job_id;
580
					$job->metadata['timestamp'] = $row->job_timestamp;
581
582
					return $job;
583
				}
584
			);
585
		} catch ( DBError $e ) {
586
			$this->throwDBException( $e );
587
		}
588
	}
589
590
	/**
	 * Build a location string for this queue from its cluster (when one is
	 * set) and its wiki.
	 *
	 * @return string
	 */
	public function getCoalesceLocationInternal() {
		if ( $this->cluster ) {
			// An external cluster is configured
			return "DBCluster:{$this->cluster}:{$this->wiki}";
		}

		return "LBFactory:{$this->wiki}";
	}
595
596
	protected function doGetSiblingQueuesWithJobs( array $types ) {
597
		$dbr = $this->getReplicaDB();
598
		// @note: this does not check whether the jobs are claimed or not.
599
		// This is useful so JobQueueGroup::pop() also sees queues that only
600
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
601
		// failed jobs so that they can be popped again for that edge case.
602
		$res = $dbr->select( 'job', 'DISTINCT job_cmd',
603
			[ 'job_cmd' => $types ], __METHOD__ );
604
605
		$types = [];
606
		foreach ( $res as $row ) {
0 ignored issues
show
Bug introduced by
The expression $res of type object<ResultWrapper>|boolean is not guaranteed to be traversable. How about adding an additional type check?

There are different options of fixing this problem.

  1. If you want to be on the safe side, you can add an additional type-check:

    $collection = json_decode($data, true);
    if ( ! is_array($collection)) {
        throw new \RuntimeException('$collection must be an array.');
    }
    
    foreach ($collection as $item) { /** ... */ }
    
  2. If you are sure that the expression is traversable, you might want to add a doc comment cast to improve IDE auto-completion and static analysis:

    /** @var array $collection */
    $collection = json_decode($data, true);
    
    foreach ($collection as $item) { /** .. */ }
    
  3. Mark the issue as a false-positive: Just hover the remove button, in the top-right corner of this issue for more options.

Loading history...
607
			$types[] = $row->job_cmd;
608
		}
609
610
		return $types;
611
	}
612
613
	protected function doGetSiblingQueueSizes( array $types ) {
614
		$dbr = $this->getReplicaDB();
615
		$res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
616
			[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
617
618
		$sizes = [];
619
		foreach ( $res as $row ) {
0 ignored issues
show
Bug introduced by
The expression $res of type object<ResultWrapper>|boolean is not guaranteed to be traversable. How about adding an additional type check?

There are different options of fixing this problem.

  1. If you want to be on the safe side, you can add an additional type-check:

    $collection = json_decode($data, true);
    if ( ! is_array($collection)) {
        throw new \RuntimeException('$collection must be an array.');
    }
    
    foreach ($collection as $item) { /** ... */ }
    
  2. If you are sure that the expression is traversable, you might want to add a doc comment cast to improve IDE auto-completion and static analysis:

    /** @var array $collection */
    $collection = json_decode($data, true);
    
    foreach ($collection as $item) { /** .. */ }
    
  3. Mark the issue as a false-positive: Just hover the remove button, in the top-right corner of this issue for more options.

Loading history...
620
			$sizes[$row->job_cmd] = (int)$row->count;
621
		}
622
623
		return $sizes;
624
	}
625
626
	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * Both steps run on the master connection while holding a per-type named lock;
	 * if the lock cannot be obtained, nothing is done and 0 is returned.
	 *  - If claimTTL is positive, claims older than claimTTL on jobs that still have
	 *    retries left are cleared so that the jobs can be picked up again.
	 *  - Claimed jobs whose claim is older than MAX_AGE_PRUNE are deleted. If claimTTL
	 *    is positive, only jobs that have used up all of their attempts are deleted.
	 *
	 * The lock is released on success and, on a best-effort basis, when a DBError
	 * interrupts the work after the lock was obtained.
	 *
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 * @return int Number of jobs recycled/deleted
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		$lockKey = "jobqueue-recycle-{$this->type}";
		$locked = false; // true only while this call holds $lockKey
		// Maps a result row to its job ID
		$toJobId = function ( $row ) {
			return $row->job_id;
		};

		try {
			if ( !$dbw->lock( $lockKey, __METHOD__, 1 ) ) {
				return $count; // already in progress
			}
			$locked = true;

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->select( 'job', 'job_id',
					[
						'job_cmd' => $this->type,
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
					__METHOD__
				);
				$ids = array_map( $toJobId, iterator_to_array( $res ) );
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->update( 'job',
						[
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
						[
							'job_id' => $ids ],
						__METHOD__
					);
					$affected = $dbw->affectedRows();
					$count += $affected;
					JobQueue::incrStats( 'recycles', $this->type, $affected );
					$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
			];
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the DELETE can be done by primary key (less deadlocks).
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
			$ids = array_map( $toJobId, iterator_to_array( $res ) );
			if ( count( $ids ) ) {
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
				$affected = $dbw->affectedRows();
				$count += $affected;
				JobQueue::incrStats( 'abandons', $this->type, $affected );
			}

			// Clear the flag first so that a failing release is not retried in the catch block
			$locked = false;
			$dbw->unlock( $lockKey, __METHOD__ );
		} catch ( DBError $e ) {
			if ( $locked ) {
				// A query failed while the lock was held: try to release it so that the
				// lock does not stay held on this connection after the error.
				try {
					$dbw->unlock( $lockKey, __METHOD__ );
				} catch ( DBError $unlockError ) {
					// Best effort only; the original error below is the one worth reporting
				}
			}
			$this->throwDBException( $e );
		}

		return $count;
	}
711
712
	/**
	 * Build the job table row used to insert the given job
	 *
	 * @param IJobSpecification $job
	 * @return array Map of (job table field name => value)
	 */
	protected function insertFields( IJobSpecification $job ) {
		$dbw = $this->getMasterDB();
		$title = $job->getTitle();

		$row = [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $title->getNamespace(),
			'job_title' => $title->getDBkey(),
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
			'job_timestamp' => $dbw->timestamp(),
			// SHA-1 of the serialized de-duplication info, converted from base 16 to base 36
			'job_sha1' => Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];

		return $row;
	}
735
736
	/**
	 * Get a DB_REPLICA connection handle for this queue
	 *
	 * Connection failures are re-thrown as JobQueueConnectionError, with the
	 * original DBConnectionError attached as the previous exception.
	 *
	 * @throws JobQueueConnectionError
	 * @return DBConnRef
	 */
	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			// Chain the caught error so that its stack trace is not lost.
			// NOTE(review): assumes JobQueueConnectionError keeps the standard
			// Exception constructor signature (message, code, previous) - confirm.
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
		}
	}
747
748
	/**
	 * Get a DB_MASTER connection handle for this queue
	 *
	 * Connection failures are re-thrown as JobQueueConnectionError, with the
	 * original DBConnectionError attached as the previous exception.
	 *
	 * @throws JobQueueConnectionError
	 * @return DBConnRef
	 */
	protected function getMasterDB() {
		try {
			return $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			// Chain the caught error so that its stack trace is not lost.
			// NOTE(review): assumes JobQueueConnectionError keeps the standard
			// Exception constructor signature (message, code, previous) - confirm.
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
		}
	}
759
760
	/**
	 * Get a connection handle for this queue's wiki
	 *
	 * The load balancer is picked from the LB factory: the main one when no cluster
	 * is set (cluster is false), otherwise the external one for that cluster.
	 *
	 * @param int $index (DB_REPLICA/DB_MASTER)
	 * @return DBConnRef
	 */
	protected function getDB( $index ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

		if ( $this->cluster === false ) {
			$lb = $lbFactory->getMainLB( $this->wiki );
		} else {
			$lb = $lbFactory->getExternalLB( $this->cluster, $this->wiki );
		}

		return $lb->getConnectionRef( $index, [], $this->wiki );
	}
772
773
	/**
	 * Build the cache key for a property of this queue
	 *
	 * The key is made from the wiki's DB name and prefix, the cluster name
	 * ('main' when the cluster is not a string) and the job type.
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		if ( is_string( $this->cluster ) ) {
			$cluster = $this->cluster;
		} else {
			$cluster = 'main';
		}

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
	}
783
784
	/**
	 * Convert job parameters into the string form used for job_params
	 *
	 * @param array|bool $params Parameters, or false for none
	 * @return string Serialized parameters, or an empty string if $params is false
	 */
	protected static function makeBlob( $params ) {
		return ( $params === false ) ? '' : serialize( $params );
	}
795
796
	/**
	 * Convert a stored parameter blob back into its original value
	 *
	 * @param string $blob
	 * @return bool|mixed The unserialized value, or false if the blob is empty
	 */
	protected static function extractBlob( $blob ) {
		if ( (string)$blob === '' ) {
			return false;
		}

		// NOTE(review): unserialize() can instantiate objects; presumably blobs only
		// ever come from makeBlob() via the job table - confirm.
		return unserialize( $blob );
	}
807
808
	/**
	 * Re-throw a database error as a JobQueueError
	 *
	 * The message is the class name of the original error followed by its message;
	 * the original error is attached as the previous exception.
	 *
	 * @param DBError $e
	 * @throws JobQueueError
	 */
	protected function throwDBException( DBError $e ) {
		// Chain the caught error so that its stack trace is not lost.
		// NOTE(review): assumes JobQueueError keeps the standard Exception
		// constructor signature (message, code, previous) - confirm.
		throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage(), 0, $e );
	}
815
816
	/**
	 * Return the list of job fields that should be selected.
	 *
	 * @since 1.23
	 * @return array List of job table field names
	 */
	public static function selectFields() {
		$fields = [
			// Identity and description of the job
			'job_id', 'job_cmd', 'job_namespace', 'job_title', 'job_timestamp', 'job_params',
			// Random value, attempt counter and claim token with its timestamp
			'job_random', 'job_attempts', 'job_token', 'job_token_timestamp',
			// Hash of the de-duplication info
			'job_sha1',
		];

		return $fields;
	}
836
}
837