Passed
Push — master ( fdca42...7dd147 )
by Hennik
03:23 queued 10s
created

Resque::items()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
c 0
b 0
f 0
nc 2
nop 3
dl 0
loc 7
rs 10
1
<?php
2
/**
3
 * Base Resque class.
4
 *
5
 * @package		Resque
6
 * @author		Chris Boulton <[email protected]>
7
 * @license		http://www.opensource.org/licenses/mit-license.php
8
 */
9
class Resque
10
{
11
	const VERSION = '1.2';
12
13
    const DEFAULT_INTERVAL = 5;
14
15
	/**
16
	 * @var Resque_Redis Instance of Resque_Redis that talks to redis.
17
	 */
18
	public static $redis = null;
19
20
	/**
21
	 * @var mixed Host/port conbination separated by a colon, or a nested
22
	 * array of server swith host/port pairs
23
	 */
24
	protected static $redisServer = null;
25
26
	/**
27
	 * @var int ID of Redis database to select.
28
	 */
29
	protected static $redisDatabase = 0;
30
31
    /**
32
     * @var string auth of Redis database
33
     */
34
	protected static $auth;
35
36
	/**
37
	 * Given a host/port combination separated by a colon, set it as
38
	 * the redis server that Resque will talk to.
39
	 *
40
	 * @param mixed $server Host/port combination separated by a colon, DSN-formatted URI, or
41
	 *                      a callable that receives the configured database ID
42
	 *                      and returns a Resque_Redis instance, or
43
	 *                      a nested array of servers with host/port pairs.
44
	 * @param int $database
45
     * @param string $auth
46
	 */
47
	public static function setBackend($server, $database = 0, $auth = null)
48
	{
49
		self::$redisServer   = $server;
50
		self::$redisDatabase = $database;
51
		self::$auth          = $auth;
52
		self::$redis         = null;
53
	}
54
55
	/**
56
	 * Return an instance of the Resque_Redis class instantiated for Resque.
57
	 *
58
	 * @return Resque_Redis Instance of Resque_Redis.
59
	 */
60
	public static function redis()
61
	{
62
		if (self::$redis !== null) {
63
			return self::$redis;
64
		}
65
66
		if (is_callable(self::$redisServer)) {
67
			self::$redis = call_user_func(self::$redisServer, self::$redisDatabase);
68
		} else {
69
			self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase);
70
		}
71
72
		if (!empty(self::$auth)) {
73
            self::$redis->auth(self::$auth);
0 ignored issues
show
Bug introduced by
The method auth() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

73
            self::$redis->/** @scrutinizer ignore-call */ 
74
                          auth(self::$auth);
Loading history...
74
        }
75
76
		return self::$redis;
77
	}
78
79
	/**
80
	 * fork() helper method for php-resque that handles issues PHP socket
81
	 * and phpredis have with passing around sockets between child/parent
82
	 * processes.
83
	 *
84
	 * Will close connection to Redis before forking.
85
	 *
86
	 * @return int Return vars as per pcntl_fork(). False if pcntl_fork is unavailable
87
	 */
88
	public static function fork()
89
	{
90
		if(!function_exists('pcntl_fork')) {
91
			return false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return false returns the type false which is incompatible with the documented return type integer.
Loading history...
92
		}
93
94
		// Close the connection to Redis before forking.
95
		// This is a workaround for issues phpredis has.
96
		self::$redis = null;
97
98
		$pid = pcntl_fork();
99
		if($pid === -1) {
100
			throw new RuntimeException('Unable to fork child worker.');
101
		}
102
103
		return $pid;
104
	}
105
106
	/**
107
	 * Push a job to the end of a specific queue. If the queue does not
108
	 * exist, then create it as well.
109
	 *
110
	 * @param string $queue The name of the queue to add the job to.
111
	 * @param array $item Job description as an array to be JSON encoded.
112
	 */
113
	public static function push($queue, $item)
114
	{
115
		$encodedItem = json_encode($item);
116
		if ($encodedItem === false) {
117
			return false;
118
		}
119
		self::redis()->sadd('queues', $queue);
0 ignored issues
show
Bug introduced by
The method sadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

119
		self::redis()->/** @scrutinizer ignore-call */ sadd('queues', $queue);
Loading history...
120
		$length = self::redis()->rpush('queue:' . $queue, $encodedItem);
0 ignored issues
show
Bug introduced by
The method rpush() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

120
		$length = self::redis()->/** @scrutinizer ignore-call */ rpush('queue:' . $queue, $encodedItem);
Loading history...
121
		if ($length < 1) {
122
			return false;
123
		}
124
		return true;
125
	}
126
127
	/**
128
	 * Pop an item off the end of the specified queue, decode it and
129
	 * return it.
130
	 *
131
	 * @param string $queue The name of the queue to fetch an item from.
132
	 * @return array Decoded item from the queue.
133
	 */
