Completed
Push — master ( 7dd147...b6a920 )
by Hennik
03:55 queued 12s

ResqueScheduler::delayedPush()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 4
c 1
b 0
f 1
nc 1
nop 2
dl 0
loc 7
rs 10
<?php
/**
 * ResqueScheduler core class to handle scheduling of jobs in the future.
 *
 * @package		ResqueScheduler
 * @author		Chris Boulton <[email protected]>
 * @copyright	(c) 2012 Chris Boulton
 * @license		http://www.opensource.org/licenses/mit-license.php
 */
class ResqueScheduler
{
	const VERSION = "0.1";

	/**
	 * Enqueue a job in a given number of seconds from now.
	 *
	 * Identical to Resque::enqueue, however the first argument is the number
	 * of seconds before the job should be executed.
	 *
	 * @param int $in Number of seconds from now when the job should be executed.
	 * @param string $queue The name of the queue to place the job in.
	 * @param string $class The name of the class that contains the code to execute the job.
	 * @param array $args Any optional arguments that should be passed when the job is executed.
	 */
	public static function enqueueIn($in, $queue, $class, array $args = array())
	{
		self::enqueueAt(time() + $in, $queue, $class, $args);
	}

	/**
	 * Enqueue a job for execution at a given timestamp.
	 *
	 * Identical to Resque::enqueue, however the first argument is a timestamp
	 * (either UNIX timestamp in integer format or an instance of the DateTime
	 * class in PHP).
	 *
	 * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
	 * @param string $queue The name of the queue to place the job in.
	 * @param string $class The name of the class that contains the code to execute the job.
	 * @param array $args Any optional arguments that should be passed when the job is executed.
	 */
	public static function enqueueAt($at, $queue, $class, $args = array())
	{
		self::validateJob($class, $queue);

		$job = self::jobToHash($queue, $class, $args);
		self::delayedPush($at, $job);

		Resque_Event::trigger('afterSchedule', array(
			'at'    => $at,
			'queue' => $queue,
			'class' => $class,
			'args'  => $args,
		));
	}

	/**
	 * Directly append an item to the delayed queue schedule.
	 *
	 * Note: Redis commands such as rpush() and zadd() are not declared on
	 * Resque_Redis; they are dispatched through its __call handler, which
	 * static analysers may flag unless Resque_Redis carries matching
	 * @method annotations.
	 *
	 * @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
	 * @param array $item Hash of item to be pushed to schedule.
	 */
	public static function delayedPush($timestamp, $item)
	{
		$timestamp = self::getTimestamp($timestamp);
		$redis = Resque::redis();
		$redis->rpush('delayed:' . $timestamp, json_encode($item));
		$redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
	}

	/**
	 * Get the total number of jobs in the delayed schedule.
	 *
	 * @return int Number of scheduled jobs.
	 */
	public static function getDelayedQueueScheduleSize()
	{
		return (int)Resque::redis()->zcard('delayed_queue_schedule');
	}

	/**
	 * Get the number of jobs for a given timestamp in the delayed schedule.
	 *
	 * @param DateTime|int $timestamp Timestamp
	 * @return int Number of scheduled jobs.
	 */
	public static function getDelayedTimestampSize($timestamp)
	{
		// Normalise DateTime or integer input to a UNIX timestamp.
		$timestamp = self::getTimestamp($timestamp);
		// Each timestamp has its own list, so the list length is the job count.
		return Resque::redis()->llen('delayed:' . $timestamp);
	}

    /**
     * Remove a delayed job from the queue.
     *
     * Note: you must specify exactly the same queue, class and arguments
     * that you used when you added the job to the delayed queue.
     *
     * This is an expensive operation because all delayed keys have to be
     * searched.
     *
     * @param string $queue Name of the queue the job was placed on.
     * @param string $class Name of the job class.
     * @param array $args Array of job arguments.
     * @return int Number of jobs that were removed.
     */
    public static function removeDelayed($queue, $class, $args)
    {
        $destroyed = 0;
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();

        foreach ($redis->keys('delayed:*') as $key) {
            // Strip the key prefix so that lrem() can be called with the bare key.
            $key = $redis->removePrefix($key);
            $destroyed += $redis->lrem($key, 0, $item);
        }

        return $destroyed;
    }

    /**
     * Remove a delayed job queued for a specific timestamp.
     *
     * Note: you must specify exactly the same queue, class and arguments
     * that you used when you added the job to the delayed queue.
     *
     * @param DateTime|int $timestamp Timestamp the job was scheduled for.
     * @param string $queue Name of the queue the job was placed on.
     * @param string $class Name of the job class.
     * @param array $args Array of job arguments.
     * @return int Number of jobs that were removed.
     */
    public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
    {
        $key = 'delayed:' . self::getTimestamp($timestamp);
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();
        $count = $redis->lrem($key, 0, $item);
        self::cleanupTimestamp($key, $timestamp);

        return $count;
    }

	/**
	 * Generate hash of all job properties to be saved in the scheduled queue.
	 *
	 * @param string $queue Name of the queue the job will be placed on.
	 * @param string $class Name of the job class.
	 * @param array $args Array of job arguments.
	 * @return array Hash with the keys 'class', 'args' and 'queue'.
	 */
	private static function jobToHash($queue, $class, $args)
	{
		return array(
			'class' => $class,
			'args'  => array($args),
			'queue' => $queue,
		);
	}

	/**
	 * If there are no jobs for a given key/timestamp, delete references to it.
	 *
	 * Used internally to remove empty delayed: items in Redis when there are
	 * no more jobs left to run at that timestamp.
	 *
	 * @param string $key Key to count number of items at.
	 * @param int $timestamp Matching timestamp for $key.
	 */
	private static function cleanupTimestamp($key, $timestamp)
	{
		$timestamp = self::getTimestamp($timestamp);
		$redis = Resque::redis();

		if ($redis->llen($key) == 0) {
			$redis->del($key);
			$redis->zrem('delayed_queue_schedule', $timestamp);
		}
	}

	/**
	 * Convert a timestamp in some format into a UNIX timestamp as an integer.
	 *
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
	 * @return int Timestamp
	 * @throws ResqueScheduler_InvalidTimestampException
	 */
	private static function getTimestamp($timestamp)
	{
		if ($timestamp instanceof DateTime) {
			$timestamp = $timestamp->getTimestamp();
		}

		if ((int)$timestamp != $timestamp) {
			throw new ResqueScheduler_InvalidTimestampException(
				'The supplied timestamp value could not be converted to an integer.'
			);
		}

		return (int)$timestamp;
	}

	/**
	 * Find the first timestamp in the delayed schedule before/including the timestamp.
	 *
	 * Will find and return the first timestamp up to and including the given
	 * timestamp. This is the heart of the ResqueScheduler that will make sure
	 * that any jobs scheduled for the past when the worker wasn't running are
	 * also queued up.
	 *
	 * @param DateTime|int $at Instance of DateTime or UNIX timestamp.
	 *                         Defaults to now.
	 * @return int|false UNIX timestamp, or false if nothing to run.
	 */
	public static function nextDelayedTimestamp($at = null)
	{
		if ($at === null) {
			$at = time();
		}
		else {
			$at = self::getTimestamp($at);
		}

		$items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
		if (!empty($items)) {
			return $items[0];
		}

		return false;
	}

	/**
	 * Pop a job off the delayed queue for a given timestamp.
	 *
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
	 * @return array Matching job at timestamp.
	 */
	public static function nextItemForTimestamp($timestamp)
	{
		$timestamp = self::getTimestamp($timestamp);
		$key = 'delayed:' . $timestamp;

		$item = json_decode(Resque::redis()->lpop($key), true);

		self::cleanupTimestamp($key, $timestamp);
		return $item;
	}

	/**
	 * Ensure that supplied job class/queue is valid.
	 *
	 * @param string $class Name of job class.
	 * @param string $queue Name of queue.
	 * @throws Resque_Exception
	 */
	private static function validateJob($class, $queue)
	{
		if (empty($class)) {
			throw new Resque_Exception('Jobs must be given a class.');
		}
		else if (empty($queue)) {
			throw new Resque_Exception('Jobs must be put in a queue.');
		}

		return true;
	}
}