Resque   B
last analyzed

Complexity

Total Complexity 46

Size/Duplication

Total Lines 396
Duplicated Lines 0 %

Importance

Changes 18
Bugs 2 Features 0
Metric Value
eloc 121
c 18
b 2
f 0
dl 0
loc 396
rs 8.72
wmc 46

17 Methods

Rating   Name   Duplication   Size   Complexity  
A reserve() 0 3 1
A queues() 0 7 2
B removeItems() 0 38 6
A items() 0 7 2
A setBackend() 0 6 1
A enqueue() 0 19 2
A blpop() 0 23 3
A removeQueue() 0 5 1
A push() 0 12 3
A removeList() 0 5 2
A generateJobId() 0 3 1
A pop() 0 9 2
A size() 0 3 1
B matchItem() 0 27 10
A fork() 0 16 3
A redis() 0 17 4
A dequeue() 0 6 2

How to fix   Complexity   

Complex Class

Complex classes like Resque often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Resque, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/**
4
 * Base Resque class.
5
 *
6
 * @package		Resque
7
 * @author		Chris Boulton <[email protected]>
8
 * @license		http://www.opensource.org/licenses/mit-license.php
9
 */
10
class Resque
11
{
12
	const VERSION = '1.2';
13
14
	const DEFAULT_INTERVAL = 5;
15
16
	/**
17
	 * @var Resque_Redis Instance of Resque_Redis that talks to redis.
18
	 */
19
	public static $redis = null;
20
21
	/**
22
	 * @var mixed Host/port conbination separated by a colon, or a nested
23
	 * array of server swith host/port pairs
24
	 */
25
	protected static $redisServer = null;
26
27
	/**
28
	 * @var int ID of Redis database to select.
29
	 */
30
	protected static $redisDatabase = 0;
31
32
	/**
33
	 * @var string auth of Redis database
34
	 */
35
	protected static $auth;
36
37
	/**
38
	 * Given a host/port combination separated by a colon, set it as
39
	 * the redis server that Resque will talk to.
40
	 *
41
	 * @param mixed $server Host/port combination separated by a colon, DSN-formatted URI, or
42
	 *                      a callable that receives the configured database ID
43
	 *                      and returns a Resque_Redis instance, or
44
	 *                      a nested array of servers with host/port pairs.
45
	 * @param int $database
46
	 * @param string $auth
47
	 */
48
	public static function setBackend($server, $database = 0, $auth = null)
49
	{
50
		self::$redisServer   = $server;
51
		self::$redisDatabase = $database;
52
		self::$auth          = $auth;
53
		self::$redis         = null;
54
	}
55
56
	/**
57
	 * Return an instance of the Resque_Redis class instantiated for Resque.
58
	 *
59
	 * @return Resque_Redis Instance of Resque_Redis.
60
	 */
61
	public static function redis()
62
	{
63
		if (self::$redis !== null) {
64
			return self::$redis;
65
		}
66
67
		if (is_callable(self::$redisServer)) {
68
			self::$redis = call_user_func(self::$redisServer, self::$redisDatabase);
69
		} else {
70
			self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase);
71
		}
72
73
		if (!empty(self::$auth)) {
74
			self::$redis->auth(self::$auth);
0 ignored issues
show
Bug introduced by
The method auth() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

74
			self::$redis->/** @scrutinizer ignore-call */ 
75
                 auth(self::$auth);
Loading history...
75
		}
76
77
		return self::$redis;
78
	}
79
80
	/**
81
	 * fork() helper method for php-resque that handles issues PHP socket
82
	 * and phpredis have with passing around sockets between child/parent
83
	 * processes.
84
	 *
85
	 * Will close connection to Redis before forking.
86
	 *
87
	 * @return int Return vars as per pcntl_fork(). False if pcntl_fork is unavailable
88
	 */
89
	public static function fork()
90
	{
91
		if (!function_exists('pcntl_fork')) {
92
			return false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return false returns the type false which is incompatible with the documented return type integer.
Loading history...
93
		}
94
95
		// Close the connection to Redis before forking.
96
		// This is a workaround for issues phpredis has.
97
		self::$redis = null;
98
99
		$pid = pcntl_fork();
100
		if ($pid === -1) {
101
			throw new RuntimeException('Unable to fork child worker.');
102
		}
103
104
		return $pid;
105
	}
106
107
	/**
108
	 * Push a job to the end of a specific queue. If the queue does not
109
	 * exist, then create it as well.
110
	 *
111
	 * @param string $queue The name of the queue to add the job to.
112
	 * @param array $item Job description as an array to be JSON encoded.
113
	 */
114
	public static function push($queue, $item)
