Test Failed
Pull Request — develop (#54)
by
unknown
02:26
created

Scheduler::delayedPush()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
<?php

namespace Resque;

use Resque\Exceptions\ResqueException;
use Resque\Exceptions\InvalidTimestampException;
use DateTime;

/**
 * Resque scheduler core class to handle scheduling of jobs in the future.
 *
 * @package		Resque/Scheduler
 * @author		Chris Boulton <[email protected]>
 * @copyright	(c) 2012 Chris Boulton
 * @license		http://www.opensource.org/licenses/mit-license.php
 */
class Scheduler
{
	const VERSION = "0.1";

	/**
	 * Enqueue a job in a given number of seconds from now.
	 *
	 * Identical to Resque::enqueue, however the first argument is the number
	 * of seconds before the job should be executed.
	 *
	 * @param int $in Number of seconds from now when the job should be executed.
	 * @param string $queue The name of the queue to place the job in.
	 * @param string $class The name of the class that contains the code to execute the job.
	 * @param array $args Any optional arguments that should be passed when the job is executed.
	 */
	public static function enqueueIn($in, $queue, $class, array $args = array())
	{
		self::enqueueAt(time() + $in, $queue, $class, $args);
	}

	/**
	 * Enqueue a job for execution at a given timestamp.
	 *
	 * Identical to Resque::enqueue, however the first argument is a timestamp
	 * (either UNIX timestamp in integer format or an instance of the DateTime
	 * class in PHP).
	 *
	 * @param \DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
	 * @param string $queue The name of the queue to place the job in.
	 * @param string $class The name of the class that contains the code to execute the job.
	 * @param array $args Any optional arguments that should be passed when the job is executed.
	 */
	public static function enqueueAt($at, $queue, $class, $args = array())
	{
		self::validateJob($class, $queue);

		$job = self::jobToHash($queue, $class, $args);
		self::delayedPush($at, $job);

		Event::trigger('afterSchedule', array(
			'at'    => $at,
			'queue' => $queue,
			'class' => $class,
			'args'  => $args,
		));
	}

	/**
	 * Directly append an item to the delayed queue schedule.
	 *
	 * @param \DateTime|int $timestamp Timestamp job is scheduled to be run at.
	 * @param array $item Hash of item to be pushed to schedule.
	 */
	public static function delayedPush($timestamp, $item)
	{
		$timestamp = self::getTimestamp($timestamp);
		$redis = Resque::redis();
		// Scrutinizer (bug): rpush() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation. If this is a false
		// positive, it can be ignored with: $redis->/** @scrutinizer ignore-call */ rpush(...)
		$redis->rpush('delayed:' . $timestamp, json_encode($item));
		// Scrutinizer (bug): zadd() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation.
		$redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
	}

	/**
	 * Get the total number of jobs in the delayed schedule.
	 *
	 * @return int Number of scheduled jobs.
	 */
	public static function getDelayedQueueScheduleSize()
	{
		// Scrutinizer (bug): zcard() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation.
		return (int)Resque::redis()->zcard('delayed_queue_schedule');
	}

	/**
	 * Get the number of jobs for a given timestamp in the delayed schedule.
	 *
	 * @param \DateTime|int $timestamp Timestamp
	 * @return int Number of scheduled jobs.
	 */
	public static function getDelayedTimestampSize($timestamp)
	{
		$timestamp = self::getTimestamp($timestamp);
		// Scrutinizer (bug): llen() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation.
		// LLEN takes a single key, so the stray second argument has been dropped.
		return Resque::redis()->llen('delayed:' . $timestamp);
	}

	/**
	 * Remove a delayed job from the queue.
	 *
	 * Note: you must specify exactly the same queue, class and arguments
	 * that you used when you added the job to the delayed queue.
	 *
	 * This is an expensive operation because all delayed keys have to be
	 * searched.
	 *
	 * @param string $queue
	 * @param string $class
	 * @param array $args
	 * @return int Number of jobs that were removed.
	 */
	public static function removeDelayed($queue, $class, $args)
	{
		$destroyed = 0;
		$item = json_encode(self::jobToHash($queue, $class, $args));
		$redis = Resque::redis();

		// Scrutinizer (bug): keys() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation.
		foreach ($redis->keys('delayed:*') as $key) {
			$key = $redis->removePrefix($key);
			// Scrutinizer (bug): lrem() does not exist on Resque\Redis. Since __call is
			// implemented, consider adding a @method annotation.
			$destroyed += $redis->lrem($key, 0, $item);
		}

		return $destroyed;
	}

	/**
	 * Remove a delayed job queued for a specific timestamp.
	 *
	 * Note: you must specify exactly the same queue, class and arguments
	 * that you used when you added the job to the delayed queue.
	 *
	 * @param \DateTime|int $timestamp
	 * @param string $queue
	 * @param string $class
	 * @param array $args
	 * @return mixed Result of the LREM call (the number of removed jobs).
	 */
	public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
	{
		$key = 'delayed:' . self::getTimestamp($timestamp);
		$item = json_encode(self::jobToHash($queue, $class, $args));
		$redis = Resque::redis();
		$count = $redis->lrem($key, 0, $item);
		self::cleanupTimestamp($key, $timestamp);

		return $count;
	}

	/**
	 * Generate hash of all job properties to be saved in the scheduled queue.
	 *
	 * @param string $queue Name of the queue the job will be placed on.
	 * @param string $class Name of the job class.
	 * @param array $args Array of job arguments.
	 * @return array Hash with the keys 'class', 'args' and 'queue'.
	 */
	private static function jobToHash($queue, $class, $args)
	{
		return array(
			'class' => $class,
			'args'  => array($args),
			'queue' => $queue,
		);
	}

	/**
	 * If there are no jobs for a given key/timestamp, delete references to it.
	 *
	 * Used internally to remove empty delayed: items in Redis when there are
	 * no more jobs left to run at that timestamp.
	 *
	 * @param string $key Key to count number of items at.
	 * @param int $timestamp Matching timestamp for $key.
	 */
	private static function cleanupTimestamp($key, $timestamp)
	{
		$timestamp = self::getTimestamp($timestamp);
		$redis = Resque::redis();

		if ($redis->llen($key) == 0) {
			// Scrutinizer (bug): del() does not exist on Resque\Redis. Since __call is
			// implemented, consider adding a @method annotation.
			$redis->del($key);
			// Scrutinizer (bug): zrem() does not exist on Resque\Redis. Since __call is
			// implemented, consider adding a @method annotation.
			$redis->zrem('delayed_queue_schedule', $timestamp);
		}
	}

	/**
	 * Convert a timestamp in some format into a UNIX timestamp as an integer.
	 *
	 * @param \DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
	 * @return int Timestamp
	 * @throws InvalidTimestampException
	 */
	private static function getTimestamp($timestamp)
	{
		if ($timestamp instanceof DateTime) {
			$timestamp = $timestamp->getTimestamp();
		}

		if ((int)$timestamp != $timestamp) {
			throw new InvalidTimestampException(
				'The supplied timestamp value could not be converted to an integer.'
			);
		}

		return (int)$timestamp;
	}

	/**
	 * Find the first timestamp in the delayed schedule before/including the timestamp.
	 *
	 * Will find and return the first timestamp up to and including the given
	 * timestamp. This is the heart of the Scheduler that will make sure
	 * that any jobs scheduled for the past when the worker wasn't running are
	 * also queued up.
	 *
	 * @param \DateTime|int|null $at Instance of DateTime or UNIX timestamp.
	 *                               Defaults to now.
	 * @return int|false UNIX timestamp, or false if nothing to run.
	 */
	public static function nextDelayedTimestamp($at = null)
	{
		if ($at === null) {
			$at = time();
		} else {
			$at = self::getTimestamp($at);
		}

		// Scrutinizer (bug): zrangebyscore() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation.
		$items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
		if (!empty($items)) {
			return $items[0];
		}

		return false;
	}

	/**
	 * Pop a job off the delayed queue for a given timestamp.
	 *
	 * @param \DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
	 * @return array Matching job at timestamp.
	 */
	public static function nextItemForTimestamp($timestamp)
	{
		$timestamp = self::getTimestamp($timestamp);
		$key = 'delayed:' . $timestamp;

		// Scrutinizer (bug): lpop() does not exist on Resque\Redis. Since __call is
		// implemented, consider adding a @method annotation.
		$item = json_decode(Resque::redis()->lpop($key), true);

		self::cleanupTimestamp($key, $timestamp);
		return $item;
	}

	/**
	 * Ensure that supplied job class/queue is valid.
	 *
	 * @param string $class Name of job class.
	 * @param string $queue Name of queue.
	 * @return bool True when both the class and the queue are non-empty.
	 * @throws \Resque\Exceptions\ResqueException
	 */
	private static function validateJob($class, $queue)
	{
		if (empty($class)) {
			throw new ResqueException('Jobs must be given a class.');
		} elseif (empty($queue)) {
			throw new ResqueException('Jobs must be put in a queue.');
		}

		return true;
	}
}
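
The analyzer findings reported against this class all have the same cause: Resque\Redis forwards Redis commands through __call, so methods such as rpush() and llen() are never declared. The following is a minimal sketch of the suggested @method annotation. MagicRedis is a hypothetical stand-in that keeps lists in a PHP array; the real Resque\Redis class is not shown in this file, so the annotated signatures are assumptions.

```php
<?php

/**
 * Hypothetical stand-in for a client that forwards commands via __call.
 * The @method tags declare the magic methods for static analyzers and IDEs.
 *
 * @method int rpush(string $key, string $value) Append a value, return the new list length.
 * @method int llen(string $key) Return the list length (0 for a missing key).
 */
class MagicRedis
{
	/** @var array In-memory lists keyed by name (assumption: replaces a Redis server). */
	private $lists = array();

	public function __call($name, $args)
	{
		switch (strtolower($name)) {
			case 'rpush':
				$this->lists[$args[0]][] = $args[1];
				return count($this->lists[$args[0]]);
			case 'llen':
				return isset($this->lists[$args[0]]) ? count($this->lists[$args[0]]) : 0;
		}

		throw new BadMethodCallException('Unsupported command: ' . $name);
	}
}

$redis = new MagicRedis();
$redis->rpush('delayed:100', json_encode(array('class' => 'ExampleJob')));
echo $redis->llen('delayed:100'), PHP_EOL;
```

The annotations change nothing at runtime; they only describe to tooling what __call already accepts, which is why they can be added without touching the call sites.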
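
The class above stores one list of jobs per timestamp, looks up the earliest due timestamp, pops jobs from it, and removes the timestamp once its list is empty. As a rough sketch of that flow, the hypothetical InMemorySchedule class below replaces Redis with a plain PHP array, whose keys play the role of the delayed_queue_schedule index. The class and method names are illustrative assumptions, not part of the library.

```php
<?php

/**
 * Illustrative in-memory model of the delayed schedule (not the real storage).
 */
class InMemorySchedule
{
	/** @var array JSON-encoded jobs per timestamp, like the "delayed:<timestamp>" lists. */
	private $lists = array();

	/** Append a job to the list for $timestamp, as delayedPush() does with rpush. */
	public function push($timestamp, array $item)
	{
		$this->lists[(int)$timestamp][] = json_encode($item);
	}

	/** Earliest timestamp at or before $at, or false if nothing is due. */
	public function nextTimestamp($at)
	{
		$due = array_filter(array_keys($this->lists), function ($ts) use ($at) {
			return $ts <= $at;
		});

		return empty($due) ? false : min($due);
	}

	/** Pop the oldest job at $timestamp and drop the list once it is empty. */
	public function pop($timestamp)
	{
		if (empty($this->lists[$timestamp])) {
			return null;
		}

		$item = json_decode(array_shift($this->lists[$timestamp]), true);
		if (count($this->lists[$timestamp]) === 0) {
			unset($this->lists[$timestamp]); // same idea as cleanupTimestamp()
		}

		return $item;
	}
}

$schedule = new InMemorySchedule();
$schedule->push(200, array('class' => 'LaterJob'));
$schedule->push(100, array('class' => 'EarlierJob'));
var_dump($schedule->nextTimestamp(150));
```

A worker loop would call nextTimestamp() with the current time and then pop() until that timestamp disappears, which is how overdue jobs from a period when no worker was running still get picked up.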