Completed
Branch master (715cbe)
by
unknown
51:55
created

includes/jobqueue/JobQueueDB.php (3 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
/**
3
 * Database-backed job queue code.
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License along
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
17
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18
 * http://www.gnu.org/copyleft/gpl.html
19
 *
20
 * @file
21
 * @author Aaron Schulz
22
 */
23
use MediaWiki\MediaWikiServices;
24
use Wikimedia\ScopedCallback;
25
26
/**
27
 * Class to handle job queues stored in the DB
28
 *
29
 * @ingroup JobQueue
30
 * @since 1.21
31
 */
32
class JobQueueDB extends JobQueue {
33
	/** @var int Seconds to cache queue statistics (sizes/counts) before re-validating */
	const CACHE_TTL_SHORT = 30;
	/** @var int Seconds a claimed job may live before it is pruned (one week) */
	const MAX_AGE_PRUNE = 604800;
	/** @var int Upper bound for job_random values (2^31 - 1) */
	const MAX_JOB_RANDOM = 2147483647;
	/** @var int Largest OFFSET used when picking a random unclaimed row */
	const MAX_OFFSET = 255;

	/** @var WANObjectCache Cache for queue statistics and the "small queue" hint */
	protected $cache;

	/** @var bool|string Name of an external DB cluster; false means the wiki's main cluster */
	protected $cluster = false;

	/**
	 * Additional parameters include:
	 *   - cluster : The name of an external cluster registered via LBFactory.
	 *               If not specified, the primary DB cluster for the wiki will be used.
	 *               This can be overridden with a custom cluster so that DB handles will
	 *               be retrieved via LBFactory::getExternalLB() and getConnection().
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		} else {
			$this->cluster = false; // use the wiki's main DB cluster
		}
		$this->cache = ObjectCache::getMainWANInstance();
	}
58
59
	/**
	 * @return string[] Pop orders this queue can serve
	 */
	protected function supportedOrders() {
		$orders = [ 'random', 'timestamp', 'fifo' ];

		return $orders;
	}
62
63
	/**
	 * @return string The preferred pop order for this backend
	 */
	protected function optimalOrder() {
		$preferred = 'random';

		return $preferred;
	}
66
67
	/**
	 * Check whether any unclaimed job of this type exists, using a replica DB.
	 *
	 * @see JobQueue::doIsEmpty()
	 * @return bool True when no unclaimed job row was found
	 * @throws JobQueueError On database errors
	 */
	protected function doIsEmpty() {
		$dbr = $this->getSlaveDB();
		$conds = [ 'job_cmd' => $this->type, 'job_token' => '' ]; // unclaimed jobs only
		try {
			$unclaimed = $dbr->selectField( 'job', '1', $conds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $unclaimed ? false : true;
	}
83
84
	/**
	 * Count the unclaimed jobs of this type.
	 *
	 * The count is read from a replica DB and cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetSize()
	 * @return int
	 * @throws JobQueueError On database errors
	 */
	protected function doGetSize() {
		$cacheKey = $this->getCacheKey( 'size' );

		$cached = $this->cache->get( $cacheKey );
		if ( is_int( $cached ) ) {
			return $cached; // recently computed
		}

		try {
			$dbr = $this->getSlaveDB();
			$unclaimedCount = $dbr->selectField(
				'job',
				'COUNT(*)',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__
			);
			$size = (int)$unclaimedCount;
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $cacheKey, $size, self::CACHE_TTL_SHORT );

		return $size;
	}
109
110
	/**
	 * Count the jobs of this type that are currently claimed by a runner.
	 *
	 * Returns 0 when claims are disabled (claimTTL <= 0). Otherwise the count is
	 * read from a replica DB and cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 * @throws JobQueueError On database errors
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // claims are not tracked, so nothing counts as acquired
		}

		$cacheKey = $this->getCacheKey( 'acquiredcount' );
		$cached = $this->cache->get( $cacheKey );
		if ( is_int( $cached ) ) {
			return $cached;
		}

		$dbr = $this->getSlaveDB();
		try {
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbr->addQuotes( '' )}" // claimed by some runner
			];
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		$this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );

		return $count;
	}
139
140
	/**
	 * Count the claimed jobs of this type that have used up their attempts
	 * (job_attempts >= maxTries).
	 *
	 * Returns 0 when claims are disabled (claimTTL <= 0). Otherwise the count is
	 * read from a replica DB and cached for CACHE_TTL_SHORT seconds.
	 *
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws JobQueueError On database errors
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // claims are not tracked, so no job can be abandoned
		}

		$cacheKey = $this->getCacheKey( 'abandonedcount' );
		$cached = $this->cache->get( $cacheKey );
		if ( is_int( $cached ) ) {
			return $cached;
		}

		$dbr = $this->getSlaveDB();
		try {
			$conds = [
				'job_cmd' => $this->type,
				"job_token != {$dbr->addQuotes( '' )}", // claimed
				"job_attempts >= {$dbr->addQuotes( $this->maxTries )}" // out of retries
			];
			$count = (int)$dbr->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		$this->cache->set( $cacheKey, $count, self::CACHE_TTL_SHORT );

		return $count;
	}
175
176
	/**
	 * Defer inserting $jobs until the master connection has no open transaction.
	 *
	 * The actual work is done by doBatchPushInternal(), registered with
	 * onTransactionIdle() on the master DB.
	 *
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @throws DBError|Exception
	 * @return void
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		$dbw = $this->getMasterDB();
		// __METHOD__ inside the closure would name the closure, so capture it here
		$caller = __METHOD__;

		$pushJobs = function () use ( $dbw, $jobs, $flags, $caller ) {
			$this->doBatchPushInternal( $dbw, $jobs, $flags, $caller );
		};
		$dbw->onTransactionIdle( $pushJobs, __METHOD__ );
	}
194
195
	/**
	 * Insert job rows, skipping de-duplicatable jobs that already have an unclaimed row.
	 *
	 * This function should *not* be called outside of JobQueueDB
	 *
	 * Jobs whose ignoreDuplicates() is true are keyed by job_sha1, so identical jobs in
	 * the batch collapse into one row, and any with an unclaimed row already in the
	 * queue are dropped. With QOS_ATOMIC in $flags, all inserts run inside one atomic
	 * section. Rows are inserted 50 at a time.
	 *
	 * @param IDatabase $dbw
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bitfield of JobQueue::QOS_* flags
	 * @param string $method Caller name used for the queries and the atomic section
	 * @throws JobQueueError On database errors
	 * @return void
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( !count( $jobs ) ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Number of rows we would insert if nothing were already queued. This must be
			// taken before $rowSet is pruned below, or the duplicate count is always zero.
			$candidateCount = count( $rowSet ) + count( $rowList );
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->select( 'job', 'job_sha1',
					[
						// No job_type condition since it's part of the job_sha1 hash
						'job_sha1' => array_keys( $rowSet ),
						'job_token' => '' // unclaimed
					],
					$method
				);
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->insert( 'job', $rowBatch, $method );
			}
			JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
			JobQueue::incrStats( 'dupe_inserts', $this->type, $candidateCount - count( $rows ) );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}
259
260
	/**
	 * Claim one unclaimed job of this type and build its Job object.
	 *
	 * DBO_TRX is cleared while claiming so each query is its own transaction. The
	 * previous setting is restored when $scopedReset goes out of scope. Rows are claimed
	 * oldest-first for the "fifo"/"timestamp" orders and at random otherwise. When no job
	 * was claimed, and on roughly 1 in 10 successful pops, stale claims are recycled/pruned.
	 *
	 * @see JobQueue::doPop()
	 * @return Job|bool Job, or false if nothing could be claimed
	 * @throws JobQueueError On database errors
	 */
	protected function doPop() {
		$dbw = $this->getMasterDB();
		try {
			$autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			// Held until the method returns; its destructor restores the old flag
			$scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
				$dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
			} );

			$uuid = wfRandomString( 32 ); // pop attempt
			// Try to reserve a row in the DB. claimOldest()/claimRandom() already loop
			// when they get raced out, so one attempt here is enough.
			if ( in_array( $this->order, [ 'fifo', 'timestamp' ], true ) ) {
				$row = $this->claimOldest( $uuid );
			} else { // random first
				$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
				$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
				$row = $this->claimRandom( $uuid, $rand, $gte );
			}

			$job = false; // job popped off
			if ( $row ) {
				JobQueue::incrStats( 'pops', $this->type );
				// Get the job object from the row...
				$title = Title::makeTitle( $row->job_namespace, $row->job_title );
				$job = Job::factory( $row->job_cmd, $title,
					self::extractBlob( $row->job_params ), $row->job_id );
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;
			}

			if ( !$job || mt_rand( 0, 9 ) === 0 ) {
				// Handled jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $job;
	}
309
310
	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 *
	 * First a candidate unclaimed row is SELECTed. It is then claimed with an UPDATE by
	 * primary key that only matches while the row is still unclaimed. If another runner
	 * won the race, the search repeats. Queues not known to be small pick a candidate via
	 * a random OFFSET (up to MAX_OFFSET). Once that finds nothing, the queue is flagged as
	 * small in the cache for CACHE_TTL_SHORT seconds and candidates are found via
	 * job_random instead, trying the opposite direction once if the first finds nothing.
	 *
	 * @param string $uuid 32 char hex string
	 * @param int $rand Random unsigned integer (31 bits)
	 * @param bool $gte Search for job_random >= $rand (otherwise job_random <= $rand)
	 * @return stdClass|bool Row|false
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getMasterDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );

		$row = false; // the row acquired
		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$ineq = $gte ? '>=' : '<=';
				$dir = $gte ? 'ASC' : 'DESC';
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
						"job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
					__METHOD__,
					[ 'ORDER BY' => "job_random {$dir}" ]
				);
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
					[
						'job_cmd' => $this->type,
						'job_token' => '', // unclaimed
					],
					__METHOD__,
					[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
				);
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					// Remember the hint for as long as the other queue statistics are cached
					$this->cache->set( $this->getCacheKey( 'small' ), 1, self::CACHE_TTL_SHORT );
					continue; // use job_random
				}
			}

			if ( $row ) { // claim the job
				$dbw->update( 'job', // update by PK
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
					__METHOD__
				);
				// This might get raced out by another runner when claiming the previously
				// selected row. The use of job_random should minimize this problem, however.
				if ( !$dbw->affectedRows() ) {
					$row = false; // raced out
				}
			} else {
				break; // nothing to do
			}
		} while ( !$row );

		return $row;
	}
