This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
`include`, or for example via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | /** |
||
3 | * Job queue base code. |
||
4 | * |
||
5 | * This program is free software; you can redistribute it and/or modify |
||
6 | * it under the terms of the GNU General Public License as published by |
||
7 | * the Free Software Foundation; either version 2 of the License, or |
||
8 | * (at your option) any later version. |
||
9 | * |
||
10 | * This program is distributed in the hope that it will be useful, |
||
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
13 | * GNU General Public License for more details. |
||
14 | * |
||
15 | * You should have received a copy of the GNU General Public License along |
||
16 | * with this program; if not, write to the Free Software Foundation, Inc., |
||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
||
18 | * http://www.gnu.org/copyleft/gpl.html |
||
19 | * |
||
20 | * @file |
||
21 | * @author Aaron Schulz |
||
22 | */ |
||
23 | |||
24 | /** |
||
25 | * Class to handle enqueueing of background jobs |
||
26 | * |
||
27 | * @ingroup JobQueue |
||
28 | * @since 1.21 |
||
29 | */ |
||
30 | class JobQueueGroup { |
||
31 | /** @var JobQueueGroup[] */ |
||
32 | protected static $instances = []; |
||
33 | |||
34 | /** @var ProcessCacheLRU */ |
||
35 | protected $cache; |
||
36 | |||
37 | /** @var string Wiki ID */ |
||
38 | protected $wiki; |
||
39 | /** @var string|bool Read only rationale (or false if r/w) */ |
||
40 | protected $readOnlyReason; |
||
41 | |||
42 | /** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */ |
||
43 | protected $coalescedQueues; |
||
44 | |||
45 | /** @var Job[] */ |
||
46 | protected $bufferedJobs = []; |
||
47 | |||
48 | const TYPE_DEFAULT = 1; // integer; jobs popped by default |
||
49 | const TYPE_ANY = 2; // integer; any job |
||
50 | |||
51 | const USE_CACHE = 1; // integer; use process or persistent cache |
||
52 | |||
53 | const PROC_CACHE_TTL = 15; // integer; seconds |
||
54 | |||
55 | const CACHE_VERSION = 1; // integer; cache version |
||
56 | |||
57 | /** |
||
58 | * @param string $wiki Wiki ID |
||
59 | * @param string|bool $readOnlyReason Read-only reason or false |
||
60 | */ |
||
61 | protected function __construct( $wiki, $readOnlyReason ) { |
||
62 | $this->wiki = $wiki; |
||
63 | $this->readOnlyReason = $readOnlyReason; |
||
64 | $this->cache = new ProcessCacheLRU( 10 ); |
||
65 | } |
||
66 | |||
67 | /** |
||
68 | * @param bool|string $wiki Wiki ID |
||
69 | * @return JobQueueGroup |
||
70 | */ |
||
71 | View Code Duplication | public static function singleton( $wiki = false ) { |
|
72 | $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; |
||
73 | if ( !isset( self::$instances[$wiki] ) ) { |
||
74 | self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() ); |
||
0 ignored issues
–
show
|
|||
75 | } |
||
76 | |||
77 | return self::$instances[$wiki]; |
||
78 | } |
||
79 | |||
80 | /** |
||
81 | * Destroy the singleton instances |
||
82 | * |
||
83 | * @return void |
||
84 | */ |
||
85 | public static function destroySingletons() { |
||
86 | self::$instances = []; |
||
87 | } |
||
88 | |||
89 | /** |
||
90 | * Get the job queue object for a given queue type |
||
91 | * |
||
92 | * @param string $type |
||
93 | * @return JobQueue |
||
94 | */ |
||
95 | public function get( $type ) { |
||
96 | global $wgJobTypeConf; |
||
97 | |||
98 | $conf = [ 'wiki' => $this->wiki, 'type' => $type ]; |
||
99 | if ( isset( $wgJobTypeConf[$type] ) ) { |
||
100 | $conf = $conf + $wgJobTypeConf[$type]; |
||
101 | } else { |
||
102 | $conf = $conf + $wgJobTypeConf['default']; |
||
103 | } |
||
104 | $conf['aggregator'] = JobQueueAggregator::singleton(); |
||
105 | if ( $this->readOnlyReason !== false ) { |
||
106 | $conf['readOnlyReason'] = $this->readOnlyReason; |
||
107 | } |
||
108 | |||
109 | return JobQueue::factory( $conf ); |
||
110 | } |
||
111 | |||
112 | /** |
||
113 | * Insert jobs into the respective queues of which they belong |
||
114 | * |
||
115 | * This inserts the jobs into the queue specified by $wgJobTypeConf |
||
116 | * and updates the aggregate job queue information cache as needed. |
||
117 | * |
||
118 | * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs |
||
119 | * @throws InvalidArgumentException |
||
120 | * @return void |
||
121 | */ |
||
122 | public function push( $jobs ) { |
||
123 | global $wgJobTypesExcludedFromDefaultQueue; |
||
124 | |||
125 | $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; |
||
126 | if ( !count( $jobs ) ) { |
||
127 | return; |
||
128 | } |
||
129 | |||
130 | $this->assertValidJobs( $jobs ); |
||
131 | |||
132 | $jobsByType = []; // (job type => list of jobs) |
||
133 | foreach ( $jobs as $job ) { |
||
134 | $jobsByType[$job->getType()][] = $job; |
||
135 | } |
||
136 | |||
137 | foreach ( $jobsByType as $type => $jobs ) { |
||
138 | $this->get( $type )->push( $jobs ); |
||
139 | } |
||
140 | |||
141 | if ( $this->cache->has( 'queues-ready', 'list' ) ) { |
||
142 | $list = $this->cache->get( 'queues-ready', 'list' ); |
||
143 | if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { |
||
144 | $this->cache->clear( 'queues-ready' ); |
||
145 | } |
||
146 | } |
||
147 | |||
148 | $cache = ObjectCache::getLocalClusterInstance(); |
||
149 | $cache->set( |
||
150 | $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ), |
||
151 | 'true', |
||
152 | 15 |
||
153 | ); |
||
154 | if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) { |
||
155 | $cache->set( |
||
156 | $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ), |
||
157 | 'true', |
||
158 | 15 |
||
159 | ); |
||
160 | } |
||
161 | } |
||
162 | |||
163 | /** |
||
164 | * Buffer jobs for insertion via push() or call it now if in CLI mode |
||
165 | * |
||
166 | * Note that MediaWiki::restInPeace() calls pushLazyJobs() |
||
167 | * |
||
168 | * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs |
||
169 | * @return void |
||
170 | * @since 1.26 |
||
171 | */ |
||
172 | public function lazyPush( $jobs ) { |
||
173 | if ( PHP_SAPI === 'cli' ) { |
||
174 | $this->push( $jobs ); |
||
175 | return; |
||
176 | } |
||
177 | |||
178 | $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; |
||
179 | |||
180 | // Throw errors now instead of on push(), when other jobs may be buffered |
||
181 | $this->assertValidJobs( $jobs ); |
||
182 | |||
183 | $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs ); |
||
0 ignored issues
–
show
It seems like
array_merge($this->bufferedJobs, $jobs) of type array is incompatible with the declared type array<integer,object<Job>> of property $bufferedJobs.
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property. Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property. ![]() |
|||
184 | } |
||
185 | |||
186 | /** |
||
187 | * Push all jobs buffered via lazyPush() into their respective queues |
||
188 | * |
||
189 | * @return void |
||
190 | * @since 1.26 |
||
191 | */ |
||
192 | public static function pushLazyJobs() { |
||
193 | foreach ( self::$instances as $group ) { |
||
194 | try { |
||
195 | $group->push( $group->bufferedJobs ); |
||
196 | $group->bufferedJobs = []; |
||
197 | } catch ( Exception $e ) { |
||
198 | // Get in as many jobs as possible and let other post-send updates happen |
||
199 | MWExceptionHandler::logException( $e ); |
||
200 | } |
||
201 | } |
||
202 | } |
||
203 | |||
204 | /** |
||
205 | * Pop a job off one of the job queues |
||
206 | * |
||
207 | * This pops a job off a queue as specified by $wgJobTypeConf and |
||
208 | * updates the aggregate job queue information cache as needed. |
||
209 | * |
||
210 | * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string |
||
211 | * @param int $flags Bitfield of JobQueueGroup::USE_* constants |
||
212 | * @param array $blacklist List of job types to ignore |
||
213 | * @return Job|bool Returns false on failure |
||
214 | */ |
||
215 | public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) { |
||
216 | $job = false; |
||
217 | |||
218 | if ( is_string( $qtype ) ) { // specific job type |
||
219 | if ( !in_array( $qtype, $blacklist ) ) { |
||
220 | $job = $this->get( $qtype )->pop(); |
||
221 | } |
||
222 | } else { // any job in the "default" jobs types |
||
223 | if ( $flags & self::USE_CACHE ) { |
||
224 | if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { |
||
225 | $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); |
||
226 | } |
||
227 | $types = $this->cache->get( 'queues-ready', 'list' ); |
||
228 | } else { |
||
229 | $types = $this->getQueuesWithJobs(); |
||
230 | } |
||
231 | |||
232 | if ( $qtype == self::TYPE_DEFAULT ) { |
||
233 | $types = array_intersect( $types, $this->getDefaultQueueTypes() ); |
||
234 | } |
||
235 | |||
236 | $types = array_diff( $types, $blacklist ); // avoid selected types |
||
237 | shuffle( $types ); // avoid starvation |
||
238 | |||
239 | foreach ( $types as $type ) { // for each queue... |
||
240 | $job = $this->get( $type )->pop(); |
||
241 | if ( $job ) { // found |
||
242 | break; |
||
243 | } else { // not found |
||
244 | $this->cache->clear( 'queues-ready' ); |
||
245 | } |
||
246 | } |
||
247 | } |
||
248 | |||
249 | return $job; |
||
250 | } |
||
251 | |||
252 | /** |
||
253 | * Acknowledge that a job was completed |
||
254 | * |
||
255 | * @param Job $job |
||
256 | * @return void |
||
257 | */ |
||
258 | public function ack( Job $job ) { |
||
259 | $this->get( $job->getType() )->ack( $job ); |
||
260 | } |
||
261 | |||
262 | /** |
||
263 | * Register the "root job" of a given job into the queue for de-duplication. |
||
264 | * This should only be called right *after* all the new jobs have been inserted. |
||
265 | * |
||
266 | * @param Job $job |
||
267 | * @return bool |
||
268 | */ |
||
269 | public function deduplicateRootJob( Job $job ) { |
||
270 | return $this->get( $job->getType() )->deduplicateRootJob( $job ); |
||
271 | } |
||
272 | |||
273 | /** |
||
274 | * Wait for any replica DBs or backup queue servers to catch up. |
||
275 | * |
||
276 | * This does nothing for certain queue classes. |
||
277 | * |
||
278 | * @return void |
||
279 | */ |
||
280 | public function waitForBackups() { |
||
281 | global $wgJobTypeConf; |
||
282 | |||
283 | // Try to avoid doing this more than once per queue storage medium |
||
284 | foreach ( $wgJobTypeConf as $type => $conf ) { |
||
285 | $this->get( $type )->waitForBackups(); |
||
286 | } |
||
287 | } |
||
288 | |||
289 | /** |
||
290 | * Get the list of queue types |
||
291 | * |
||
292 | * @return array List of strings |
||
293 | */ |
||
294 | public function getQueueTypes() { |
||
295 | return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); |
||
296 | } |
||
297 | |||
298 | /** |
||
299 | * Get the list of default queue types |
||
300 | * |
||
301 | * @return array List of strings |
||
302 | */ |
||
303 | public function getDefaultQueueTypes() { |
||
304 | global $wgJobTypesExcludedFromDefaultQueue; |
||
305 | |||
306 | return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); |
||
307 | } |
||
308 | |||
309 | /** |
||
310 | * Check if there are any queues with jobs (this is cached) |
||
311 | * |
||
312 | * @param int $type JobQueueGroup::TYPE_* constant |
||
313 | * @return bool |
||
314 | * @since 1.23 |
||
315 | */ |
||
316 | public function queuesHaveJobs( $type = self::TYPE_ANY ) { |
||
317 | $cache = ObjectCache::getLocalClusterInstance(); |
||
318 | $key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type ); |
||
319 | |||
320 | $value = $cache->get( $key ); |
||
321 | if ( $value === false ) { |
||
322 | $queues = $this->getQueuesWithJobs(); |
||
323 | if ( $type == self::TYPE_DEFAULT ) { |
||
324 | $queues = array_intersect( $queues, $this->getDefaultQueueTypes() ); |
||
325 | } |
||
326 | $value = count( $queues ) ? 'true' : 'false'; |
||
327 | $cache->add( $key, $value, 15 ); |
||
328 | } |
||
329 | |||
330 | return ( $value === 'true' ); |
||
331 | } |
||
332 | |||
333 | /** |
||
334 | * Get the list of job types that have non-empty queues |
||
335 | * |
||
336 | * @return array List of job types that have non-empty queues |
||
337 | */ |
||
338 | View Code Duplication | public function getQueuesWithJobs() { |
|
339 | $types = []; |
||
340 | foreach ( $this->getCoalescedQueues() as $info ) { |
||
341 | $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); |
||
342 | if ( is_array( $nonEmpty ) ) { // batching features supported |
||
343 | $types = array_merge( $types, $nonEmpty ); |
||
344 | } else { // we have to go through the queues in the bucket one-by-one |
||
345 | foreach ( $info['types'] as $type ) { |
||
346 | if ( !$this->get( $type )->isEmpty() ) { |
||
347 | $types[] = $type; |
||
348 | } |
||
349 | } |
||
350 | } |
||
351 | } |
||
352 | |||
353 | return $types; |
||
354 | } |
||
355 | |||
356 | /** |
||
357 | * Get the size of the queues for a list of job types |
||
358 | * |
||
359 | * @return array Map of (job type => size) |
||
360 | */ |
||
361 | View Code Duplication | public function getQueueSizes() { |
|
362 | $sizeMap = []; |
||
363 | foreach ( $this->getCoalescedQueues() as $info ) { |
||
364 | $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); |
||
365 | if ( is_array( $sizes ) ) { // batching features supported |
||
366 | $sizeMap = $sizeMap + $sizes; |
||
367 | } else { // we have to go through the queues in the bucket one-by-one |
||
368 | foreach ( $info['types'] as $type ) { |
||
369 | $sizeMap[$type] = $this->get( $type )->getSize(); |
||
370 | } |
||
371 | } |
||
372 | } |
||
373 | |||
374 | return $sizeMap; |
||
375 | } |
||
376 | |||
377 | /** |
||
378 | * @return array |
||
379 | */ |
||
380 | protected function getCoalescedQueues() { |
||
381 | global $wgJobTypeConf; |
||
382 | |||
383 | if ( $this->coalescedQueues === null ) { |
||
384 | $this->coalescedQueues = []; |
||
385 | foreach ( $wgJobTypeConf as $type => $conf ) { |
||
386 | $queue = JobQueue::factory( |
||
387 | [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf ); |
||
388 | $loc = $queue->getCoalesceLocationInternal(); |
||
389 | if ( !isset( $this->coalescedQueues[$loc] ) ) { |
||
390 | $this->coalescedQueues[$loc]['queue'] = $queue; |
||
391 | $this->coalescedQueues[$loc]['types'] = []; |
||
392 | } |
||
393 | if ( $type === 'default' ) { |
||
394 | $this->coalescedQueues[$loc]['types'] = array_merge( |
||
395 | $this->coalescedQueues[$loc]['types'], |
||
396 | array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) |
||
397 | ); |
||
398 | } else { |
||
399 | $this->coalescedQueues[$loc]['types'][] = $type; |
||
400 | } |
||
401 | } |
||
402 | } |
||
403 | |||
404 | return $this->coalescedQueues; |
||
405 | } |
||
406 | |||
407 | /** |
||
408 | * @param string $name |
||
409 | * @return mixed |
||
410 | */ |
||
411 | private function getCachedConfigVar( $name ) { |
||
0 ignored issues
–
show
getCachedConfigVar uses the super-global variable $GLOBALS which is generally not recommended.
Instead of super-globals, we recommend explicitly injecting the dependencies of your class. This makes your code less dependent on global state and generally more testable:
// Bad
class Router
{
public function generate($path)
{
return $_SERVER['HOST'].$path;
}
}
// Better
class Router
{
private $host;
public function __construct($host)
{
$this->host = $host;
}
public function generate($path)
{
return $this->host.$path;
}
}
class Controller
{
public function myAction(Request $request)
{
// Instead of
$page = isset($_GET['page']) ? intval($_GET['page']) : 1;
// Better (assuming you use the Symfony2 request)
$page = $request->query->get('page', 1);
}
}
![]() |
|||
412 | // @TODO: cleanup this whole method with a proper config system |
||
413 | if ( $this->wiki === wfWikiID() ) { |
||
414 | return $GLOBALS[$name]; // common case |
||
415 | } else { |
||
416 | $wiki = $this->wiki; |
||
417 | $cache = ObjectCache::getMainWANInstance(); |
||
418 | $value = $cache->getWithSetCallback( |
||
419 | $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ), |
||
420 | $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ), |
||
421 | function () use ( $wiki, $name ) { |
||
422 | global $wgConf; |
||
423 | |||
424 | return [ 'v' => $wgConf->getConfig( $wiki, $name ) ]; |
||
425 | }, |
||
426 | [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ] |
||
427 | ); |
||
428 | |||
429 | return $value['v']; |
||
430 | } |
||
431 | } |
||
432 | |||
433 | /** |
||
434 | * @param array $jobs |
||
435 | * @throws InvalidArgumentException |
||
436 | */ |
||
437 | private function assertValidJobs( array $jobs ) { |
||
438 | foreach ( $jobs as $job ) { // sanity checks |
||
439 | if ( !( $job instanceof IJobSpecification ) ) { |
||
440 | throw new InvalidArgumentException( "Expected IJobSpecification objects" ); |
||
441 | } |
||
442 | } |
||
443 | } |
||
444 | |||
445 | function __destruct() { |
||
446 | $n = count( $this->bufferedJobs ); |
||
447 | if ( $n > 0 ) { |
||
448 | $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) ); |
||
449 | trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." ); |
||
450 | } |
||
451 | } |
||
452 | } |
||
453 |
If a method or function can return multiple different values, then unless you are sure that you can only receive a single value in this context, we recommend adding an additional type check:
If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.