ResqueScheduler   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 257
Duplicated Lines 0 %

Importance

Changes 4
Bugs 0 Features 2
Metric Value
eloc 64
c 4
b 0
f 2
dl 0
loc 257
rs 10
wmc 21

13 Methods

Rating   Name   Duplication   Size   Complexity  
A getTimestamp() 0 13 3
A removeDelayedJobFromTimestamp() 0 9 1
A cleanupTimestamp() 0 8 2
A jobToHash() 0 6 1
A getDelayedQueueScheduleSize() 0 3 1
A getDelayedTimestampSize() 0 4 1
A removeDelayed() 0 12 2
A nextItemForTimestamp() 0 9 1
A enqueueIn() 0 3 1
A validateJob() 0 9 3
A delayedPush() 0 7 1
A enqueueAt() 0 12 1
A nextDelayedTimestamp() 0 14 3
1
<?php
2
3
/**
4
* ResqueScheduler core class to handle scheduling of jobs in the future.
5
*
6
* @package		ResqueScheduler
7
* @author		Chris Boulton <[email protected]>
8
* @copyright	(c) 2012 Chris Boulton
9
* @license		http://www.opensource.org/licenses/mit-license.php
10
*/
11
class ResqueScheduler
12
{
13
	const VERSION = "0.1";
14
15
	/**
16
	 * Enqueue a job in a given number of seconds from now.
17
	 *
18
	 * Identical to Resque::enqueue, however the first argument is the number
19
	 * of seconds before the job should be executed.
20
	 *
21
	 * @param int $in Number of seconds from now when the job should be executed.
22
	 * @param string $queue The name of the queue to place the job in.
23
	 * @param string $class The name of the class that contains the code to execute the job.
24
	 * @param array $args Any optional arguments that should be passed when the job is executed.
25
	 */
26
	public static function enqueueIn($in, $queue, $class, array $args = array())
27
	{
28
		self::enqueueAt(time() + $in, $queue, $class, $args);
29
	}
30
31
	/**
32
	 * Enqueue a job for execution at a given timestamp.
33
	 *
34
	 * Identical to Resque::enqueue, however the first argument is a timestamp
35
	 * (either UNIX timestamp in integer format or an instance of the DateTime
36
	 * class in PHP).
37
	 *
38
	 * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
39
	 * @param string $queue The name of the queue to place the job in.
40
	 * @param string $class The name of the class that contains the code to execute the job.
41
	 * @param array $args Any optional arguments that should be passed when the job is executed.
42
	 */
43
	public static function enqueueAt($at, $queue, $class, $args = array())
44
	{
45
		self::validateJob($class, $queue);
46
47
		$job = self::jobToHash($queue, $class, $args);
48
		self::delayedPush($at, $job);
49
50
		Resque_Event::trigger('afterSchedule', array(
51
			'at'    => $at,
52
			'queue' => $queue,
53
			'class' => $class,
54
			'args'  => $args,
55
		));
56
	}
57
58
	/**
59
	 * Directly append an item to the delayed queue schedule.
60
	 *
61
	 * @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
62
	 * @param array $item Hash of item to be pushed to schedule.
63
	 */
64
	public static function delayedPush($timestamp, $item)
65
	{
66
		$timestamp = self::getTimestamp($timestamp);
67
		$redis = Resque::redis();
68
		$redis->rpush('delayed:' . $timestamp, json_encode($item));
0 ignored issues
show
Bug introduced by
The method rpush() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

68
		$redis->/** @scrutinizer ignore-call */ 
69
          rpush('delayed:' . $timestamp, json_encode($item));
Loading history...
69
70
		$redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
0 ignored issues
show
Bug introduced by
The method zadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

70
		$redis->/** @scrutinizer ignore-call */ 
71
          zadd('delayed_queue_schedule', $timestamp, $timestamp);
Loading history...
71
	}
72
73
	/**
74
	 * Get the total number of jobs in the delayed schedule.
75
	 *
76
	 * @return int Number of scheduled jobs.
77
	 */
78
	public static function getDelayedQueueScheduleSize()
79
	{
80
		return (int)Resque::redis()->zcard('delayed_queue_schedule');
0 ignored issues
show
Bug introduced by
The method zcard() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

80
		return (int)Resque::redis()->/** @scrutinizer ignore-call */ zcard('delayed_queue_schedule');
Loading history...
81
	}
82
83
	/**
84
	 * Get the number of jobs for a given timestamp in the delayed schedule.
85
	 *
86
	 * @param DateTime|int $timestamp Timestamp
87
	 * @return int Number of scheduled jobs.
88
	 */
89
	public static function getDelayedTimestampSize($timestamp)
90
	{
91
		$timestamp = self::getTimestamp($timestamp);
92
		return Resque::redis()->llen('delayed:' . $timestamp, $timestamp);
0 ignored issues
show
Bug introduced by
The method llen() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

92
		return Resque::redis()->/** @scrutinizer ignore-call */ llen('delayed:' . $timestamp, $timestamp);
Loading history...
93
	}
94
95
	/**
96
	 * Remove a delayed job from the queue
97
	 *
98
	 * note: you must specify exactly the same
99
	 * queue, class and arguments that you used when you added
100
	 * to the delayed queue
101
	 *
102
	 * also, this is an expensive operation because all delayed keys have tobe
103
	 * searched
104
	 *
105
	 * @param $queue
106
	 * @param $class
107
	 * @param $args
108
	 * @return int number of jobs that were removed
109
	 */
110
	public static function removeDelayed($queue, $class, $args)
111
	{
112
		$destroyed = 0;
113
		$item = json_encode(self::jobToHash($queue, $class, $args));
114
		$redis = Resque::redis();
115
116
		foreach ($redis->keys('delayed:*') as $key) {
0 ignored issues
show
Bug introduced by
The method keys() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

116
		foreach ($redis->/** @scrutinizer ignore-call */ keys('delayed:*') as $key) {
Loading history...
117
			$key = $redis->removePrefix($key);
118
			$destroyed += $redis->lrem($key, 0, $item);
0 ignored issues
show
Bug introduced by
The method lrem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

118
			$destroyed += $redis->/** @scrutinizer ignore-call */ lrem($key, 0, $item);
Loading history...
119
		}
120
121
		return $destroyed;
122
	}
123
124
	/**
125
	 * removed a delayed job queued for a specific timestamp
126
	 *
127
	 * note: you must specify exactly the same
128
	 * queue, class and arguments that you used when you added
129
	 * to the delayed queue
130
	 *
131
	 * @param $timestamp
132
	 * @param $queue
133
	 * @param $class
134
	 * @param $args
135
	 * @return mixed
136
	 */
137
	public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
138
	{
139
		$key = 'delayed:' . self::getTimestamp($timestamp);
140
		$item = json_encode(self::jobToHash($queue, $class, $args));
141
		$redis = Resque::redis();
142
		$count = $redis->lrem($key, 0, $item);
143
		self::cleanupTimestamp($key, $timestamp);
144
145
		return $count;
146
	}
147
148
	/**
149
	 * Generate hash of all job properties to be saved in the scheduled queue.
150
	 *
151
	 * @param string $queue Name of the queue the job will be placed on.
152
	 * @param string $class Name of the job class.
153
	 * @param array $args Array of job arguments.
154
	 */
155
156
	private static function jobToHash($queue, $class, $args)
157
	{
158
		return array(
159
			'class' => $class,
160
			'args'  => array($args),
161
			'queue' => $queue,
162
		);
163
	}
164
165
	/**
166
	 * If there are no jobs for a given key/timestamp, delete references to it.
167
	 *
168
	 * Used internally to remove empty delayed: items in Redis when there are
169
	 * no more jobs left to run at that timestamp.
170
	 *
171
	 * @param string $key Key to count number of items at.
172
	 * @param int $timestamp Matching timestamp for $key.
173
	 */
174
	private static function cleanupTimestamp($key, $timestamp)
175
	{
176
		$timestamp = self::getTimestamp($timestamp);
177
		$redis = Resque::redis();
178
179
		if ($redis->llen($key) == 0) {
180
			$redis->del($key);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

180
			$redis->/** @scrutinizer ignore-call */ 
181
           del($key);
Loading history...
181
			$redis->zrem('delayed_queue_schedule', $timestamp);
0 ignored issues
show
Bug introduced by
The method zrem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

181
			$redis->/** @scrutinizer ignore-call */ 
182
           zrem('delayed_queue_schedule', $timestamp);
Loading history...
182
		}
183
	}
184
185
	/**
186
	 * Convert a timestamp in some format in to a unix timestamp as an integer.
187
	 *
188
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
189
	 * @return int Timestamp
190
	 * @throws ResqueScheduler_InvalidTimestampException
191
	 */
192
	private static function getTimestamp($timestamp)
193
	{
194
		if ($timestamp instanceof DateTime) {
195
			$timestamp = $timestamp->getTimestamp();
196
		}
197
198
		if ((int)$timestamp != $timestamp) {
199
			throw new ResqueScheduler_InvalidTimestampException(
200
				'The supplied timestamp value could not be converted to an integer.'
201
			);
202
		}
203
204
		return (int)$timestamp;
205
	}
206
207
	/**
208
	 * Find the first timestamp in the delayed schedule before/including the timestamp.
209
	 *
210
	 * Will find and return the first timestamp upto and including the given
211
	 * timestamp. This is the heart of the ResqueScheduler that will make sure
212
	 * that any jobs scheduled for the past when the worker wasn't running are
213
	 * also queued up.
214
	 *
215
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
216
	 *                                Defaults to now.
217
	 * @return int|false UNIX timestamp, or false if nothing to run.
218
	 */
219
	public static function nextDelayedTimestamp($at = null)
220
	{
221
		if ($at === null) {
222
			$at = time();
223
		} else {
224
			$at = self::getTimestamp($at);
225
		}
226
227
		$items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
0 ignored issues
show
Bug introduced by
The method zrangebyscore() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

227
		$items = Resque::redis()->/** @scrutinizer ignore-call */ zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
Loading history...
228
		if (!empty($items)) {
229
			return $items[0];
230
		}
231
232
		return false;
233
	}
234
235
	/**
236
	 * Pop a job off the delayed queue for a given timestamp.
237
	 *
238
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
239
	 * @return array Matching job at timestamp.
240
	 */
241
	public static function nextItemForTimestamp($timestamp)
242
	{
243
		$timestamp = self::getTimestamp($timestamp);
244
		$key = 'delayed:' . $timestamp;
245
246
		$item = json_decode(Resque::redis()->lpop($key), true);
0 ignored issues
show
Bug introduced by
The method lpop() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

246
		$item = json_decode(Resque::redis()->/** @scrutinizer ignore-call */ lpop($key), true);
Loading history...
247
248
		self::cleanupTimestamp($key, $timestamp);
249
		return $item;
250
	}
251
252
	/**
253
	 * Ensure that supplied job class/queue is valid.
254
	 *
255
	 * @param string $class Name of job class.
256
	 * @param string $queue Name of queue.
257
	 * @throws Resque_Exception
258
	 */
259
	private static function validateJob($class, $queue)
260
	{
261
		if (empty($class)) {
262
			throw new Resque_Exception('Jobs must be given a class.');
263
		} elseif (empty($queue)) {
264
			throw new Resque_Exception('Jobs must be put in a queue.');
265
		}
266
267
		return true;
268
	}
269
}
270