388
389
	/**
	 * Claim the unclaimed job of this type with the lowest job_id using a single UPDATE,
	 * so no row locks are held across round trips, then read the claimed row back.
	 *
	 * Per http://bugs.mysql.com/bug.php?id=6980, MySQL cannot use a subquery on the
	 * table being changed in an UPDATE (Error: 1093). On MySQL, UPDATE ... ORDER BY ...
	 * LIMIT is used instead. Other backends use a "job_id = (SELECT ...)" subquery. If the
	 * claimed row has vanished by the time it is read back, the claim is retried.
	 *
	 * @param string $uuid 32 char hex string
	 * @return stdClass|bool Row|false
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getMasterDB();

		while ( true ) {
			if ( $dbw->getType() === 'mysql' ) {
				// MySQL supports ORDER BY + LIMIT directly on UPDATE queries
				$table = $dbw->tableName( 'job' );
				$token = $dbw->addQuotes( $uuid );
				$claimTime = $dbw->addQuotes( $dbw->timestamp() );
				$jobType = $dbw->addQuotes( $this->type );
				$unclaimed = $dbw->addQuotes( '' );
				$sql = "UPDATE {$table} SET job_token = {$token}, " .
					"job_token_timestamp = {$claimTime}, job_attempts = job_attempts+1 " .
					"WHERE ( job_cmd = {$jobType} AND job_token = {$unclaimed} ) " .
					"ORDER BY job_id ASC LIMIT 1";
				$dbw->query( $sql, __METHOD__ );
			} else {
				// Find the job with a subquery, inside the UPDATE that claims it
				$oldestUnclaimed = $dbw->selectSQLText( 'job', 'job_id',
					[ 'job_cmd' => $this->type, 'job_token' => '' ],
					__METHOD__,
					[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] );
				$dbw->update( 'job',
					[
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts = job_attempts+1' ],
					[ 'job_id = (' . $oldestUnclaimed . ')' ],
					__METHOD__
				);
			}

			if ( !$dbw->affectedRows() ) {
				return false; // nothing left to claim
			}

			// Fetch the row that we just reserved...
			$claimed = $dbw->selectRow( 'job', self::selectFields(),
				[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__ );
			if ( $claimed ) {
				return $claimed;
			}
			// Raced out by duplicate job removal; try to claim another row
			wfDebug( "Row deleted as duplicate by another process.\n" );
		}
	}
449
450
	/**
	 * Acknowledge a finished job by deleting its row from the master DB.
	 *
	 * DBO_TRX is cleared for the duration of the call so the DELETE is its own
	 * transaction. The old setting is restored when $restoreTrx goes out of scope.
	 *
	 * @see JobQueue::doAck()
	 * @param Job $job Job whose metadata['id'] holds its job_id
	 * @throws MWException If the job has no ID
	 * @throws JobQueueError On database errors
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['id'] ) ) {
			throw new MWException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getMasterDB();
		try {
			$hadAutoTrx = $dbw->getFlag( DBO_TRX );
			$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
			$restoreTrx = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
				$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 );
			} );

			// A single DELETE by job_id, without holding row locks over RTTs
			$conds = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];
			$dbw->delete( 'job', $conds, __METHOD__ );

			JobQueue::incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}
477
478
	/**
	 * Record the root job timestamp for de-duplication, once the master DB is idle.
	 *
	 * Callers should call batchInsert() and then this function so that if the insert
	 * fails, the de-duplication registration will be aborted. Since the insert is
	 * deferred till "transaction idle", do the same here, so that the ordering is
	 * maintained. Having only the de-duplication registration succeed would cause
	 * jobs to become no-ops without any actual jobs that made them redundant.
	 *
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @throws MWException If rootJobSignature or rootJobTimestamp is missing
	 * @return bool Always true
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		$params = $job->getParams();
		if ( !isset( $params['rootJobSignature'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
		}
		if ( !isset( $params['rootJobTimestamp'] ) ) {
			throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
		}

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		$cache = $this->dupCache;
		$registerRootJob = function () use ( $cache, $params, $key ) {
			$latest = $cache->get( $key ); // timestamp of the newest root job seen so far
			$alreadyNewer = $latest && $latest >= $params['rootJobTimestamp'];
			if ( $alreadyNewer ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $cache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
		};
		$this->getMasterDB()->onTransactionIdle( $registerRootJob, __METHOD__ );

		return true;
	}
514
515
	/**
	 * Delete every job row of this type, claimed or not, from the master DB.
	 *
	 * @see JobQueue::doDelete()
	 * @return bool Always true
	 * @throws JobQueueError On database errors
	 */
	protected function doDelete() {
		$dbw = $this->getMasterDB();
		try {
			// Pass the caller name, like every other query in this class, for query logging
			$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return true;
	}
529
530
	/**
	 * Block until replica DBs of this queue's wiki/cluster have caught up.
	 *
	 * @see JobQueue::doWaitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
		$services = MediaWikiServices::getInstance();
		$services->getDBLoadBalancerFactory()->waitForReplication( [
			'wiki' => $this->wiki,
			'cluster' => $this->cluster
		] );
	}
538
539
	/**
	 * Purge the cached queue statistics so the next read recomputes them.
	 *
	 * This covers every statistic cached with getCacheKey(): size (doGetSize()),
	 * acquiredcount (doGetAcquiredCount()) and abandonedcount (doGetAbandonedCount()).
	 *
	 * @return void
	 */
	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount', 'abandonedcount' ] as $type ) {
			$this->cache->delete( $this->getCacheKey( $type ) );
		}
	}
