1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* Redis-backed job queue code. |
4
|
|
|
* |
5
|
|
|
* This program is free software; you can redistribute it and/or modify |
6
|
|
|
* it under the terms of the GNU General Public License as published by |
7
|
|
|
* the Free Software Foundation; either version 2 of the License, or |
8
|
|
|
* (at your option) any later version. |
9
|
|
|
* |
10
|
|
|
* This program is distributed in the hope that it will be useful, |
11
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
12
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13
|
|
|
* GNU General Public License for more details. |
14
|
|
|
* |
15
|
|
|
* You should have received a copy of the GNU General Public License along |
16
|
|
|
* with this program; if not, write to the Free Software Foundation, Inc., |
17
|
|
|
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
18
|
|
|
* http://www.gnu.org/copyleft/gpl.html |
19
|
|
|
* |
20
|
|
|
* @file |
21
|
|
|
* @author Aaron Schulz |
22
|
|
|
*/ |
23
|
|
|
use Psr\Log\LoggerInterface; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* Class to handle job queues stored in Redis |
27
|
|
|
* |
28
|
|
|
* This is a faster and less resource-intensive job queue than JobQueueDB. |
29
|
|
|
* All data for a queue using this class is placed into one redis server. |
30
|
|
|
* The mediawiki/services/jobrunner background service must be set up and running. |
31
|
|
|
* |
32
|
|
|
* There are eight main redis keys (per queue) used to track jobs: |
33
|
|
|
* - l-unclaimed : A list of job IDs used for ready unclaimed jobs |
34
|
|
|
* - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries |
35
|
|
|
* - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs |
36
|
|
|
* - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs |
37
|
|
|
* - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed or delayed jobs, used for de-duplication |
38
|
|
|
* - h-sha1ById : A hash of (job ID => SHA1) for unclaimed or delayed jobs, used for de-duplication |
39
|
|
|
* - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries |
40
|
|
|
* - h-data : A hash of (job ID => serialized blobs) for job storage |
41
|
|
|
* A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. |
42
|
|
|
* If an ID appears in any of those lists, it should have a h-data entry for its ID. |
43
|
|
|
* If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then |
44
|
|
|
* there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById |
45
|
|
|
* entry and every h-sha1ById must refer to an ID that is in l-unclaimed or z-delayed. If a job has its |
46
|
|
|
* ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. |
47
|
|
|
* |
48
|
|
|
* The following keys are used to track queue states: |
49
|
|
|
* - s-queuesWithJobs : A set of all queues with non-abandoned jobs |
50
|
|
|
* |
51
|
|
|
* The background service takes care of undelaying, recycling, and pruning jobs as well as |
52
|
|
|
* removing s-queuesWithJobs entries as queues empty. |
53
|
|
|
* |
54
|
|
|
* Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication. |
55
|
|
|
* Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. |
56
|
|
|
* All the keys are prefixed with the relevant wiki ID information. |
57
|
|
|
* |
58
|
|
|
* This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations. |
59
|
|
|
* Additionally, it should be noted that redis has different persistence modes, such |
60
|
|
|
* as rdb snapshots, journaling, and no persistence. Appropriate configuration should be |
61
|
|
|
* made on the servers based on what queues are using it and what tolerance they have. |
62
|
|
|
* |
63
|
|
|
* @ingroup JobQueue |
64
|
|
|
* @ingroup Redis |
65
|
|
|
* @since 1.22 |
66
|
|
|
*/ |
67
|
|
|
class JobQueueRedis extends JobQueue {
	/** @var RedisConnectionPool */
	protected $redisPool;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Server address */
	protected $server;
	/** @var string Compression method to use */
	protected $compression;

