<?php
/**
 * Redis-backed job queue code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @author Aaron Schulz
 */
use Psr\Log\LoggerInterface;

/**
 * Class to handle job queues stored in Redis
 *
 * This is a faster and less resource-intensive job queue than JobQueueDB.
 * All data for a queue using this class is placed into one redis server.
 * The mediawiki/services/jobrunner background service must be set up and running.
 *
 * There are eight main redis keys (per queue) used to track jobs:
 *   - l-unclaimed : A list of job IDs used for ready unclaimed jobs
 *   - z-claimed   : A sorted set of (job ID, UNIX timestamp as score) used for job retries
 *   - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
 *   - z-delayed   : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
 *   - h-idBySha1  : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
 *   - h-sha1ById  : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
 *   - h-attempts  : A hash of (job ID => attempt count) used for job claiming/retries
 *   - h-data      : A hash of (job ID => serialized blob) for job storage
 * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
 * If an ID appears in any of those lists, it should have an h-data entry for its ID.
 * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
 * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
 * entry and every h-sha1ById entry must refer to an ID that is l-unclaimed. If a job has its
 * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
 *
 * The following keys are used to track queue states:
 *   - s-queuesWithJobs : A set of all queues with non-abandoned jobs
 *
 * The background service takes care of undelaying, recycling, and pruning jobs as well as
 * removing s-queuesWithJobs entries as queues empty.
 *
 * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication.
 * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
 * All the keys are prefixed with the relevant wiki ID information.
 *
 * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
 * Additionally, it should be noted that redis has different persistence modes, such
 * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
 * made on the servers based on what queues are using it and what tolerance they have.
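 *
 * @par Example configuration:
 * A minimal $wgJobTypeConf entry wiring all job types to this class might look like
 * the following; the server address and options shown are illustrative only.
 * @code
 * $wgJobTypeConf['default'] = [
 *     'class' => 'JobQueueRedis',
 *     'redisServer' => 'localhost:6379',
 *     'redisConfig' => [],
 *     'daemonized' => true
 * ];
 * @endcode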
 *
 * @ingroup JobQueue
 * @ingroup Redis
 * @since 1.22
 */
class JobQueueRedis extends JobQueue {
	/** @var RedisConnectionPool */
	protected $redisPool;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Server address */
	protected $server;
	/** @var string Compression method to use */
	protected $compression;

	/**
	 * @param array $params Possible keys:
	 *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
	 *                   Note that the serializer option is ignored as "none" is always used.
	 *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
	 *                   If a hostname is specified but no port, the standard port number
	 *                   6379 will be used. Required.
	 *   - compression : The type of compression to use; one of (none,gzip).
	 *   - daemonized  : Set to true if the redisJobRunnerService runs in the background.
	 *                   This will disable job recycling/undelaying from the MediaWiki side
	 *                   to avoid redundancy and out-of-sync configuration.
	 * @throws InvalidArgumentException
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );
		$params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
		$this->server = $params['redisServer'];
		$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
		if ( empty( $params['daemonized'] ) ) {
			throw new InvalidArgumentException(
				"Non-daemonized mode is no longer supported. Please install the " .
				"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
		}
		$this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
	}

	protected function supportedOrders() {
		return [ 'timestamp', 'fifo' ];
	}

	protected function optimalOrder() {
		return 'fifo';
	}

	protected function supportsDelayedJobs() {
		return true;
	}

	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() == 0;
	}

	/**
	 * @see JobQueue::doGetSize()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetSize() {
		$conn = $this->getConnection();
		try {
			return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			$conn->zSize( $this->getQueueKey( 'z-claimed' ) );
			$conn->zSize( $this->getQueueKey( 'z-abandoned' ) );

			return array_sum( $conn->exec() );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetDelayedCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetDelayedCount() {
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doBatchPush()
	 * @param IJobSpecification[] $jobs
	 * @param int $flags
	 * @return void
	 * @throws JobQueueError
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Convert the jobs into field maps (de-duplicated against each other)
		$items = []; // (job ID => job fields map)
		foreach ( $jobs as $job ) {
			$item = $this->getNewJobFields( $job );
			if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
				$items[$item['sha1']] = $item;
			} else {
				$items[$item['uuid']] = $item;
			}
		}

		if ( !count( $items ) ) {
			return; // nothing to do
		}

		$conn = $this->getConnection();
		try {
			// Actually push the non-duplicate jobs into the queue...
			if ( $flags & self::QOS_ATOMIC ) {
				$batches = [ $items ]; // all or nothing
			} else {
				$batches = array_chunk( $items, 100 ); // avoid tying up the server
			}
			$failed = 0;
			$pushed = 0;
			foreach ( $batches as $itemBatch ) {
				$added = $this->pushBlobs( $conn, $itemBatch );
				if ( is_int( $added ) ) {
					$pushed += $added;
				} else {
					$failed += count( $itemBatch );
				}
			}
			JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
			JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
			JobQueue::incrStats( 'dupe_inserts', $this->type,
				count( $items ) - $failed - $pushed );
			if ( $failed > 0 ) {
				$err = "Could not insert {$failed} {$this->type} job(s).";
				wfDebugLog( 'JobQueueRedis', $err );
				throw new RedisException( $err );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @param RedisConnRef $conn
	 * @param array $items List of results from JobQueueRedis::getNewJobFields()
	 * @return int Number of jobs inserted (duplicates are ignored)
	 * @throws RedisException
	 */
	protected function pushBlobs( RedisConnRef $conn, array $items ) {
		$args = [ $this->encodeQueueName() ];
		// Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
		foreach ( $items as $item ) {
			$args[] = (string)$item['uuid'];
			$args[] = (string)$item['sha1'];
			$args[] = (string)$item['rtimestamp'];
			$args[] = (string)$this->serialize( $item );
		}
		static $script =
		/** @lang Lua */
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
		-- First argument is the queue ID
		local queueId = ARGV[1]
		-- Next arguments all come in 4s (one per job)
		local variadicArgCount = #ARGV - 1
		if variadicArgCount % 4 ~= 0 then
			return redis.error_reply('Unmatched arguments')
		end
		-- Insert each job into this queue as needed
		local pushed = 0
		for i = 2,#ARGV,4 do
			local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
			if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
				if 1*rtimestamp > 0 then
					-- Insert into delayed queue (release time as score)
					redis.call('zAdd',kDelayed,rtimestamp,id)
				else
					-- Insert into unclaimed queue
					redis.call('lPush',kUnclaimed,id)
				end
				if sha1 ~= '' then
					redis.call('hSet',kSha1ById,id,sha1)
					redis.call('hSet',kIdBySha1,sha1,id)
				end
				redis.call('hSet',kData,id,blob)
				pushed = pushed + 1
			end
		end
		-- Mark this queue as having jobs
		redis.call('sAdd',kQwJobs,queueId)
		return pushed
LUA;
		return $conn->luaEval( $script,
			array_merge(
				[
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
					$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
					$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
					$this->getQueueKey( 'z-delayed' ), # KEYS[4]
					$this->getQueueKey( 'h-data' ), # KEYS[5]
					$this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
				],
				$args
			),
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doPop()
	 * @return Job|bool
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false;

		$conn = $this->getConnection();
		try {
			do {
				$blob = $this->popAndAcquireBlob( $conn );
				if ( !is_string( $blob ) ) {
					break; // no jobs; nothing to do
				}

				JobQueue::incrStats( 'pops', $this->type );
				$item = $this->unserialize( $blob );
				if ( $item === false ) {
					wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
					continue;
				}

				// If $item is invalid, the runner loop recycling will clean up as needed
				$job = $this->getJobFromFields( $item ); // may be false
			} while ( !$job ); // job may be false if invalid
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $job;
	}

	/**
	 * @param RedisConnRef $conn
	 * @return string|bool Serialized job blob or false if the queue is empty
	 * @throws RedisException
	 */
	protected function popAndAcquireBlob( RedisConnRef $conn ) {
		static $script =
		/** @lang Lua */
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
		local rTime = unpack(ARGV)
		-- Pop an item off the queue
		local id = redis.call('rPop',kUnclaimed)
		if not id then
			return false
		end
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Mark the job as claimed and return it
		redis.call('zAdd',kClaimed,rTime,id)
		redis.call('hIncrBy',kAttempts,id,1)
		return redis.call('hGet',kData,id)
LUA;
		return $conn->luaEval( $script,
			[
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'z-claimed' ), # KEYS[4]
				$this->getQueueKey( 'h-attempts' ), # KEYS[5]
				$this->getQueueKey( 'h-data' ), # KEYS[6]
				time(), # ARGV[1] (injected to be replication-safe)
			],
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doAck()
	 * @param Job $job
	 * @return bool
	 * @throws UnexpectedValueException
	 * @throws JobQueueError
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['uuid'] ) ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
		}

		$uuid = $job->metadata['uuid'];
		$conn = $this->getConnection();
		try {
			static $script =
			/** @lang Lua */
<<<LUA
			local kClaimed, kAttempts, kData = unpack(KEYS)
			local id = unpack(ARGV)
			-- Unmark the job as claimed
			local removed = redis.call('zRem',kClaimed,id)
			-- Check if the job was recycled
			if removed == 0 then
				return 0
			end
			-- Delete the retry data
			redis.call('hDel',kAttempts,id)
			-- Delete the job data itself
			return redis.call('hDel',kData,id)
LUA;
			$res = $conn->luaEval( $script,
				[
					$this->getQueueKey( 'z-claimed' ), # KEYS[1]
					$this->getQueueKey( 'h-attempts' ), # KEYS[2]
					$this->getQueueKey( 'h-data' ), # KEYS[3]
					$uuid # ARGV[1]
				],
				3 # number of first argument(s) that are keys
			);

			if ( !$res ) {
				wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );

				return false;
			}

			JobQueue::incrStats( 'acks', $this->type );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return true;
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param IJobSpecification $job
	 * @return bool
	 * @throws JobQueueError
	 * @throws LogicException
	 */
	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new LogicException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

		$conn = $this->getConnection();
		try {
			$timestamp = $conn->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doIsRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$conn = $this->getConnection();
		try {
			// Get the last time this root job was enqueued
			$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
		} catch ( RedisException $e ) {
			$timestamp = false;
			$this->throwRedisException( $conn, $e );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * @see JobQueue::doDelete()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned',
			'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ];

		$conn = $this->getConnection();
		try {
			$keys = [];
			foreach ( $props as $prop ) {
				$keys[] = $this->getQueueKey( $prop );
			}

			$ok = ( $conn->delete( $keys ) !== false );
			$conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );

			return $ok;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllQueuedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllDelayedJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllDelayedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllAcquiredJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllAcquiredJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @see JobQueue::getAllAbandonedJobs()
	 * @return Iterator
	 * @throws JobQueueError
	 */
	public function getAllAbandonedJobs() {
		$conn = $this->getConnection();
		try {
			$uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $this->getJobIterator( $conn, $uids );
	}

	/**
	 * @param RedisConnRef $conn
	 * @param array $uids List of job UUIDs
	 * @return MappedIterator
	 */
	protected function getJobIterator( RedisConnRef $conn, array $uids ) {
		return new MappedIterator(
			$uids,
			function ( $uid ) use ( $conn ) {
				return $this->getJobFromUidInternal( $uid, $conn );
			},
			[ 'accept' => function ( $job ) {
				return is_object( $job );
			} ]
		);
	}

	public function getCoalesceLocationInternal() {
		return "RedisServer:" . $this->server;
	}

	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
	}

	protected function doGetSiblingQueueSizes( array $types ) {
		$sizes = []; // (type => size)
		$types = array_values( $types ); // reindex
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $types as $type ) {
				$conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
			}
			$res = $conn->exec();
			if ( is_array( $res ) ) {
				foreach ( $res as $i => $size ) {
					$sizes[$types[$i]] = $size;
				}
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $sizes;
	}

	/**
	 * This function should not be called outside JobQueueRedis
	 *
	 * @param string $uid
	 * @param RedisConnRef $conn
	 * @return Job|bool Returns false if the job does not exist
	 * @throws JobQueueError
	 * @throws UnexpectedValueException
	 */
	public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
		try {
			$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
			if ( $data === false ) {
				return false; // not found
			}
			$item = $this->unserialize( $data );
			if ( !is_array( $item ) ) { // this shouldn't happen
				throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
			}
			$title = Title::makeTitle( $item['namespace'], $item['title'] );
			$job = Job::factory( $item['type'], $title, $item['params'] );
			$job->metadata['uuid'] = $item['uuid'];
			$job->metadata['timestamp'] = $item['timestamp'];
			// Add in attempt count for debugging at showJobs.php
			$job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );

			return $job;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @return array List of (type, wiki) tuples for queues with non-abandoned jobs
	 * @throws JobQueueConnectionError
	 * @throws JobQueueError
	 */
	public function getServerQueuesWithJobs() {
		$queues = [];

		$conn = $this->getConnection();
		try {
			$set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
			foreach ( $set as $queue ) {
				$queues[] = $this->decodeQueueName( $queue );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $queues;
	}

	/**
	 * @param IJobSpecification $job
	 * @return array
	 */
	protected function getNewJobFields( IJobSpecification $job ) {
		return [
			// Fields that describe the nature of the job
			'type' => $job->getType(),
			'namespace' => $job->getTitle()->getNamespace(),
			'title' => $job->getTitle()->getDBkey(),
			'params' => $job->getParams(),
			// Some jobs cannot run until a "release timestamp"
			'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
			// Additional job metadata
			'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
			'sha1' => $job->ignoreDuplicates()
				? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
				: '',
			'timestamp' => time() // UNIX timestamp
		];
	}

	/**
	 * @param array $fields
	 * @return Job|bool
	 */
	protected function getJobFromFields( array $fields ) {
		$title = Title::makeTitle( $fields['namespace'], $fields['title'] );
		$job = Job::factory( $fields['type'], $title, $fields['params'] );
		$job->metadata['uuid'] = $fields['uuid'];
		$job->metadata['timestamp'] = $fields['timestamp'];

		return $job;
	}

	/**
	 * @param array $fields
	 * @return string Serialized and possibly compressed version of $fields
	 */
	protected function serialize( array $fields ) {
		$blob = serialize( $fields );
		if ( $this->compression === 'gzip'
			&& strlen( $blob ) >= 1024
			&& function_exists( 'gzdeflate' )
		) {
			$object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ];
			$blobz = serialize( $object );

			return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
		} else {
			return $blob;
		}
	}

	/**
	 * @param string $blob
	 * @return array|bool Unserialized version of $blob or false
	 */
	protected function unserialize( $blob ) {
		$fields = unserialize( $blob );
		if ( is_object( $fields ) ) {
			if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
				$fields = unserialize( gzinflate( $fields->blob ) );
			} else {
				$fields = false;
			}
		}

		return is_array( $fields ) ? $fields : false;
	}

	/**
	 * Get a connection to the server that handles all sub-queues for this queue
	 *
	 * @return RedisConnRef
	 * @throws JobQueueConnectionError
	 */
	protected function getConnection() {
		$conn = $this->redisPool->getConnection( $this->server, $this->logger );
		if ( !$conn ) {
			throw new JobQueueConnectionError(
				"Unable to connect to redis server {$this->server}." );
		}

		return $conn;
	}

	/**
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @throws JobQueueError
	 */
	protected function throwRedisException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
		throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
	}

	/**
	 * @return string JSON
	 */
	private function encodeQueueName() {
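		// Yields e.g. '["refreshLinks","enwiki"]'; the job type and wiki ID shown are illustrative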
		return json_encode( [ $this->type, $this->wiki ] );
	}

	/**
	 * @param string $name JSON
	 * @return array (type, wiki)
	 */
	private function decodeQueueName( $name ) {
		return json_decode( $name );
	}

	/**
	 * @param string $name
	 * @return string
	 */
	private function getGlobalKey( $name ) {
		$parts = [ 'global', 'jobqueue', $name ];
		foreach ( $parts as $part ) {
			if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
				throw new InvalidArgumentException( "Key part characters are out of range." );
			}
		}

		return implode( ':', $parts );
	}

	/**
	 * @param string $prop
	 * @param string|null $type Override this for sibling queues
	 * @return string
	 */
	private function getQueueKey( $prop, $type = null ) {
		$type = is_string( $type ) ? $type : $this->type;
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		$keyspace = $prefix ? "$db-$prefix" : $db;

		$parts = [ $keyspace, 'jobqueue', $type, $prop ];
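		// e.g. [ 'enwiki', 'jobqueue', 'refreshLinks', 'l-unclaimed' ]; the values here are illustrative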

		// Parts are typically ASCII, but encode for sanity to escape ":"
		return implode( ':', array_map( 'rawurlencode', $parts ) );
	}
}
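
/*
 * Illustrative usage sketch (not part of this class): jobs are normally pushed and
 * popped through the generic JobQueue/JobQueueGroup interface rather than by using
 * this class directly. The 'refreshLinks' job type below is only an example and
 * assumes it is mapped to JobQueueRedis in $wgJobTypeConf.
 *
 *   $queue = JobQueueGroup::singleton()->get( 'refreshLinks' );
 *   $queue->push( $jobs ); // IJobSpecification[] to enqueue
 *   $job = $queue->pop();  // false if the queue is empty
 *   if ( $job ) {
 *       $job->run();
 *       $queue->ack( $job ); // remove the claimed job's data
 *   }
 */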