547
548
	/**
	 * Iterate over all unclaimed jobs of this type.
	 *
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */
	public function getAllQueuedJobs() {
		$conds = [
			'job_cmd' => $this->getType(),
			'job_token' => '' // unclaimed
		];

		return $this->getJobIterator( $conds );
	}
555
556
	/**
	 * Iterate over all jobs of this type that currently hold a claim token.
	 *
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 */
	public function getAllAcquiredJobs() {
		$conds = [
			'job_cmd' => $this->getType(),
			"job_token > ''" // any non-empty token means the job was claimed
		];

		return $this->getJobIterator( $conds );
	}
563
564
	/**
	 * Iterate over the job rows matching $conds, read from a replica DB.
	 *
	 * MappedIterator applies the callback to each row to build a Job. The row's job_id
	 * and job_timestamp are copied into the job's metadata.
	 *
	 * @param array $conds Query conditions
	 * @return Iterator
	 * @throws JobQueueError On database errors
	 */
	protected function getJobIterator( array $conds ) {
		$dbr = $this->getSlaveDB();
		try {
			return new MappedIterator(
				// Pass the caller name, like every other query in this class, for query logging
				$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
				function ( $row ) {
					$job = Job::factory(
						$row->job_cmd,
						Title::makeTitle( $row->job_namespace, $row->job_title ),
						strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
					);
					$job->metadata['id'] = $row->job_id;
					$job->metadata['timestamp'] = $row->job_timestamp;

					return $job;
				}
			);
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}
	}