	/**
	 * @param array $params Possible keys:
	 *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
	 *                   Note that the serializer option is ignored as "none" is always used.
	 *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
	 *                   If a hostname is specified but no port, the standard port number
	 *                   6379 will be used. Required.
	 *   - compression : The type of compression to use; one of (none,gzip). Default "none".
	 *   - daemonized  : Must be set to a true value. Job recycling/undelaying is left to the
	 *                   mediawiki/services/jobrunner service; non-daemonized mode is rejected.
	 * @throws InvalidArgumentException If "daemonized" is missing or false
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );
		// Lua scripts operate on the raw strings, so never let phpredis serialize values
		$params['redisConfig']['serializer'] = 'none';
		$this->server = $params['redisServer'];
		$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
		if ( empty( $params['daemonized'] ) ) {
			throw new InvalidArgumentException(
				"Non-daemonized mode is no longer supported. Please install the " .
				"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
		}
		$this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
	}

	protected function supportedOrders() {
		return [ 'timestamp', 'fifo' ];
	}

	protected function optimalOrder() {
		return 'fifo';
	}

	protected function supportsDelayedJobs() {
		return true;
	}

	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool Whether the unclaimed list is empty
	 * @throws JobQueueError
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() == 0;
	}

	/**
	 * @see JobQueue::doGetSize()
	 * @return int Length of the l-unclaimed list
	 * @throws JobQueueError
	 */
	protected function doGetSize() {
		$conn = $this->getConnection();
		try {
			return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int Combined size of the z-claimed and z-abandoned sets
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			$conn->zSize( $this->getQueueKey( 'z-claimed' ) );
			$conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
			$res = $conn->exec();
			if ( !is_array( $res ) ) {
				// A failed pipeline yields a non-array; array_sum() cannot handle that
				throw new RedisException( "Could not count acquired {$this->type} jobs." );
			}

			return array_sum( $res );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetDelayedCount()
	 * @return int Size of the z-delayed set
	 * @throws JobQueueError
	 */
	protected function doGetDelayedCount() {
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int Size of the z-abandoned set
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags Bitfield; with QOS_ATOMIC all jobs go in a single script call,
	 *   otherwise they are pushed in batches of 100
	 * @return void
	 * @throws JobQueueError If any batch could not be inserted
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Convert the jobs into field maps (de-duplicated against each other)
		$items = []; // (SHA1 if de-duplicable, else job ID => job fields map)
		foreach ( $jobs as $job ) {
			$item = $this->getNewJobFields( $job );
			if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
				$items[$item['sha1']] = $item;
			} else {
				$items[$item['uuid']] = $item;
			}
		}

		if ( !count( $items ) ) {
			return; // nothing to do
		}

		$conn = $this->getConnection();
		try {
			// Actually push the non-duplicate jobs into the queue...
			if ( $flags & self::QOS_ATOMIC ) {
				$batches = [ $items ]; // all or nothing
			} else {
				$batches = array_chunk( $items, 100 ); // avoid tying up the server
			}
			$failed = 0;
			$pushed = 0;
			foreach ( $batches as $itemBatch ) {
				$added = $this->pushBlobs( $conn, $itemBatch );
				if ( is_int( $added ) ) {
					$pushed += $added;
				} else {
					// The script call failed; count the whole batch as not inserted
					$failed += count( $itemBatch );
				}
			}
			JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
			JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
			JobQueue::incrStats( 'dupe_inserts', $this->type,
				count( $items ) - $failed - $pushed );
			if ( $failed > 0 ) {
				$err = "Could not insert {$failed} {$this->type} job(s).";
				wfDebugLog( 'JobQueueRedis', $err );
				throw new RedisException( $err );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Atomically insert a batch of jobs via a Lua script, skipping any whose SHA1 is
	 * already registered in h-idBySha1.
	 *
	 * @param RedisConnRef $conn
	 * @param array $items List of results from JobQueueRedis::getNewJobFields()
	 * @return int|bool Number of jobs inserted (duplicates are ignored); a non-int
	 *   (presumably false) if the script call failed
	 * @throws RedisException
	 */
	protected function pushBlobs( RedisConnRef $conn, array $items ) {
		$args = [ $this->encodeQueueName() ];
		// Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
		foreach ( $items as $item ) {
			$args[] = (string)$item['uuid'];
			$args[] = (string)$item['sha1'];
			$args[] = (string)$item['rtimestamp'];
			$args[] = (string)$this->serialize( $item );
		}
		static $script =
		/** @lang Lua */
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
		-- First argument is the queue ID
		local queueId = ARGV[1]
		-- Next arguments all come in 4s (one per job)
		local variadicArgCount = #ARGV - 1
		if variadicArgCount % 4 ~= 0 then
			return redis.error_reply('Unmatched arguments')
		end
		-- Insert each job into this queue as needed
		local pushed = 0
		for i = 2,#ARGV,4 do
			local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
			if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
				if 1*rtimestamp > 0 then
					-- Insert into delayed queue (release time as score)
					redis.call('zAdd',kDelayed,rtimestamp,id)
				else
					-- Insert into unclaimed queue
					redis.call('lPush',kUnclaimed,id)
				end
				if sha1 ~= '' then
					redis.call('hSet',kSha1ById,id,sha1)
					redis.call('hSet',kIdBySha1,sha1,id)
				end
				redis.call('hSet',kData,id,blob)
				pushed = pushed + 1
			end
		end
		-- Mark this queue as having jobs
		redis.call('sAdd',kQwJobs,queueId)
		return pushed
LUA;
		return $conn->luaEval( $script,
			array_merge(
				[
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
					$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
					$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
					$this->getQueueKey( 'z-delayed' ), # KEYS[4]
					$this->getQueueKey( 'h-data' ), # KEYS[5]
					$this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
				],
				$args
			),
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doPop()
	 * @return Job|bool Job, or false if the unclaimed list is empty
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false;

		$conn = $this->getConnection();
		try {
			do {
				$blob = $this->popAndAcquireBlob( $conn );
				if ( !is_string( $blob ) ) {
					break; // no jobs; nothing to do
				}

				JobQueue::incrStats( 'pops', $this->type );
				$item = $this->unserialize( $blob );
				if ( $item === false ) {
					wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
					continue; // $job is still false, so try the next one
				}

				// If $item is invalid, the runner loop recycling will clean up as needed
				$job = $this->getJobFromFields( $item ); // may be false
			} while ( !$job ); // job may be false if invalid
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $job;
	}

	/**
	 * Atomically pop the oldest unclaimed job ID, release its SHA1 de-duplication
	 * entries, mark it claimed (scored by the current time) and bump its attempt count.
	 *
	 * @param RedisConnRef $conn
	 * @return string|bool Serialized job blob, or a non-string (e.g. false) if none
	 * @throws RedisException
	 */
	protected function popAndAcquireBlob( RedisConnRef $conn ) {
		static $script =
		/** @lang Lua */
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
		local rTime = unpack(ARGV)
		-- Pop an item off the queue
		local id = redis.call('rPop',kUnclaimed)
		if not id then
			return false
		end
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Mark the jobs as claimed and return it
		redis.call('zAdd',kClaimed,rTime,id)
		redis.call('hIncrBy',kAttempts,id,1)
		return redis.call('hGet',kData,id)
LUA;
		return $conn->luaEval( $script,
			[
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'z-claimed' ), # KEYS[4]
				$this->getQueueKey( 'h-attempts' ), # KEYS[5]
				$this->getQueueKey( 'h-data' ), # KEYS[6]
				time(), # ARGV[1] (injected to be replication-safe)
			],
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doAck()
	 * @param Job $job
	 * @return bool True if acknowledged; false if the job was no longer in z-claimed
	 * @throws UnexpectedValueException If the job has no UUID metadata
	 * @throws JobQueueError
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['uuid'] ) ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
		}

		$uuid = $job->metadata['uuid'];
		$conn = $this->getConnection();
		try {
			static $script =
			/** @lang Lua */
<<<LUA
			local kClaimed, kAttempts, kData = unpack(KEYS)
			local id = unpack(ARGV)
			-- Unmark the job as claimed
			local removed = redis.call('zRem',kClaimed,id)
			-- Check if the job was recycled
			if removed == 0 then
				return 0
			end
			-- Delete the retry data
			redis.call('hDel',kAttempts,id)
			-- Delete the job data itself
			return redis.call('hDel',kData,id)
LUA;
			$res = $conn->luaEval( $script,
				[
					$this->getQueueKey( 'z-claimed' ), # KEYS[1]
					$this->getQueueKey( 'h-attempts' ), # KEYS[2]
					$this->getQueueKey( 'h-data' ), # KEYS[3]
					$uuid # ARGV[1]
				],
				3 # number of first argument(s) that are keys
			);

			if ( !$res ) {
				wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );

				return false;
			}

			JobQueue::incrStats( 'acks', $this->type );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return true;
	}

	/**
	 * Record the root job timestamp unless an equal or newer one is already stored.
	 *
	 * NOTE: the get() and set() below are separate round trips, not one atomic step.
	 *
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return bool
	 * @throws JobQueueError
	 * @throws LogicException If the job lacks root job parameters
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new LogicException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

		$conn = $this->getConnection();
		try {
			$timestamp = $conn->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doIsRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool True if a newer root job with the same signature was registered
	 * @throws JobQueueError
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$conn = $this->getConnection();
		try {
			// Get the last time this root job was enqueued
			$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
		} catch ( RedisException $e ) {
			// Only for static analysis; throwRedisException() always throws
			$timestamp = false;
			$this->throwRedisException( $conn, $e );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Delete every per-queue key and remove the queue from s-queuesWithJobs.
	 *
	 * @see JobQueue::doDelete()
	 * @return bool Whether the key deletion call succeeded
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned',
			'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ];

		$conn = $this->getConnection();
		try {
			$keys = [];
			foreach ( $props as $prop ) {
				$keys[] = $this->getQueueKey( $prop );
			}

			$ok = ( $conn->delete( $keys ) !== false );
			$conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );

			return $ok;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator Jobs whose IDs are in l-unclaimed
	 * @throws JobQueueError
	 */
	public function getAllQueuedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllDelayedJobs()
	 * @return Iterator Jobs whose IDs are in z-delayed
	 * @throws JobQueueError
	 */
	public function getAllDelayedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator Jobs whose IDs are in z-claimed
	 * @throws JobQueueError
	 */
	public function getAllAcquiredJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllAbandonedJobs()
	 * @return Iterator Jobs whose IDs are in z-abandoned
	 * @throws JobQueueError
	 */
	public function getAllAbandonedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * Lazily map job IDs to Job objects, skipping IDs whose data is missing.
	 *
	 * @param RedisConnRef $conn
	 * @param array $uids List of job UUIDs
	 * @return MappedIterator
	 */
	protected function getJobIterator( RedisConnRef $conn, array $uids ) {
		return new MappedIterator(
			$uids,
			function ( $uid ) use ( $conn ) {
				return $this->getJobFromUidInternal( $uid, $conn );
			},
			[ 'accept' => function ( $job ) {
				return is_object( $job ); // drop "not found" (false) results
			} ]
		);
	}

	public function getCoalesceLocationInternal() {
		return "RedisServer:" . $this->server;
	}

	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
	}

	/**
	 * @param array $types Job types sharing this queue's wiki/server
	 * @return array (type => l-unclaimed length); empty if the pipeline result was unusable
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$sizes = []; // (type => size)
		$types = array_values( $types ); // reindex so pipeline result offsets line up
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $types as $type ) {
				$conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
			}
			$res = $conn->exec();
			if ( is_array( $res ) ) {
				foreach ( $res as $i => $size ) {
					$sizes[$types[$i]] = $size;
				}
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $sizes;
	}

	/**
	 * This function should not be called outside JobQueueRedis
	 *
	 * @param string $uid
	 * @param RedisConnRef $conn
	 * @return Job|bool Returns false if the job does not exist
	 * @throws JobQueueError
	 * @throws UnexpectedValueException If the stored blob cannot be unserialized
	 */
	public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
		try {
			$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
			if ( $data === false ) {
				return false; // not found
			}
			$item = $this->unserialize( $data );
			if ( !is_array( $item ) ) { // this shouldn't happen
				throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
			}
			$job = $this->getJobFromFields( $item );
			// Add in attempt count for debugging at showJobs.php
			$job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );

			return $job;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @return array List of (type, wiki) tuples for queues with non-abandoned jobs
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	public function getServerQueuesWithJobs() {
		$queues = [];

		$conn = $this->getConnection();
		try {
			$set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
			foreach ( $set as $queue ) {
				$queues[] = $this->decodeQueueName( $queue );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $queues;
	}

	/**
	 * @param IJobSpecification $job
	 * @return array Field map stored (serialized) in h-data
	 */
	protected function getNewJobFields( IJobSpecification $job ) {
		return [
			// Fields that describe the nature of the job
			'type' => $job->getType(),
			'namespace' => $job->getTitle()->getNamespace(),
			'title' => $job->getTitle()->getDBkey(),
			'params' => $job->getParams(),
			// Some jobs cannot run until a "release timestamp"
			'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
			// Additional job metadata
			'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
			// Empty string means "do not de-duplicate"
			'sha1' => $job->ignoreDuplicates()
				? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
				: '',
			'timestamp' => time() // UNIX timestamp
		];
	}

	/**
	 * @param array $fields Field map as produced by getNewJobFields()
	 * @return Job|bool
	 */
	protected function getJobFromFields( array $fields ) {
		$title = Title::makeTitle( $fields['namespace'], $fields['title'] );
		$job = Job::factory( $fields['type'], $title, $fields['params'] );
		$job->metadata['uuid'] = $fields['uuid'];
		$job->metadata['timestamp'] = $fields['timestamp'];

		return $job;
	}

	/**
	 * @param array $fields
	 * @return string Serialized and possibly compressed version of $fields
	 */
	protected function serialize( array $fields ) {
		$blob = serialize( $fields );
		if ( $this->compression === 'gzip'
			&& strlen( $blob ) >= 1024
			&& function_exists( 'gzdeflate' )
		) {
			// Wrap the compressed data in an object so unserialize() can tell it apart
			$object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ];
			$blobz = serialize( $object );

			// Only keep the compressed form if it actually saves space
			return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
		} else {
			return $blob;
		}
	}

	/**
	 * @param string $blob Output of serialize()
	 * @return array|bool Unserialized version of $blob or false
	 */
	protected function unserialize( $blob ) {
		$fields = unserialize( $blob );
		if ( is_object( $fields ) ) {
			// Compressed wrapper object written by serialize()
			if ( isset( $fields->enc ) && $fields->enc === 'gzip'
				&& isset( $fields->blob ) && function_exists( 'gzinflate' )
			) {
				$inflated = gzinflate( $fields->blob );
				$fields = ( $inflated !== false ) ? unserialize( $inflated ) : false;
			} else {
				$fields = false;
			}
		}

		return is_array( $fields ) ? $fields : false;
	}

	/**
	 * Get a connection to the server that handles all sub-queues for this queue
	 *
	 * @return RedisConnRef
	 * @throws JobQueueConnectionError
	 */
	protected function getConnection() {
		$conn = $this->redisPool->getConnection( $this->server, $this->logger );
		if ( !$conn ) {
			throw new JobQueueConnectionError(
				"Unable to connect to redis server {$this->server}." );
		}

		return $conn;
	}

	/**
	 * Report a Redis error to the connection pool, then rethrow it as a JobQueueError.
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @throws JobQueueError Always
	 */
	protected function throwRedisException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
		throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
	}

	/**
	 * @return string JSON array of (type, wiki)
	 */
	private function encodeQueueName() {
		return json_encode( [ $this->type, $this->wiki ] );
	}

	/**
	 * @param string $name JSON from encodeQueueName()
	 * @return array (type, wiki)
	 */
	private function decodeQueueName( $name ) {
		return json_decode( $name );
	}

	/**
	 * @param string $name
	 * @return string Key of the form "global:jobqueue:<name>"
	 * @throws InvalidArgumentException If any part contains characters outside [a-zA-Z0-9_-]
	 */
	private function getGlobalKey( $name ) {
		$parts = [ 'global', 'jobqueue', $name ];
		foreach ( $parts as $part ) {
			// Anchored so that *every* character must be in range, not just one of them
			if ( !preg_match( '/\A[a-zA-Z0-9_-]+\z/', $part ) ) {
				throw new InvalidArgumentException( "Key part characters are out of range." );
			}
		}

		return implode( ':', $parts );
	}

	/**
	 * @param string $prop
	 * @param string|null $type Override this for sibling queues
	 * @return string Key of the form "<keyspace>:jobqueue:<type>:<prop>" (parts URL-encoded)
	 */
	private function getQueueKey( $prop, $type = null ) {
		$type = is_string( $type ) ? $type : $this->type;
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		$keyspace = $prefix ? "$db-$prefix" : $db;

		$parts = [ $keyspace, 'jobqueue', $type, $prop ];

		// Parts are typically ASCII, but encode for sanity to escape ":"
		return implode( ':', array_map( 'rawurlencode', $parts ) );
	}
}
820
|
|
|
|
If you implement `__call` and you know which methods are available, you can improve IDE auto-completion and static analysis by adding a `@method` annotation to the class. This is often the case when `__call` is implemented by a parent class and only the child class knows which methods exist: