Test Failed
Pull Request — master (#56)
by
unknown
02:51
created

Scheduler::getTimestamp()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 13
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nc 4
nop 1
dl 0
loc 13
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Resque;
4
5
use \Resque\Exceptions\Exception as ResqueException;
6
use \Resque\Exceptions\InvalidTimestampException;
7
8
/**
9
* Resque scheduler core class to handle scheduling of jobs in the future.
10
*
11
* @package		Resque/Scheduler
12
* @author		Chris Boulton <[email protected]>
13
* @copyright	(c) 2012 Chris Boulton
14
* @license		http://www.opensource.org/licenses/mit-license.php
15
*/
16
class Scheduler
17
{
18
	const VERSION = "0.1";
19
20
	/**
21
	 * Enqueue a job in a given number of seconds from now.
22
	 *
23
	 * Identical to Resque::enqueue, however the first argument is the number
24
	 * of seconds before the job should be executed.
25
	 *
26
	 * @param int $in Number of seconds from now when the job should be executed.
27
	 * @param string $queue The name of the queue to place the job in.
28
	 * @param string $class The name of the class that contains the code to execute the job.
29
	 * @param array $args Any optional arguments that should be passed when the job is executed.
30
	 */
31
	public static function enqueueIn($in, $queue, $class, array $args = array())
32
	{
33
		self::enqueueAt(time() + $in, $queue, $class, $args);
34
	}
35
36
	/**
37
	 * Enqueue a job for execution at a given timestamp.
38
	 *
39
	 * Identical to Resque::enqueue, however the first argument is a timestamp
40
	 * (either UNIX timestamp in integer format or an instance of the DateTime
41
	 * class in PHP).
42
	 *
43
	 * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
0 ignored issues
show
Bug introduced by
The type Resque\DateTime was not found. Did you mean DateTime? If so, make sure to prefix the type with \.
Loading history...
44
	 * @param string $queue The name of the queue to place the job in.
45
	 * @param string $class The name of the class that contains the code to execute the job.
46
	 * @param array $args Any optional arguments that should be passed when the job is executed.
47
	 */
48
	public static function enqueueAt($at, $queue, $class, $args = array())
49
	{
50
		self::validateJob($class, $queue);
51
52
		$job = self::jobToHash($queue, $class, $args);
53
		self::delayedPush($at, $job);
54
55
		Event::trigger('afterSchedule', array(
56
			'at'    => $at,
57
			'queue' => $queue,
58
			'class' => $class,
59
			'args'  => $args,
60
		));
61
	}
62
63
	/**
64
	 * Directly append an item to the delayed queue schedule.
65
	 *
66
	 * @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
67
	 * @param array $item Hash of item to be pushed to schedule.
68
	 */
69
	public static function delayedPush($timestamp, $item)
70
	{
71
		$timestamp = self::getTimestamp($timestamp);
72
		$redis = Resque::redis();
73
		$redis->rpush('delayed:' . $timestamp, json_encode($item));
0 ignored issues
show
Bug introduced by
The method rpush() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

73
		$redis->/** @scrutinizer ignore-call */ 
74
          rpush('delayed:' . $timestamp, json_encode($item));
Loading history...
74
75
		$redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
0 ignored issues
show
Bug introduced by
The method zadd() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

75
		$redis->/** @scrutinizer ignore-call */ 
76
          zadd('delayed_queue_schedule', $timestamp, $timestamp);
Loading history...
76
	}
77
78
	/**
79
	 * Get the total number of jobs in the delayed schedule.
80
	 *
81
	 * @return int Number of scheduled jobs.
82
	 */
83
	public static function getDelayedQueueScheduleSize()
84
	{
85
		return (int)Resque::redis()->zcard('delayed_queue_schedule');
0 ignored issues
show
Bug introduced by
The method zcard() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

85
		return (int)Resque::redis()->/** @scrutinizer ignore-call */ zcard('delayed_queue_schedule');
Loading history...
86
	}
87
88
	/**
89
	 * Get the number of jobs for a given timestamp in the delayed schedule.
90
	 *
91
	 * @param DateTime|int $timestamp Timestamp
92
	 * @return int Number of scheduled jobs.
93
	 */
94
	public static function getDelayedTimestampSize($timestamp)
95
	{
96
		$timestamp = self::getTimestamp($timestamp);
97
		return Resque::redis()->llen('delayed:' . $timestamp, $timestamp);
0 ignored issues
show
Bug introduced by
The method llen() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

97
		return Resque::redis()->/** @scrutinizer ignore-call */ llen('delayed:' . $timestamp, $timestamp);
Loading history...
98
	}
99
100
	/**
101
	 * Remove a delayed job from the queue
102
	 *
103
	 * note: you must specify exactly the same
104
	 * queue, class and arguments that you used when you added
105
	 * to the delayed queue
106
	 *
107
	 * also, this is an expensive operation because all delayed keys have tobe
108
	 * searched
109
	 *
110
	 * @param $queue
111
	 * @param $class
112
	 * @param $args
113
	 * @return int number of jobs that were removed
114
	 */
115
	public static function removeDelayed($queue, $class, $args)
116
	{
117
		$destroyed = 0;
118
		$item = json_encode(self::jobToHash($queue, $class, $args));
119
		$redis = Resque::redis();
120
121
		foreach ($redis->keys('delayed:*') as $key) {
0 ignored issues
show
Bug introduced by
The method keys() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

121
		foreach ($redis->/** @scrutinizer ignore-call */ keys('delayed:*') as $key) {
Loading history...
122
			$key = $redis->removePrefix($key);
123
			$destroyed += $redis->lrem($key, 0, $item);
0 ignored issues
show
Bug introduced by
The method lrem() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

123
			$destroyed += $redis->/** @scrutinizer ignore-call */ lrem($key, 0, $item);
Loading history...
124
		}
125
126
		return $destroyed;
127
	}
128
129
	/**
130
	 * removed a delayed job queued for a specific timestamp
131
	 *
132
	 * note: you must specify exactly the same
133
	 * queue, class and arguments that you used when you added
134
	 * to the delayed queue
135
	 *
136
	 * @param $timestamp
137
	 * @param $queue
138
	 * @param $class
139
	 * @param $args
140
	 * @return mixed
141
	 */
142
	public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
143
	{
144
		$key = 'delayed:' . self::getTimestamp($timestamp);
145
		$item = json_encode(self::jobToHash($queue, $class, $args));
146
		$redis = Resque::redis();
147
		$count = $redis->lrem($key, 0, $item);
148
		self::cleanupTimestamp($key, $timestamp);
149
150
		return $count;
151
	}
152
153
	/**
154
	 * Generate hash of all job properties to be saved in the scheduled queue.
155
	 *
156
	 * @param string $queue Name of the queue the job will be placed on.
157
	 * @param string $class Name of the job class.
158
	 * @param array $args Array of job arguments.
159
	 */
160
161
	private static function jobToHash($queue, $class, $args)
162
	{
163
		return array(
164
			'class' => $class,
165
			'args'  => array($args),
166
			'queue' => $queue,
167
		);
168
	}
169
170
	/**
171
	 * If there are no jobs for a given key/timestamp, delete references to it.
172
	 *
173
	 * Used internally to remove empty delayed: items in Redis when there are
174
	 * no more jobs left to run at that timestamp.
175
	 *
176
	 * @param string $key Key to count number of items at.
177
	 * @param int $timestamp Matching timestamp for $key.
178
	 */
179
	private static function cleanupTimestamp($key, $timestamp)
180
	{
181
		$timestamp = self::getTimestamp($timestamp);
182
		$redis = Resque::redis();
183
184
		if ($redis->llen($key) == 0) {
185
			$redis->del($key);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

185
			$redis->/** @scrutinizer ignore-call */ 
186
           del($key);
Loading history...
186
			$redis->zrem('delayed_queue_schedule', $timestamp);
0 ignored issues
show
Bug introduced by
The method zrem() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

186
			$redis->/** @scrutinizer ignore-call */ 
187
           zrem('delayed_queue_schedule', $timestamp);
Loading history...
187
		}
188
	}
189
190
	/**
191
	 * Convert a timestamp in some format in to a unix timestamp as an integer.
192
	 *
193
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
194
	 * @return int Timestamp
195
	 * @throws Scheduler_InvalidTimestampException
196
	 */
197
	private static function getTimestamp($timestamp)
198
	{
199
		if ($timestamp instanceof DateTime) {
200
			$timestamp = $timestamp->getTimestamp();
201
		}
202
203
		if ((int)$timestamp != $timestamp) {
204
			throw new InvalidTimestampException(
205
				'The supplied timestamp value could not be converted to an integer.'
206
			);
207
		}
208
209
		return (int)$timestamp;
210
	}
211
212
	/**
213
	 * Find the first timestamp in the delayed schedule before/including the timestamp.
214
	 *
215
	 * Will find and return the first timestamp upto and including the given
216
	 * timestamp. This is the heart of the Scheduler that will make sure
217
	 * that any jobs scheduled for the past when the worker wasn't running are
218
	 * also queued up.
219
	 *
220
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
221
	 *                                Defaults to now.
222
	 * @return int|false UNIX timestamp, or false if nothing to run.
223
	 */
224
	public static function nextDelayedTimestamp($at = null)
225
	{
226
		if ($at === null) {
227
			$at = time();
228
		} else {
229
			$at = self::getTimestamp($at);
230
		}
231
232
		$items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
0 ignored issues
show
Bug introduced by
The method zrangebyscore() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

232
		$items = Resque::redis()->/** @scrutinizer ignore-call */ zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
Loading history...
233
		if (!empty($items)) {
234
			return $items[0];
235
		}
236
237
		return false;
238
	}
239
240
	/**
241
	 * Pop a job off the delayed queue for a given timestamp.
242
	 *
243
	 * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
244
	 * @return array Matching job at timestamp.
245
	 */
246
	public static function nextItemForTimestamp($timestamp)
247
	{
248
		$timestamp = self::getTimestamp($timestamp);
249
		$key = 'delayed:' . $timestamp;
250
251
		$item = json_decode(Resque::redis()->lpop($key), true);
0 ignored issues
show
Bug introduced by
The method lpop() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

251
		$item = json_decode(Resque::redis()->/** @scrutinizer ignore-call */ lpop($key), true);
Loading history...
252
253
		self::cleanupTimestamp($key, $timestamp);
254
		return $item;
255
	}
256
257
	/**
258
	 * Ensure that supplied job class/queue is valid.
259
	 *
260
	 * @param string $class Name of job class.
261
	 * @param string $queue Name of queue.
262
	 * @throws \Resque\Exceptions\Exception
263
	 */
264
	private static function validateJob($class, $queue)
265
	{
266
		if (empty($class)) {
267
			throw new ResqueException('Jobs must be given a class.');
268
		} elseif (empty($queue)) {
269
			throw new ResqueException('Jobs must be put in a queue.');
270
		}
271
272
		return true;
273
	}
274
}
275