589
590
	/**
	 * @return string Identifier of the DB location backing this queue: the external
	 *  cluster when one is configured, otherwise the wiki's main load balancer
	 */
	public function getCoalesceLocationInternal() {
		if ( $this->cluster ) {
			return "DBCluster:{$this->cluster}:{$this->wiki}";
		}

		return "LBFactory:{$this->wiki}";
	}
595
596
	/**
	 * @param string[] $types Job types to check
	 * @return string[] The subset of $types that have at least one job row
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getSlaveDB();
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->select( 'job', 'DISTINCT job_cmd', [ 'job_cmd' => $types ], __METHOD__ );

		$typesWithJobs = [];
		foreach ( $res as $distinctRow ) {
			$typesWithJobs[] = $distinctRow->job_cmd;
		}

		return $typesWithJobs;
	}
612
613
	/**
	 * @param string[] $types Job types to count
	 * @return int[] Map of job type to its number of job rows, claimed or not;
	 *  types with no rows are absent
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getSlaveDB();
		$res = $dbr->select(
			'job',
			[ 'job_cmd', 'COUNT(*) AS count' ],
			[ 'job_cmd' => $types ],
			__METHOD__,
			[ 'GROUP BY' => 'job_cmd' ]
		);

		$sizes = [];
		foreach ( $res as $group ) {
			$sizes[$group->job_cmd] = (int)$group->count;
		}

		return $sizes;
	}
625
626
	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 *
	 * Runs under the named DB lock "jobqueue-recycle-<type>" (1 second timeout), so
	 * concurrent callers for the same type return 0 instead of doing duplicate work.
	 *
	 * @return int Number of jobs recycled/deleted
	 * @throws JobQueueError On database errors
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getMasterDB();
		$lockName = "jobqueue-recycle-{$this->type}";

		try {
			if ( !$dbw->lock( $lockName, __METHOD__, 1 ) ) {
				return $count; // already in progress
			}
			try {
				$count += $this->releaseStaleClaims( $dbw, $now, __METHOD__ );
				$count += $this->pruneStaleJobs( $dbw, $now, __METHOD__ );
			} catch ( DBError $e ) {
				// Named DB locks are typically bound to the connection (e.g. MySQL GET_LOCK),
				// so skipping unlock() here would block recycling for this type until the
				// connection closes. Release on a best-effort basis and report $e.
				try {
					$dbw->unlock( $lockName, __METHOD__ );
				} catch ( DBError $unlockError ) {
					// Ignored: the original error is the one worth reporting
				}
				throw $e;
			}
			$dbw->unlock( $lockName, __METHOD__ );
		} catch ( DBError $e ) {
			$this->throwDBException( $e );
		}

		return $count;
	}

	/**
	 * Expire the claims on jobs that were acquired more than claimTTL seconds ago and still
	 * have retries left, so other runners will pick them up. Does nothing if claimTTL <= 0.
	 *
	 * @param IDatabase $dbw Master DB connection
	 * @param int $now UNIX timestamp of this recycling run
	 * @param string $fname Caller name for the queries
	 * @return int Number of jobs recycled
	 */
	private function releaseStaleClaims( $dbw, $now, $fname ) {
		if ( $this->claimTTL <= 0 ) {
			return 0;
		}

		$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
		// Get the IDs of jobs that have been claimed but not finished after too long.
		// Selecting the IDs first means that the UPDATE can be done by primary key
		// (less deadlocks).
		$res = $dbw->select( 'job', 'job_id',
			[
				'job_cmd' => $this->type,
				"job_token != {$dbw->addQuotes( '' )}", // was acquired
				"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
				"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
			$fname
		);
		$ids = array_map(
			function ( $o ) {
				return $o->job_id;
			}, iterator_to_array( $res )
		);
		if ( !count( $ids ) ) {
			return 0;
		}

		// Reset job_token for these jobs so that other runners will pick them up.
		// Set the timestamp to the current time, as it is useful to know that the job
		// was already tried before (the timestamp becomes the "released" time).
		$dbw->update( 'job',
			[
				'job_token' => '',
				'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
			[
				'job_id' => $ids ],
			$fname
		);
		$affected = $dbw->affectedRows();
		JobQueue::incrStats( 'recycles', $this->type, $affected );
		$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );

		return $affected;
	}

	/**
	 * Delete jobs whose claim is older than MAX_AGE_PRUNE seconds. When claimTTL > 0,
	 * only jobs that have also used up their attempts are deleted.
	 *
	 * @param IDatabase $dbw Master DB connection
	 * @param int $now UNIX timestamp of this recycling run
	 * @param string $fname Caller name for the queries
	 * @return int Number of jobs deleted
	 */
	private function pruneStaleJobs( $dbw, $now, $fname ) {
		$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
		$conds = [
			'job_cmd' => $this->type,
			"job_token != {$dbw->addQuotes( '' )}", // was acquired
			"job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
		];
		if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
			$conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
		}
		// Get the IDs of jobs that are considered stale and should be removed. Selecting
		// the IDs first means that the DELETE can be done by primary key (less deadlocks).
		$res = $dbw->select( 'job', 'job_id', $conds, $fname );
		$ids = array_map(
			function ( $o ) {
				return $o->job_id;
			}, iterator_to_array( $res )
		);
		if ( !count( $ids ) ) {
			return 0;
		}

		$dbw->delete( 'job', [ 'job_id' => $ids ], $fname );
		$affected = $dbw->affectedRows();
		JobQueue::incrStats( 'abandons', $this->type, $affected );

		return $affected;
	}
