Test Failed
Pull Request — master (#56)
by
unknown
02:50
created

Resque::matchItem()   B

Complexity

Conditions 10
Paths 7

Size

Total Lines 27
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 15
nc 7
nop 2
dl 0
loc 27
rs 7.6666
c 0
b 0
f 0

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Resque;
4
5
use \RuntimeException;
6
use \Resque\Exceptions\DoNotCreateException;
7
8
/**
9
 * Base Resque class.
10
 *
11
 * @package		Resque
12
 * @author		Chris Boulton <[email protected]>
13
 * @license		http://www.opensource.org/licenses/mit-license.php
14
 */
15
class Resque
16
{
17
	const VERSION = '1.2';
18
19
	const DEFAULT_INTERVAL = 5;
20
21
	/**
22
	 * @var Redis Instance of Resque\Redis that talks to redis.
23
	 */
24
	public static $redis = null;
25
26
	/**
27
	 * @var mixed Host/port combination separated by a colon, or a nested
28
	 * array of servers with host/port pairs
29
	 */
30
	protected static $redisServer = null;
31
32
	/**
33
	 * @var int ID of Redis database to select.
34
	 */
35
	protected static $redisDatabase = 0;
36
37
	/**
38
	 * @var string auth of Redis database
39
	 */
40
	protected static $auth;
41
42
	/**
43
	 * Given a host/port combination separated by a colon, set it as
44
	 * the redis server that Resque will talk to.
45
	 *
46
	 * @param mixed $server Host/port combination separated by a colon, DSN-formatted URI, or
47
	 *                      a callable that receives the configured database ID
48
	 *                      and returns a Resque\Redis instance, or
49
	 *                      a nested array of servers with host/port pairs.
50
	 * @param int $database
51
	 * @param string $auth
52
	 */
53
	public static function setBackend($server, $database = 0, $auth = null)
54
	{
55
		self::$redisServer   = $server;
56
		self::$redisDatabase = $database;
57
		self::$auth          = $auth;
58
		self::$redis         = null;
59
	}
60
61
	/**
62
	 * Return an instance of the Resque\Redis class instantiated for Resque.
63
	 *
64
	 * @return \Resque\Redis Instance of Resque\Redis.
65
	 */
66
	public static function redis()
67
	{
68
		if (self::$redis !== null) {
69
			return self::$redis;
70
		}
71
72
		if (is_callable(self::$redisServer)) {
73
			self::$redis = call_user_func(self::$redisServer, self::$redisDatabase);
74
		} else {
75
			self::$redis = new Redis(self::$redisServer, self::$redisDatabase);
76
		}
77
78
		if (!empty(self::$auth)) {
79
			self::$redis->auth(self::$auth);
0 ignored issues
show
Bug introduced by
The method auth() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

79
			self::$redis->/** @scrutinizer ignore-call */ 
80
                 auth(self::$auth);
Loading history...
80
		}
81
82
		return self::$redis;
83
	}
84
85
	/**
86
	 * fork() helper method for php-resque that handles issues PHP socket
87
	 * and phpredis have with passing around sockets between child/parent
88
	 * processes.
89
	 *
90
	 * Will close connection to Redis before forking.
91
	 *
92
	 * @return int|false Return vars as per pcntl_fork(). False if pcntl_fork is unavailable
93
	 */
94
	public static function fork()
95
	{
96
		if (!function_exists('pcntl_fork')) {
97
			return false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return false returns the type false which is incompatible with the documented return type integer.
Loading history...
98
		}
99
100
		// Close the connection to Redis before forking.
101
		// This is a workaround for issues phpredis has.
102
		self::$redis = null;
103
104
		$pid = pcntl_fork();
105
		if ($pid === -1) {
106
			throw new RuntimeException('Unable to fork child worker.');
107
		}
108
109
		return $pid;
110
	}
111
112
	/**
113
	 * Push a job to the end of a specific queue. If the queue does not
114
	 * exist, then create it as well.
115
	 *
116
	 * @param string $queue The name of the queue to add the job to.
117
	 * @param array $item Job description as an array to be JSON encoded.
118
	 */
119
	public static function push($queue, $item)
120
	{
121
		$encodedItem = json_encode($item);
122
		if ($encodedItem === false) {
123
			return false;
124
		}
125
		self::redis()->sadd('queues', $queue);
0 ignored issues
show
Bug introduced by
The method sadd() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

125
		self::redis()->/** @scrutinizer ignore-call */ sadd('queues', $queue);
Loading history...
126
		$length = self::redis()->rpush('queue:' . $queue, $encodedItem);
0 ignored issues
show
Bug introduced by
The method rpush() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

126
		$length = self::redis()->/** @scrutinizer ignore-call */ rpush('queue:' . $queue, $encodedItem);
Loading history...
127
		if ($length < 1) {
128
			return false;
129
		}
130
		return true;
131
	}
132
133
	/**
134
	 * Pop an item off the front of the specified queue, decode it and
135
	 * return it.
136
	 *
137
	 * @param string $queue The name of the queue to fetch an item from.
138
	 * @return array|null Decoded item from the queue, or null when nothing was popped.
139
	 */
140
	public static function pop($queue)
141
	{
142
		$item = self::redis()->lpop('queue:' . $queue);
0 ignored issues
show
Bug introduced by
The method lpop() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

142
		$item = self::redis()->/** @scrutinizer ignore-call */ lpop('queue:' . $queue);
Loading history...
143
144
		if (!$item) {
145
			return;
146
		}
147
148
		return json_decode($item, true);
149
	}
150
151
	/**
152
	 * Remove items of the specified queue
153
	 *
154
	 * @param string $queue The name of the queue to remove items from.
155
	 * @param array $items
156
	 * @return integer number of deleted items
157
	 */
158
	public static function dequeue($queue, $items = array())
159
	{
160
		if (count($items) > 0) {
161
			return self::removeItems($queue, $items);
162
		} else {
163
			return self::removeList($queue);
164
		}
165
	}
166
167
	/**
168
	 * Remove specified queue
169
	 *
170
	 * @param string $queue The name of the queue to remove.
171
	 * @return integer Number of deleted items
172
	 */
173
	public static function removeQueue($queue)
174
	{
175
		$num = self::removeList($queue);
176
		self::redis()->srem('queues', $queue);
0 ignored issues
show
Bug introduced by
The method srem() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

176
		self::redis()->/** @scrutinizer ignore-call */ srem('queues', $queue);
Loading history...
177
		return $num;
178
	}
179
180
	/**
181
	 * Pop an item off the front of the specified queues, using blocking list pop,
182
	 * decode it and return it.
183
	 *
184
	 * @param array         $queues
185
	 * @param int           $timeout
186
	 * @return null|array   Decoded item from the queue.
187
	 */
188
	public static function blpop(array $queues, $timeout)
189
	{
190
		$list = array();
191
		foreach ($queues as $queue) {
192
			$list[] = 'queue:' . $queue;
193
		}
194
195
		$item = self::redis()->blpop($list, (int)$timeout);
0 ignored issues
show
Bug introduced by
The method blpop() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

195
		$item = self::redis()->/** @scrutinizer ignore-call */ blpop($list, (int)$timeout);
Loading history...
196
197
		if (!$item) {
198
			return;
199
		}
200
201
		/**
202
		 * Normally the Resque\Redis class returns queue names without the prefix
203
		 * But the blpop is a bit different. It returns the name as prefix:queue:name
204
		 * So we need to strip off the prefix:queue: part
205
		 */
206
		$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));
207
208
		return array(
209
		'queue'   => $queue,
210
		'payload' => json_decode($item[1], true)
211
		);
212
	}