134
	public static function pop($queue)
135
	{
136
        $item = self::redis()->lpop('queue:' . $queue);
0 ignored issues
show
Bug introduced by
The method lpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

136
        $item = self::redis()->/** @scrutinizer ignore-call */ lpop('queue:' . $queue);
Loading history...
137
138
		if(!$item) {
139
			return;
140
		}
141
142
		return json_decode($item, true);
143
	}
144
145
	/**
146
	 * Remove items of the specified queue
147
	 *
148
	 * @param string $queue The name of the queue to fetch an item from.
149
	 * @param array $items
150
	 * @return integer number of deleted items
151
	 */
152
	public static function dequeue($queue, $items = Array())
153
	{
154
	    if(count($items) > 0) {
155
			return self::removeItems($queue, $items);
156
	    } else {
157
			return self::removeList($queue);
158
	    }
159
	}
160
161
	/**
162
	 * Remove specified queue
163
	 *
164
	 * @param string $queue The name of the queue to remove.
165
	 * @return integer Number of deleted items
166
	 */
167
	public static function removeQueue($queue)
168
	{
169
	    $num = self::removeList($queue);
170
	    self::redis()->srem('queues', $queue);
0 ignored issues
show
Bug introduced by
The method srem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

170
	    self::redis()->/** @scrutinizer ignore-call */ srem('queues', $queue);
Loading history...
171
	    return $num;
172
	}
173
174
	/**
175
	 * Pop an item off the end of the specified queues, using blocking list pop,
176
	 * decode it and return it.
177
	 *
178
	 * @param array         $queues
179
	 * @param int           $timeout
180
	 * @return null|array   Decoded item from the queue.
181
	 */
182
	public static function blpop(array $queues, $timeout)
183
	{
184
	    $list = array();
185
	    foreach($queues AS $queue) {
186
		$list[] = 'queue:' . $queue;
187
	    }
188
189
	    $item = self::redis()->blpop($list, (int)$timeout);
0 ignored issues
show
Bug introduced by
The method blpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

189
	    $item = self::redis()->/** @scrutinizer ignore-call */ blpop($list, (int)$timeout);
Loading history...
190
191
	    if(!$item) {
192
		return;
193
	    }
194
195
	    /**
196
	     * Normally the Resque_Redis class returns queue names without the prefix
197
	     * But the blpop is a bit different. It returns the name as prefix:queue:name
198
	     * So we need to strip off the prefix:queue: part
199
	     */
200
	    $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
201
202
	    return array(
203
		'queue'   => $queue,
204
		'payload' => json_decode($item[1], true)
205
	    );
206
	}
207
208
	/**
209
	 * Return the size (number of pending jobs) of the specified queue.
210
	 *
211
	 * @param string $queue name of the queue to be checked for pending jobs
212
	 *
213
	 * @return int The size of the queue.
214
	 */
215
	public static function size($queue)
216
	{
217
		return self::redis()->llen('queue:' . $queue);
0 ignored issues
show
Bug introduced by
The method llen() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

217
		return self::redis()->/** @scrutinizer ignore-call */ llen('queue:' . $queue);
Loading history...
218
	}
219
220
	/**
221
	 * Create a new job and save it to the specified queue.
222
	 *
223
	 * @param string $queue The name of the queue to place the job in.
224
	 * @param string $class The name of the class that contains the code to execute the job.
225
	 * @param array $args Any optional arguments that should be passed when the job is executed.
226
	 * @param boolean $trackStatus Set to true to be able to monitor the status of a job.
227
	 * @param string $prefix The prefix needs to be set for the status key
228
	 *
229
	 * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
230
	 */
231
	public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "")
232
	{
233
		$id         = Resque::generateJobId();
234
		$hookParams = array(
235
			'class' => $class,
236
			'args'  => $args,
237
			'queue' => $queue,
238
			'id'    => $id,
239
		);
240
		try {
241
			Resque_Event::trigger('beforeEnqueue', $hookParams);
242
		}
243
		catch(Resque_Job_DontCreate $e) {
244
			return false;
245
		}
246
247
		Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix);
248
		Resque_Event::trigger('afterEnqueue', $hookParams);
249
250
		return $id;
251
	}
252
253
	/**
254
	 * Reserve and return the next available job in the specified queue.
255
	 *
256
	 * @param string $queue Queue to fetch next available job from.
257
	 * @return Resque_Job Instance of Resque_Job to be processed, false if none or error.
258
	 */
259
	public static function reserve($queue)
260
	{
261
		return Resque_Job::reserve($queue);
262
	}
263
264
	/**
265
	 * Get an array of all known queues.
266
	 *
267
	 * @return array Array of queues.
268
	 */