711
712
	/**
	 * Build the job table row for inserting $job.
	 *
	 * job_sha1 is a base-36 (31 char) SHA-1 of the job's de-duplication info, and
	 * job_random is a random integer in [0, MAX_JOB_RANDOM].
	 *
	 * @param IJobSpecification $job
	 * @return array Map of column name => value
	 */
	protected function insertFields( IJobSpecification $job ) {
		$dbw = $this->getMasterDB();

		$row = [];
		// Fields that describe the nature of the job
		$row['job_cmd'] = $job->getType();
		$title = $job->getTitle();
		$row['job_namespace'] = $title->getNamespace();
		$row['job_title'] = $title->getDBkey();
		$row['job_params'] = self::makeBlob( $job->getParams() );
		// Additional job metadata
		$row['job_id'] = $dbw->nextSequenceValue( 'job_job_id_seq' );
		$row['job_timestamp'] = $dbw->timestamp();
		$dedupHash = sha1( serialize( $job->getDeduplicationInfo() ) );
		$row['job_sha1'] = Wikimedia\base_convert( $dedupHash, 16, 36, 31 );
		$row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

		return $row;
	}
735
736
	/**
	 * Get a replica DB connection for this queue.
	 *
	 * @throws JobQueueConnectionError If the connection fails; the original
	 *  DBConnectionError is kept as the previous exception
	 * @return DBConnRef
	 */
	protected function getSlaveDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
		}
	}
