Completed
Branch master (d7c4e6)
by
unknown
29:20
created

JobQueueDB::doBatchPushInternal()   C

Complexity

Conditions 10
Paths 301

Size

Total Lines 54
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 10
eloc 33
c 1
b 0
f 0
nc 301
nop 4
dl 0
loc 54
rs 5.1428

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Database-backed job queue code.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @author Aaron Schulz
22
 */
23
24
/**
25
 * Class to handle job queues stored in the DB
26
 *
27
 * @ingroup JobQueue
28
 * @since 1.21
29
 */
30
class JobQueueDB extends JobQueue {
31
	const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
32
	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
33
	const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
34
	const MAX_OFFSET = 255; // integer; maximum number of rows to skip
35
36
	/** @var WANObjectCache */
37
	protected $cache;
38
39
	/** @var bool|string Name of an external DB cluster. False if not set */
40
	protected $cluster = false;
41
42
	/**
	 * Set up the queue, its cache handle and the optional external DB cluster name.
	 *
	 * Additional parameters include:
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *               If not specified, the primary DB cluster for the wiki will be used.
	 *               This can be overridden with a custom cluster so that DB handles will
	 *               be retrieved via LBFactory::getExternalLB() and getConnection().
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );

		$this->cache = ObjectCache::getMainWANInstance();

		// False means "no external cluster was configured"
		if ( isset( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		} else {
			$this->cluster = false;
		}
	}
56
57
	/**
	 * List the job orderings this queue type accepts.
	 *
	 * @return string[]
	 */
	protected function supportedOrders() {
		$orders = [ 'random', 'timestamp', 'fifo' ];

		return $orders;
	}
60
61
	/**
	 * Report the ordering considered optimal for this queue type.
	 *
	 * @return string One of the values from supportedOrders()
	 */
	protected function optimalOrder() {
		$order = 'random';

		return $order;
	}
64
65
	/**
	 * Check whether no unclaimed job of this type exists.
	 *
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 */
	protected function doIsEmpty() {
		$dbr = $this->getSlaveDB();
		// An empty job_token marks a job that has not been claimed
		$unclaimedConds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
		try {
			$hasUnclaimed = $dbr->selectField( 'job', '1', $unclaimedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $hasUnclaimed ? false : true;
	}
81
82
	/**
	 * Count the unclaimed jobs of this type, caching the result for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetSize()
	 * @return int
	 */
	protected function doGetSize() {
		$cacheKey = $this->getCacheKey( 'size' );

		$cached = $this->cache->get( $cacheKey );
		if ( is_int( $cached ) ) {
			return $cached; // only an integer counts as a cache hit
		}

		try {
			$dbr = $this->getSlaveDB();
			$unclaimedConds = [ 'job_cmd' => $this->type, 'job_token' => '' ];
			$unclaimed = (int)$dbr->selectField( 'job', 'COUNT(*)', $unclaimedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $cacheKey, $unclaimed, self::CACHE_TTL_SHORT );

		return $unclaimed;
	}
107
108
	/**
	 * Count the claimed (non-empty job_token) jobs of this type, with short-lived caching.
	 *
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$cacheKey = $this->getCacheKey( 'acquiredcount' );

		$cached = $this->cache->get( $cacheKey );
		if ( is_int( $cached ) ) {
			return $cached;
		}

		$dbr = $this->getSlaveDB();
		try {
			$claimedConds = [
				'job_cmd' => $this->type,
				'job_token != ' . $dbr->addQuotes( '' )
			];
			$claimed = (int)$dbr->selectField( 'job', 'COUNT(*)', $claimedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $cacheKey, $claimed, self::CACHE_TTL_SHORT );

		return $claimed;
	}
137
138
	/**
	 * Count the claimed jobs of this type whose attempts reached maxTries, with short-lived caching.
	 *
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws MWException
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$cacheKey = $this->getCacheKey( 'abandonedcount' );

		$cached = $this->cache->get( $cacheKey );
		if ( is_int( $cached ) ) {
			return $cached;
		}

		$dbr = $this->getSlaveDB();
		try {
			$abandonedConds = [
				'job_cmd' => $this->type,
				'job_token != ' . $dbr->addQuotes( '' ),
				'job_attempts >= ' . $dbr->addQuotes( $this->maxTries )
			];
			$abandoned = (int)$dbr->selectField( 'job', 'COUNT(*)', $abandonedConds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		$this->cache->set( $cacheKey, $abandoned, self::CACHE_TTL_SHORT );

		return $abandoned;
	}
173
174
	/**
	 * Defer the actual insertion of the jobs until the master connection has no open transaction.
	 *
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		// Captured here because __METHOD__ inside the closure would name the closure instead
		$fname = __METHOD__;

		$insertJobs = function () use ( $dbw, $jobs, $flags, $fname ) {
			$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
		};
		$dbw->onTransactionIdle( $insertJobs );
	}
191
192
	/**
	 * Insert a batch of jobs, skipping de-duplicatable jobs that are already enqueued.
	 *
	 * This function should *not* be called outside of JobQueueDB
	 *
	 * @param IDatabase $dbw
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bitfield; self::QOS_ATOMIC wraps all insertions in one atomic section
	 * @param string $method Caller name used for query attribution
	 * @throws DBError
	 * @return void
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( !count( $jobs ) ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job );
			if ( $job->ignoreDuplicates() ) {
				// Keying by hash also collapses duplicates within this batch
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				// NOTE(review): static analysis types select() as ResultWrapper|bool;
				// presumably failures surface as DBError here rather than false - confirm.
				$res = $dbw->select( 'job', 'job_sha1',
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_keys( $rowSet ),
						'job_token' => '' // unclaimed
					],
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid slave lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
			// Every job that did not become a row was dropped as a duplicate. The previous
			// expression compared $rows with the already-pruned $rowSet and so was always 0.
			JobQueue::incrStats( 'dupe_inserts', $this->type, count( $jobs ) - count( $rows ) );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}
256
257
	/**
	 * Claim one job of this type and build its Job object.
	 *
	 * When nothing could be claimed, and on roughly one call in ten otherwise,
	 * stale claimed jobs are recycled or deleted as well.
	 *
	 * @see JobQueue::doPop()
	 * @return Job|bool The claimed job, or false if none was available
	 */
	protected function doPop() {
		// Assigned before the try block so the final return is defined on every path
		$job = false; // job popped off
		$dbw = $this->getMasterDB();
		try {
			$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			// NOTE(review): $scopedReset is never read, but it presumably has to stay
			// referenced so the callback runs when this method's scope ends - confirm
			// against ScopedCallback before removing the assignment.
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
			} );

			$uuid = wfRandomString( 32 ); // pop attempt
			// Try to reserve a row in the DB. The former do/while wrapper always left on
			// its first pass, so a single straight-line attempt is equivalent.
			if ( in_array( $this->order, [ 'fifo', 'timestamp' ], true ) ) {
				$row = $this->claimOldest( $uuid );
			} else { // random first
				$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
				$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
				$row = $this->claimRandom( $uuid, $rand, $gte );
			}

			if ( $row ) {
				JobQueue::incrStats( 'pops', $this->type );
				// Get the job object from the row...
				$title = Title::makeTitle( $row->job_namespace, $row->job_title );
				$job = Job::factory( $row->job_cmd, $title,
					self::extractBlob( $row->job_params ), $row->job_id );
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;
			}

			if ( !$job || mt_rand( 0, 9 ) === 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $job;
	}
307
308
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * A candidate row is selected first and then claimed by primary key; if another
	 * runner claims it in between, the selection is repeated.
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
	 * @return stdClass|bool Row|false
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					// Same 30 second lifetime as before, now via the shared constant
					$this->cache->set(
						$this->getCacheKey( 'small' ), 1, self::CACHE_TTL_SHORT
					);
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
386
387
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * The unclaimed job with the lowest job_id is claimed, then read back by its token.
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|bool Row|false
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();

		while ( true ) {
			if ( $dbw->getType() === 'mysql' ) {
				// Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Oracle and Postgre have no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$table = $dbw->tableName( 'job' );
				$quotedToken = $dbw->addQuotes( $uuid );
				$quotedNow = $dbw->addQuotes( $dbw->timestamp() );
				$quotedType = $dbw->addQuotes( $this->type );
				$quotedEmpty = $dbw->addQuotes( '' );
				$sql = "UPDATE {$table} SET " .
					"job_token = {$quotedToken}, " .
					"job_token_timestamp = {$quotedNow}, " .
					"job_attempts = job_attempts+1 " .
					"WHERE ( job_cmd = {$quotedType} AND job_token = {$quotedEmpty} ) " .
					"ORDER BY job_id ASC LIMIT 1";
				$dbw->query( $sql, __METHOD__ );
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$claimFields = [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1'
				];
				$oldestIdSql = $dbw->selectSQLText( 'job', 'job_id',
					[ 'job_cmd' => $this->type, 'job_token' => '' ],
					__METHOD__,
					[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
				);
				$dbw->update( 'job', $claimFields, [ 'job_id = (' . $oldestIdSql . ')' ], __METHOD__ );
			}

			if ( !$dbw->affectedRows() ) {
				return false; // nothing to do
			}

			// Fetch any row that we just reserved...
			$claimed = $dbw->selectRow( 'job', self::selectFields(),
				[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
			);
			if ( $claimed ) {
				return $claimed;
			}

			// Raced out by duplicate job removal; try to claim another row
			wfDebug( "Row deleted as duplicate by another process.\n" );
		}
	}
447
448
	/**
	 * Acknowledge a finished job by deleting its row.
	 *
	 * @see JobQueue::doAck()
	 * @param Job $job
	 * @throws MWException If the job carries no 'id' metadata
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['id'] ) ) {
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getMasterDB();
		try {
			$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
			$hadAutoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			// NOTE(review): $scopedReset is never read, but it presumably has to stay
			// referenced so the callback runs when this method's scope ends - confirm
			// against ScopedCallback before removing the assignment.
			$restoreFlag = function () use ( $dbw, $hadAutoTrx ) {
				$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 ); // restore old setting
			};
			$scopedReset = new ScopedCallback( $restoreFlag );

			// Delete a row with a single DELETE without holding row locks over RTTs...
			$ackConds = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];
			$dbw->delete( 'job', $ackConds, __METHOD__ );

			JobQueue::incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}
476
477
	/**
	 * Record the root job timestamp in the de-duplication cache once the master
	 * connection has no open transaction.
	 *
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws MWException If 'rootJobSignature' or 'rootJobTimestamp' is missing
	 * @return bool Always true; the registration itself is deferred
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->getParams();
		if ( !isset( $params['rootJobSignature'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
		}
		if ( !isset( $params['rootJobTimestamp'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
		}
		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchInsert() and then this function so that if the insert
		// fails, the de-duplication registration will be aborted. Since the insert is
		// deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getMasterDB();
		$cache = $this->dupCache;
		$register = function () use ( $cache, $params, $key, $dbw ) {
			$lastTimestamp = $cache->get( $key ); // current last timestamp of this job
			if ( $lastTimestamp && $lastTimestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
		};
		$dbw->onTransactionIdle( $register );

		return true;
	}
510
511
	/**
	 * Delete every job row of this type, claimed or not.
	 *
	 * @see JobQueue::doDelete()
	 * @return bool
	 */
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		try {
			// Pass the caller name like every other query in this class does
			$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return true;
	}
525
526
	/**
	 * Wait for replication on this queue's wiki and (if set) external cluster.
	 *
	 * NOTE(review): static analysis reports wfWaitForSlaves() as deprecated since 1.27
	 * in favour of LBFactory::waitForReplication.
	 *
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
		// Any falsy cluster value is normalised to false
		$cluster = $this->cluster ?: false;

		wfWaitForSlaves( false, $this->wiki, $cluster );
	}
533
534
	/**
	 * Drop the cached job counts for this queue.
	 *
	 * Covers every count key written by doGetSize(), doGetAcquiredCount() and
	 * doGetAbandonedCount(); 'abandonedcount' was previously left out.
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
	}
542
543
	/**
	 * Iterate over the unclaimed (empty job_token) jobs of this type.
	 *
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */
	public function getAllQueuedJobs() {
		$unclaimedConds = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

		return $this->getJobIterator( $unclaimedConds );
	}
550
551
	/**
	 * Iterate over the claimed (non-empty job_token) jobs of this type.
	 *
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 */
	public function getAllAcquiredJobs() {
		$claimedConds = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

		return $this->getJobIterator( $claimedConds );
	}
558
559
	/**
	 * Select the job rows matching the conditions and map each row to a Job object.
	 *
	 * @param array $conds Query conditions
	 * @return Iterator
	 */
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getSlaveDB();
		try {
			return new MappedIterator(
				// Pass the caller name like the other queries in this class do
				$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
				function ( $row ) {
					// NOTE(review): job_params is unserialize()d directly here, whereas
					// doPop() decodes it via self::extractBlob() - confirm this is intended.
					$job = Job::factory(
						$row->job_cmd,
						Title::makeTitle( $row->job_namespace, $row->job_title ),
						strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
					);
					$job->metadata['id'] = $row->job_id;
					$job->metadata['timestamp'] = $row->job_timestamp;

					return $job;
				}
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}
584
585
	/**
	 * Build a string naming where this queue's rows live: the external cluster
	 * plus wiki when a cluster is set, otherwise the wiki alone.
	 *
	 * @return string
	 */
	public function getCoalesceLocationInternal() {
		if ( $this->cluster ) {
			return "DBCluster:{$this->cluster}:{$this->wiki}";
		}

		return "LBFactory:{$this->wiki}";
	}
590
591
	/**
	 * Find which of the given job types have at least one row in the job table.
	 *
	 * @param array $types Job types to check
	 * @return array Subset of $types that have rows
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getSlaveDB();
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$typeConds = [ 'job_cmd' => $types ];
		$res = $dbr->select( 'job', 'DISTINCT job_cmd', $typeConds, __METHOD__ );

		$typesWithJobs = [];
		foreach ( $res as $row ) {
			$typesWithJobs[] = $row->job_cmd;
		}

		return $typesWithJobs;
	}
607
608
	/**
	 * Count the job rows of each given type with a single grouped query.
	 *
	 * Types without any row are absent from the result.
	 *
	 * @param array $types Job types to count
	 * @return array Map of (job type => row count)
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getSlaveDB();
		$fields = [ 'job_cmd', 'COUNT(*) AS count' ];
		$typeConds = [ 'job_cmd' => $types ];
		$res = $dbr->select( 'job', $fields, $typeConds, __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );

		$countsByType = [];
		foreach ( $res as $row ) {
			$countsByType[$row->job_cmd] = (int)$row->count;
		}

		return $countsByType;
	}
620
621
	/**
622
	 * Recycle or destroy any jobs that have been claimed for too long
623
	 *
624
	 * @return int Number of jobs recycled/deleted
625
	 */
626
	public function recycleAndDeleteStaleJobs() {
627
		$now = time();
628
		$count = 0; // affected rows
629
		$dbw = $this->getMasterDB();
630
631
		try {
632
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
633
				return $count; // already in progress
634
			}
635
636
			// Remove claims on jobs acquired for too long if enabled...
637
			if ( $this->claimTTL > 0 ) {
638
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
639
				// Get the IDs of jobs that have been claimed but not finished after too long.
640
				// These jobs can be recycled into the queue by expiring the claim. Selecting
641
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
642
				$res = $dbw->select( 'job', 'job_id',
643
					[
644
						'job_cmd' => $this->type,
645
						"job_token != {$dbw->addQuotes( '' )}", // was acquired
646
						"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
647
						"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
648
					__METHOD__
649
				);
650
				$ids = array_map(
651
					function ( $o ) {
652
						return $o->job_id;
653
					}, iterator_to_array( $res )
654
				);
655
				if ( count( $ids ) ) {
656
					// Reset job_token for these jobs so that other runners will pick them up.
657
					// Set the timestamp to the current time, as it is useful to now that the job
658
					// was already tried before (the timestamp becomes the "released" time).
659
					$dbw->update( 'job',
660
						[
661
							'job_token' => '',
662
							'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
663
						[
664
							'job_id' => $ids ],
665
						__METHOD__
666
					);
667
					$affected = $dbw->affectedRows();
668
					$count += $affected;
669
					JobQueue::incrStats( 'recycles', $this->type, $affected );
670
					$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
671
				}
672
			}
673
674
			// Just destroy any stale jobs...
675
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
676
			$conds = [
677
				'job_cmd' => $this->type,
678
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
679
				"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
680
			];
681
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
682
				$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
683
			}
684
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
685
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
686
			$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
687
			$ids = array_map(
688
				function ( $o ) {
689
					return $o->job_id;
690
				}, iterator_to_array( $res )
691
			);
692
			if ( count( $ids ) ) {
693
				$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
694
				$affected = $dbw->affectedRows();
695
				$count += $affected;
696
				JobQueue::incrStats( 'abandons', $this->type, $affected );
697
			}
698
699
			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
700
		} catch ( DBError $e ) {
701
			$this->throwDBException( $e );
702
		}
703
704
		return $count;
705
	}
706
707
	/**
	 * Build the map of job table column values describing a job to be inserted
	 *
	 * @param IJobSpecification $job
	 * @return array Map of (column name => value)
	 */
	protected function insertFields( IJobSpecification $job ) {
		$dbw = $this->getMasterDB();
		$row = [];

		// Fields that describe the nature of the job
		$row['job_cmd'] = $job->getType();
		$row['job_namespace'] = $job->getTitle()->getNamespace();
		$row['job_title'] = $job->getTitle()->getDBkey();
		$row['job_params'] = self::makeBlob( $job->getParams() );

		// Additional job metadata
		$row['job_id'] = $dbw->nextSequenceValue( 'job_job_id_seq' );
		$row['job_timestamp'] = $dbw->timestamp();
		// SHA-1 of the serialized de-duplication info, converted from base 16 to base 36
		$dedupHash = sha1( serialize( $job->getDeduplicationInfo() ) );
		$row['job_sha1'] = Wikimedia\base_convert( $dedupHash, 16, 36, 31 );
		$row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

		return $row;
	}
730
731
	/**
	 * Get a DB_SLAVE connection handle via getDB()
	 *
	 * @throws JobQueueConnectionError If getDB() fails with a DBConnectionError
	 * @return DBConnRef
	 */
	protected function getSlaveDB() {
		try {
			$conn = $this->getDB( DB_SLAVE );
		} catch ( DBConnectionError $e ) {
			// Re-throw as the job queue specific connection error type
			$reason = "DBConnectionError:" . $e->getMessage();
			throw new JobQueueConnectionError( $reason );
		}

		return $conn;
	}
742
743
	/**
	 * Get a DB_MASTER connection handle via getDB()
	 *
	 * @throws JobQueueConnectionError If getDB() fails with a DBConnectionError
	 * @return DBConnRef
	 */
	protected function getMasterDB() {
		try {
			$conn = $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			// Re-throw as the job queue specific connection error type
			$reason = "DBConnectionError:" . $e->getMessage();
			throw new JobQueueConnectionError( $reason );
		}

		return $conn;
	}
754
755
	/**
	 * Get a connection handle to the DB that stores this queue
	 *
	 * The external load balancer of the configured cluster is used when a cluster name
	 * is set; otherwise the main load balancer of the queue's wiki is used.
	 *
	 * @param int $index (DB_SLAVE/DB_MASTER)
	 * @return DBConnRef
	 */
	protected function getDB( $index ) {
		// wfGetLBFactory() and wfGetLB() are deprecated since 1.27 in favor of MediaWikiServices
		$lbFactory = \MediaWiki\MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		// getExternalLB() only accepts a cluster name. Anything that is not a string
		// (such as false) selects the main load balancer, matching getCacheKey().
		$lb = is_string( $this->cluster )
			? $lbFactory->getExternalLB( $this->cluster, $this->wiki )
			: $lbFactory->getMainLB( $this->wiki );

		return $lb->getConnectionRef( $index, [], $this->wiki );
	}
766
767
	/**
	 * Build the cache key for a property of this queue
	 *
	 * The key is made from the wiki's DB name and prefix, the cluster name
	 * ('main' when no cluster name is set), the job type and the property name.
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		$wikiParts = wfSplitWikiID( $this->wiki );
		$db = $wikiParts[0];
		$prefix = $wikiParts[1];

		if ( is_string( $this->cluster ) ) {
			$cluster = $this->cluster;
		} else {
			$cluster = 'main';
		}

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
	}
777
778
	/**
	 * Serialize job parameters for storage
	 *
	 * @param array|bool $params Parameters, or false for none
	 * @return string Serialized parameters, or an empty string if $params is false
	 */
	protected static function makeBlob( $params ) {
		return ( $params === false ) ? '' : serialize( $params );
	}
789
790
	/**
	 * Unserialize a stored parameter blob
	 *
	 * @param string $blob
	 * @return bool|mixed The unserialized value, or false if the blob is an empty string
	 */
	protected static function extractBlob( $blob ) {
		// An empty blob stands for "no parameters"
		if ( (string)$blob === '' ) {
			return false;
		}

		return unserialize( $blob );
	}
801
802
	/**
	 * Re-throw a DB error as a JobQueueError
	 *
	 * The message of the new exception is the class name of the DB error,
	 * followed by ": " and the message of the DB error.
	 *
	 * @param DBError $e
	 * @throws JobQueueError
	 */
	protected function throwDBException( DBError $e ) {
		$errorClass = get_class( $e );
		$details = $e->getMessage();

		throw new JobQueueError( $errorClass . ": " . $details );
	}
809
810
	/**
	 * Get the names of the job table fields that should be selected.
	 * @since 1.23
	 * @return array List of field names
	 */
	public static function selectFields() {
		$fields = [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];

		return $fields;
	}
830
}
831