115
	{
116
		$encodedItem = json_encode($item);
117
		if ($encodedItem === false) {
118
			return false;
119
		}
120
		self::redis()->sadd('queues', $queue);
0 ignored issues
show
Bug introduced by
The method sadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

120
		self::redis()->/** @scrutinizer ignore-call */ sadd('queues', $queue);
Loading history...
121
		$length = self::redis()->rpush('queue:' . $queue, $encodedItem);
0 ignored issues
show
Bug introduced by
The method rpush() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

121
		$length = self::redis()->/** @scrutinizer ignore-call */ rpush('queue:' . $queue, $encodedItem);
Loading history...
122
		if ($length < 1) {
123
			return false;
124
		}
125
		return true;
126
	}
127
128
	/**
129
	 * Pop an item off the end of the specified queue, decode it and
130
	 * return it.
131
	 *
132
	 * @param string $queue The name of the queue to fetch an item from.
133
	 * @return array Decoded item from the queue.
134
	 */
135
	public static function pop($queue)
136
	{
137
		$item = self::redis()->lpop('queue:' . $queue);
0 ignored issues
show
Bug introduced by
The method lpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

137
		$item = self::redis()->/** @scrutinizer ignore-call */ lpop('queue:' . $queue);
Loading history...
138
139
		if (!$item) {
140
			return;
141
		}
142
143
		return json_decode($item, true);
144
	}
145
146
	/**
147
	 * Remove items of the specified queue
148
	 *
149
	 * @param string $queue The name of the queue to fetch an item from.
150
	 * @param array $items
151
	 * @return integer number of deleted items
152
	 */
153
	public static function dequeue($queue, $items = array())
154
	{
155
		if (count($items) > 0) {
156
			return self::removeItems($queue, $items);
157
		} else {
158
			return self::removeList($queue);
159
		}
160
	}
161
162
	/**
163
	 * Remove specified queue
164
	 *
165
	 * @param string $queue The name of the queue to remove.
166
	 * @return integer Number of deleted items
167
	 */
168
	public static function removeQueue($queue)
169
	{
170
		$num = self::removeList($queue);
171
		self::redis()->srem('queues', $queue);
0 ignored issues
show
Bug introduced by
The method srem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

171
		self::redis()->/** @scrutinizer ignore-call */ srem('queues', $queue);
Loading history...
172
		return $num;
173
	}
174
175
	/**
176
	 * Pop an item off the end of the specified queues, using blocking list pop,
177
	 * decode it and return it.
178
	 *
179
	 * @param array         $queues
180
	 * @param int           $timeout
181
	 * @return null|array   Decoded item from the queue.
182
	 */
183
	public static function blpop(array $queues, $timeout)
184
	{
185
		$list = array();
186
		foreach ($queues as $queue) {
187
			$list[] = 'queue:' . $queue;
188
		}
189
190
		$item = self::redis()->blpop($list, (int)$timeout);
0 ignored issues
show
Bug introduced by
The method blpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

190
		$item = self::redis()->/** @scrutinizer ignore-call */ blpop($list, (int)$timeout);
Loading history...
191
192
		if (!$item) {
193
			return;
194
		}
195
196
		/**
197
		 * Normally the Resque_Redis class returns queue names without the prefix
198
		 * But the blpop is a bit different. It returns the name as prefix:queue:name
199
		 * So we need to strip off the prefix:queue: part
200
		 */
201
		$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
202
203
		return array(
204
		'queue'   => $queue,
205
		'payload' => json_decode($item[1], true)
206
		);
207
	}
208
209
	/**
210
	 * Return the size (number of pending jobs) of the specified queue.
211
	 *
212
	 * @param string $queue name of the queue to be checked for pending jobs
213
	 *
214
	 * @return int The size of the queue.
215
	 */
216
	public static function size($queue)
217
	{
218
		return self::redis()->llen('queue:' . $queue);
0 ignored issues
show
Bug introduced by
The method llen() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

218
		return self::redis()->/** @scrutinizer ignore-call */ llen('queue:' . $queue);
Loading history...
219
	}
220
221
	/**
222
	 * Create a new job and save it to the specified queue.
223
	 *
224
	 * @param string $queue The name of the queue to place the job in.
225
	 * @param string $class The name of the class that contains the code to execute the job.
226
	 * @param array $args Any optional arguments that should be passed when the job is executed.
227
	 * @param boolean $trackStatus Set to true to be able to monitor the status of a job.
228
	 * @param string $prefix The prefix needs to be set for the status key
229
	 *
230
	 * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
231
	 */
232
	public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "")
233
	{
234
		$id         = Resque::generateJobId();
235
		$hookParams = array(
236
			'class' => $class,
237
			'args'  => $args,
238
			'queue' => $queue,
239
			'id'    => $id,
240
		);
241
		try {
242
			Resque_Event::trigger('beforeEnqueue', $hookParams);
243
		} catch (Resque_Job_DontCreate $e) {
244
			return false;
245
		}
246
247
		Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix);
248
		Resque_Event::trigger('afterEnqueue', $hookParams);
249
250
		return $id;
251
	}
252
253
	/**
254
	 * Reserve and return the next available job in the specified queue.
255
	 *
256
	 * @param string $queue Queue to fetch next available job from.
257
	 * @return Resque_Job Instance of Resque_Job to be processed, false if none or error.
258
	 */