747
748
	/**
	 * Get a master DB connection for this queue.
	 *
	 * @throws JobQueueConnectionError If the connection fails; the original
	 *  DBConnectionError is kept as the previous exception
	 * @return DBConnRef
	 */
	protected function getMasterDB() {
		try {
			return $this->getDB( DB_MASTER );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage(), 0, $e );
		}
	}
759
760
	/**
	 * Get a connection reference to this queue's DB.
	 *
	 * Uses the external cluster named by the "cluster" parameter when one was given,
	 * otherwise the wiki's main load balancer.
	 *
	 * @param int $index (DB_REPLICA/DB_MASTER)
	 * @return DBConnRef
	 */
	protected function getDB( $index ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		// $this->cluster is bool|string; only an actual cluster name selects an external
		// LB (the same test getCacheKey() uses), anything else means the main LB.
		$lb = is_string( $this->cluster )
			? $lbFactory->getExternalLB( $this->cluster, $this->wiki )
			: $lbFactory->getMainLB( $this->wiki );

		return $lb->getConnectionRef( $index, [], $this->wiki );
	}
772
773
	/**
	 * Build the shared-cache key for a statistic of this queue.
	 *
	 * The key includes the wiki's DB name and prefix, the cluster name ("main" when no
	 * external cluster is set), the job type and $property.
	 *
	 * @param string $property
	 * @return string
	 */
	private function getCacheKey( $property ) {
		if ( is_string( $this->cluster ) ) {
			$clusterName = $this->cluster;
		} else {
			$clusterName = 'main';
		}
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $clusterName, $this->type, $property );
	}