213
214
	/**
215
	 * Return the size (number of pending jobs) of the specified queue.
216
	 *
217
	 * @param string $queue name of the queue to be checked for pending jobs
218
	 *
219
	 * @return int The size of the queue.
220
	 */
221
	public static function size($queue)
222
	{
223
		return self::redis()->llen('queue:' . $queue);
0 ignored issues
show
Bug introduced by
The method llen() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

223
		return self::redis()->/** @scrutinizer ignore-call */ llen('queue:' . $queue);
Loading history...
224
	}
225
226
	/**
227
	 * Create a new job and save it to the specified queue.
228
	 *
229
	 * @param string $queue The name of the queue to place the job in.
230
	 * @param string $class The name of the class that contains the code to execute the job.
231
	 * @param array $args Any optional arguments that should be passed when the job is executed.
232
	 * @param boolean $trackStatus Set to true to be able to monitor the status of a job.
233
	 * @param string $prefix The prefix needs to be set for the status key
234
	 *
235
	 * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
236
	 */
237
	public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "")
238
	{
239
		$id         = Resque::generateJobId();
240
		$hookParams = array(
241
			'class' => $class,
242
			'args'  => $args,
243
			'queue' => $queue,
244
			'id'    => $id,
245
		);
246
		try {
247
			Event::trigger('beforeEnqueue', $hookParams);
248
		} catch (DoNotCreateException $e) {
249
			return false;
250
		}
251
252
		JobHandler::create($queue, $class, $args, $trackStatus, $id, $prefix);
253
		Event::trigger('afterEnqueue', $hookParams);
254
255
		return $id;
256
	}