269
	public static function queues()
270
	{
271
		$queues = self::redis()->smembers('queues');
0 ignored issues
show
Bug introduced by
The method smembers() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

271
		$queues = self::redis()->/** @scrutinizer ignore-call */ smembers('queues');
Loading history...
272
		if(!is_array($queues)) {
273
			$queues = array();
274
		}
275
		return $queues;
276
	}
277
278
	/**
279
	 * Retrieve all the items of a queue with Redis
280
	 *
281
	 * @return array Array of items.
282
	 */
283
	public static function items($queue, $start = 0, $stop = -1)
284
	{
285
		$list = self::redis()->lrange('queue:' . $queue, $start, $stop);
0 ignored issues
show
Bug introduced by
The method lrange() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

285
		$list = self::redis()->/** @scrutinizer ignore-call */ lrange('queue:' . $queue, $start, $stop);
Loading history...
286
		if(!is_array($list)) {
287
			$list = array();
288
		}
289
		return $list;
290
	}
291
292
	/**
293
	 * Remove Items from the queue
294
	 * Safely moving each item to a temporary queue before processing it
295
	 * If the Job matches, counts otherwise puts it in a requeue_queue
296
	 * which at the end eventually be copied back into the original queue
297
	 *
298
	 * @private
299
	 *
300
	 * @param string $queue The name of the queue
301
	 * @param array $items
302
	 * @return integer number of deleted items
303
	 */
304
	private static function removeItems($queue, $items = Array())
305
	{
306
		$counter = 0;
307
		$originalQueue = 'queue:'. $queue;
308
		$tempQueue = $originalQueue. ':temp:'. time();
309
		$requeueQueue = $tempQueue. ':requeue';
310
311
		// move each item from original queue to temp queue and process it
312
		$finished = false;
313
		while (!$finished) {
314
			$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
0 ignored issues
show
Bug introduced by
The method rpoplpush() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

314
			$string = self::redis()->/** @scrutinizer ignore-call */ rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
Loading history...
315
316
			if (!empty($string)) {
317
				if(self::matchItem($string, $items)) {
318
					self::redis()->rpop($tempQueue);
0 ignored issues
show
Bug introduced by
The method rpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

318
					self::redis()->/** @scrutinizer ignore-call */ rpop($tempQueue);
Loading history...
319
					$counter++;
320
				} else {
321
					self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
322
				}
323
			} else {
324
				$finished = true;
325
			}
326
		}
327
328
		// move back from temp queue to original queue
329
		$finished = false;
330
		while (!$finished) {
331
			$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
332
			if (empty($string)) {
333
			    $finished = true;
334
			}
335
		}
336
337
		// remove temp queue and requeue queue
338
		self::redis()->del($requeueQueue);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

338
		self::redis()->/** @scrutinizer ignore-call */ del($requeueQueue);
Loading history...
339
		self::redis()->del($tempQueue);
340
341
		return $counter;
342
	}
343
344
	/**
345
	 * matching item
346
	 * item can be ['class'] or ['class' => 'id'] or ['class' => {'foo' => 1, 'bar' => 2}]
347
	 * @private
348
	 *
349
	 * @params string $string redis result in json
350
	 * @params $items
351
	 *
352
	 * @return (bool)
353
	 */
354
	private static function matchItem($string, $items)
355
	{
356
	    $decoded = json_decode($string, true);
357
358
	    foreach($items as $key => $val) {
359
			# class name only  ex: item[0] = ['class']
360
			if (is_numeric($key)) {
361
			    if($decoded['class'] == $val) {
362
					return true;
363
			    }
364
			# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
365
			} elseif (is_array($val)) {
366
			    $decodedArgs = (array)$decoded['args'][0];
367
			    if ($decoded['class'] == $key &&
368
					count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) {
369
					return true;
370
				}
371
			# class name with ID, example: item[0] = ['class' => 'id']
372
			} else {
373
			    if ($decoded['class'] == $key && $decoded['id'] == $val) {
374
					return true;
375
			    }
376
			}
377
	    }
378
	    return false;
379
	}
380
381
	/**
382
	 * Remove List
383
	 *
384
	 * @private
385
	 *
386
	 * @params string $queue the name of the queue
387
	 * @return integer number of deleted items belongs to this list
388
	 */
389
	private static function removeList($queue)
390
	{
391
	    $counter = self::size($queue);
392
	    $result = self::redis()->del('queue:' . $queue);
393
	    return ($result == 1) ? $counter : 0;
394
	}
395
396
	/*
397
	 * Generate an identifier to attach to a job for status tracking.
398
	 *
399
	 * @return string
400
	 */
401
	public static function generateJobId()
402
	{
403
		return md5(uniqid('', true));
404
	}
405
}
406