783
784
	/**
	 * Encode job parameters for the job_params column.
	 *
	 * @param array|bool $params
	 * @return string Serialized $params, or '' when $params is false
	 */
	protected static function makeBlob( $params ) {
		return ( $params === false ) ? '' : serialize( $params );
	}
795
796
	/**
	 * Decode a job_params column value produced by makeBlob().
	 *
	 * @param string $blob
	 * @return bool|mixed Unserialized value, or false for an empty blob
	 */
	protected static function extractBlob( $blob ) {
		if ( (string)$blob === '' ) {
			return false; // no parameters were stored
		}

		return unserialize( $blob );
	}
807
808
	/**
	 * Rethrow a database error as a JobQueueError.
	 *
	 * The message keeps the original exception class name, and the original exception
	 * is attached as the previous exception so its stack trace is not lost.
	 *
	 * @param DBError $e
	 * @throws JobQueueError Always
	 */
	protected function throwDBException( DBError $e ) {
		throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage(), 0, $e );
	}
815
816
	/**
	 * Return the list of job fields that should be selected.
	 *
	 * @since 1.23
	 * @return array Column names of the job table, in a fixed order
	 */
	public static function selectFields() {
		$fields = [
			// what the job is
			'job_id', 'job_cmd', 'job_namespace', 'job_title', 'job_timestamp', 'job_params',
			// random pick value, claim bookkeeping and de-duplication hash
			'job_random', 'job_attempts', 'job_token', 'job_token_timestamp', 'job_sha1',
		];

		return $fields;
	}
836
}
837