257
258
	/**
259
	 * Reserve and return the next available job in the specified queue.
260
	 *
261
	 * @param string $queue Queue to fetch next available job from.
262
	 * @return \Resque\JobHandler|false Instance of Resque\JobHandler to be processed, false if none or error.
263
	 */
264
	public static function reserve($queue)
265
	{
266
		return JobHandler::reserve($queue);
267
	}
268
269
	/**
270
	 * Get an array of all known queues.
271
	 *
272
	 * @return array Array of queues.
273
	 */
274
	public static function queues()
275
	{
276
		$queues = self::redis()->smembers('queues');
0 ignored issues
show
Bug introduced by
The method smembers() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

276
		$queues = self::redis()->/** @scrutinizer ignore-call */ smembers('queues');
Loading history...
277
		if (!is_array($queues)) {
278
			$queues = array();
279
		}
280
		return $queues;
281
	}
282
283
	/**
284
	 * Retrieve all the items of a queue with Redis
285
	 *
286
	 * @return array Array of items.
287
	 */
288
	public static function items($queue, $start = 0, $stop = -1)
289
	{
290
		$list = self::redis()->lrange('queue:' . $queue, $start, $stop);
0 ignored issues
show
Bug introduced by
The method lrange() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

290
		$list = self::redis()->/** @scrutinizer ignore-call */ lrange('queue:' . $queue, $start, $stop);
Loading history...
291
		if (!is_array($list)) {
292
			$list = array();
293
		}
294
		return $list;
295
	}
296
297
	/**
298
	 * Remove Items from the queue
299
	 * Safely moving each item to a temporary queue before processing it
300
	 * If the job matches, it is counted; otherwise it is put in a requeue queue
301
	 * which is eventually copied back into the original queue
302
	 *
303
	 * @private
304
	 *
305
	 * @param string $queue The name of the queue
306
	 * @param array $items
307
	 * @return integer number of deleted items
308
	 */
309
	private static function removeItems($queue, $items = array())