259
	public static function reserve($queue)
260
	{
261
		return Resque_Job::reserve($queue);
262
	}
263
264
	/**
265
	 * Get an array of all known queues.
266
	 *
267
	 * @return array Array of queues.
268
	 */
269
	public static function queues()
270
	{
271
		$queues = self::redis()->smembers('queues');
0 ignored issues
show
Bug introduced by
The method smembers() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

271
		$queues = self::redis()->/** @scrutinizer ignore-call */ smembers('queues');
Loading history...
272
		if (!is_array($queues)) {
273
			$queues = array();
274
		}
275
		return $queues;
276
	}
277
278
	/**
279
	 * Retrieve all the items of a queue with Redis
280
	 *
281
	 * @return array Array of items.
282
	 */
283
	public static function items($queue, $start = 0, $stop = -1)
284
	{
285
		$list = self::redis()->lrange('queue:' . $queue, $start, $stop);
0 ignored issues
show
Bug introduced by
The method lrange() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

285
		$list = self::redis()->/** @scrutinizer ignore-call */ lrange('queue:' . $queue, $start, $stop);
Loading history...
286
		if (!is_array($list)) {
287
			$list = array();
288
		}
289
		return $list;
290
	}
291
292
	/**
293
	 * Remove Items from the queue
294
	 * Safely moving each item to a temporary queue before processing it
295
	 * If the Job matches, counts otherwise puts it in a requeue_queue
296
	 * which at the end eventually be copied back into the original queue
297
	 *
298
	 * @private
299
	 *
300
	 * @param string $queue The name of the queue
301
	 * @param array $items
302
	 * @return integer number of deleted items
303
	 */
304
	private static function removeItems($queue, $items = array())
305
	{
306
		$counter = 0;
307
		$originalQueue = 'queue:' . $queue;
308
		$tempQueue = $originalQueue . ':temp:' . time();
309
		$requeueQueue = $tempQueue . ':requeue';
310
311
		// move each item from original queue to temp queue and process it
312
		$finished = false;
313
		while (!$finished) {
314
			$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
0 ignored issues
show
Bug introduced by
The method rpoplpush() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

314
			$string = self::redis()->/** @scrutinizer ignore-call */ rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
Loading history...
315
316
			if (!empty($string)) {
317
				if (self::matchItem($string, $items)) {
318
					self::redis()->rpop($tempQueue);
0 ignored issues
show
Bug introduced by
The method rpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

318
					self::redis()->/** @scrutinizer ignore-call */ rpop($tempQueue);
Loading history...
319
					$counter++;
320
				} else {
321
					self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
322
				}
323
			} else {
324
				$finished = true;
325
			}
326
		}
327
328
		// move back from temp queue to original queue
329
		$finished = false;
330
		while (!$finished) {
331
			$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() . $originalQueue);
332
			if (empty($string)) {
333
				$finished = true;
334
			}
335
		}
336
337
		// remove temp queue and requeue queue
338
		self::redis()->del($requeueQueue);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

338
		self::redis()->/** @scrutinizer ignore-call */ del($requeueQueue);
Loading history...
339
		self::redis()->del($tempQueue);
340
341
		return $counter;
342
	}
343
344
	/**
345
	 * matching item
346
	 * item can be ['class'] or ['class' => 'id'] or ['class' => {'foo' => 1, 'bar' => 2}]
347
	 * @private
348
	 *
349
	 * @params string $string redis result in json
350
	 * @params $items
351
	 *
352
	 * @return (bool)
353
	 */
354
	private static function matchItem($string, $items)
355
	{
356
		$decoded = json_decode($string, true);
357
358
		foreach ($items as $key => $val) {
359
			# class name only  ex: item[0] = ['class']
360
			if (is_numeric($key)) {
361
				if ($decoded['class'] == $val) {
362
					return true;
363
				}
364
			# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
365
			} elseif (is_array($val)) {
366
				$decodedArgs = (array)$decoded['args'][0];
367
				if (
368
					$decoded['class'] == $key &&
369
					count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0
370
				) {
371
					return true;
372
				}
373
			# class name with ID, example: item[0] = ['class' => 'id']
374
			} else {
375
				if ($decoded['class'] == $key && $decoded['id'] == $val) {
376
					return true;
377
				}
378
			}
379
		}
380
		return false;
381
	}
382
383
	/**
384
	 * Remove List
385
	 *
386
	 * @private
387
	 *
388
	 * @params string $queue the name of the queue
389
	 * @return integer number of deleted items belongs to this list
390
	 */
391
	private static function removeList($queue)
392
	{
393
		$counter = self::size($queue);
394
		$result = self::redis()->del('queue:' . $queue);
395
		return ($result == 1) ? $counter : 0;
396
	}
397
398
	/*
399
	 * Generate an identifier to attach to a job for status tracking.
400
	 *
401
	 * @return string
402
	 */
403
	public static function generateJobId()
404
	{
405
		return md5(uniqid('', true));
406
	}
407
}
408