310
	{
311
		$counter = 0;
312
		$originalQueue = 'queue:' . $queue;
313
		$tempQueue = $originalQueue . ':temp:' . time();
314
		$requeueQueue = $tempQueue . ':requeue';
315
316
		// move each item from original queue to temp queue and process it
317
		$finished = false;
318
		while (!$finished) {
319
			$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
0 ignored issues
show
Bug introduced by
The method rpoplpush() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

319
			$string = self::redis()->/** @scrutinizer ignore-call */ rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
Loading history...
320
321
			if (!empty($string)) {
322
				if (self::matchItem($string, $items)) {
323
					self::redis()->rpop($tempQueue);
0 ignored issues
show
Bug introduced by
The method rpop() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

323
					self::redis()->/** @scrutinizer ignore-call */ rpop($tempQueue);
Loading history...
324
					$counter++;
325
				} else {
326
					self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
327
				}
328
			} else {
329
				$finished = true;
330
			}
331
		}
332
333
		// move back from temp queue to original queue
334
		$finished = false;
335
		while (!$finished) {
336
			$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() . $originalQueue);
337
			if (empty($string)) {
338
				$finished = true;
339
			}
340
		}
341
342
		// remove temp queue and requeue queue
343
		self::redis()->del($requeueQueue);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

343
		self::redis()->/** @scrutinizer ignore-call */ del($requeueQueue);
Loading history...
344
		self::redis()->del($tempQueue);
345
346
		return $counter;
347
	}
348
349
	/**
	 * Decide whether a raw queue entry matches any of the given removal criteria.
	 *
	 * Each entry of $items takes one of three forms:
	 *  - numeric key => class name: matches any job of that class;
	 *  - class name => array of args: matches a job of that class whose first
	 *    argument set is non-empty and holds no value that is absent from the
	 *    given array (array_diff() semantics: values are compared as strings,
	 *    keys are ignored);
	 *  - class name => job ID: matches the job of that class with exactly that ID.
	 *
	 * A payload that cannot be decoded to an array, or that carries no scalar
	 * class name, never matches.
	 *
	 * @private
	 *
	 * @param string $string Redis result in JSON
	 * @param array $items Removal criteria in the forms described above
	 *
	 * @return bool True when the payload satisfies at least one criterion
	 */
	private static function matchItem($string, $items)
	{
		$decoded = json_decode($string, true);

		// Malformed or incomplete payloads must not match (and must not be removed).
		if (!is_array($decoded) || !isset($decoded['class']) || !is_scalar($decoded['class'])) {
			return false;
		}
		$class = (string)$decoded['class'];

		foreach ($items as $key => $val) {
			// Class name only, example: ['class']
			if (is_numeric($key)) {
				if (is_scalar($val) && $class === (string)$val) {
					return true;
				}
				continue;
			}

			// Both remaining forms are keyed by class name.
			if ($class !== (string)$key) {
				continue;
			}

			// Class name with args, example: ['class' => ['foo' => 1, 'bar' => 2]]
			if (is_array($val)) {
				$decodedArgs = isset($decoded['args'][0]) ? (array)$decoded['args'][0] : array();
				if (count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) === 0) {
					return true;
				}
				continue;
			}

			// Class name with ID, example: ['class' => 'id'].
			// Compared strictly as strings: a loose == would treat two different
			// IDs that both look like "0e<digits>" as equal numbers.
			if (isset($decoded['id']) && is_scalar($val) && (string)$decoded['id'] === (string)$val) {
				return true;
			}
		}

		return false;
	}
387
388
	/**
389
	 * Remove List
390
	 *
391
	 * @private
392
	 *
393
	 * @param string $queue the name of the queue
394
	 * @return integer number of deleted items belongs to this list
395
	 */
396
	private static function removeList($queue)
397
	{
398
		$counter = self::size($queue);
399
		$result = self::redis()->del('queue:' . $queue);
400
		return ($result == 1) ? $counter : 0;
401
	}
402
403
	/**
404
	 * Generate an identifier to attach to a job for status tracking.
405
	 *
406
	 * @return string
407
	 */
408
	public static function generateJobId()
409
	{
410
		return md5(uniqid('', true));
411
	}
412
}
413