Passed
Push — develop ( 25ff30...23fbc2 )
by Hennik
01:03 queued 10s
created

ResqueScheduler_Worker   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 115
Duplicated Lines 0 %

Importance

Changes 3
Bugs 1 Features 1
Metric Value
eloc 31
c 3
b 1
f 1
dl 0
loc 115
rs 10
wmc 13

6 Methods

Rating   Name   Duplication   Size   Complexity  
A updateProcLine() 0 4 2
A work() 0 11 3
A handleDelayedItems() 0 5 2
A enqueueDelayedItemsForTimestamp() 0 14 2
A sleep() 0 3 1
A log() 0 7 3
1
<?php
2
/**
3
 * ResqueScheduler worker to handle scheduling of delayed tasks.
4
 *
5
 * @package		ResqueScheduler
6
 * @author		Chris Boulton <[email protected]>
7
 * @copyright	(c) 2012 Chris Boulton
8
 * @license		http://www.opensource.org/licenses/mit-license.php
9
 */
10
class ResqueScheduler_Worker
{
	const LOG_NONE = 0;
	const LOG_NORMAL = 1;
	const LOG_VERBOSE = 2;

	/**
	 * @var int Current log level of this worker (one of the LOG_* constants).
	 */
	public $logLevel = 0;

	/**
	 * @var int Number of seconds to sleep between checks of the schedule.
	 */
	protected $interval = 5;

	/**
	 * The primary loop for a worker.
	 *
	 * Every $interval (seconds), the scheduled queue is checked for jobs
	 * that should be pushed to Resque. This method loops forever and never
	 * returns.
	 *
	 * @param int|null $interval How often to check schedules, in seconds.
	 *                           When null, the currently configured interval is kept.
	 */
	public function work($interval = null)
	{
		if ($interval !== null) {
			$this->interval = $interval;
		}

		$this->updateProcLine('Starting');

		while (true) {
			$this->handleDelayedItems();
			$this->sleep();
		}
	}

	/**
	 * Handle delayed items for the next scheduled timestamp.
	 *
	 * Repeatedly asks ResqueScheduler::nextDelayedTimestamp() for a timestamp
	 * and enqueues the items stored under it, until that call returns false.
	 *
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
	 */
	public function handleDelayedItems($timestamp = null)
	{
		while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) {
			$this->updateProcLine('Processing Delayed Items');
			$this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);
		}
	}

	/**
	 * Schedule all of the delayed jobs for a given timestamp.
	 *
	 * Pulls items for the timestamp one at a time via
	 * ResqueScheduler::nextItemForTimestamp() until it returns a falsy value.
	 * For each item, the 'beforeDelayedEnqueue' event is triggered and the job
	 * is then handed to Resque::enqueue() as (queue, class, ...args).
	 *
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
	 */
	public function enqueueDelayedItemsForTimestamp($timestamp)
	{
		while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) {
			$this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] .' [delayed]');

			Resque_Event::trigger('beforeDelayedEnqueue', array(
				'queue' => $item['queue'],
				'class' => $item['class'],
				'args'  => $item['args'],
			));

			// array_merge() preserves string keys from $item['args']. PHP 8
			// treats string keys passed to call_user_func_array() as named
			// arguments, so re-index to keep the call strictly positional.
			$payload = array_values(
				array_merge(array($item['queue'], $item['class']), $item['args'])
			);
			call_user_func_array('Resque::enqueue', $payload);
		}
	}

	/**
	 * Sleep for the defined interval.
	 */
	protected function sleep()
	{
		sleep($this->interval);
	}

	/**
	 * Update the status of the current worker process.
	 *
	 * Sets the process title to "resque-scheduler-<version>: <status>" when a
	 * supported mechanism exists: the PECL proctitle module's setproctitle()
	 * is used if available, otherwise the built-in cli_set_process_title().
	 * When neither is available this is a no-op.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		$processTitle = 'resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status;

		if (function_exists('setproctitle')) {
			setproctitle($processTitle);
		}
		// NOTE(review): Darwin is skipped because cli_set_process_title() is
		// reported to fail with a warning there - confirm on target PHP builds.
		elseif (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
			cli_set_process_title($processTitle);
		}
	}

	/**
	 * Output a given log message to STDOUT.
	 *
	 * Nothing is written unless the log level is LOG_NORMAL or LOG_VERBOSE;
	 * the verbose format additionally carries a "H:i:s Y-m-d" timestamp.
	 *
	 * @param string $message Message to output.
	 */
	public function log($message)
	{
		// Loose comparison is kept on purpose: $logLevel is public and may be
		// assigned a numeric string by callers.
		if ($this->logLevel == self::LOG_NORMAL) {
			fwrite(STDOUT, "*** " . $message . "\n");
		}
		else if ($this->logLevel == self::LOG_VERBOSE) {
			// date() replaces strftime('%T %Y-%m-%d'), deprecated as of PHP 8.1.
			fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
		}
	}
}
128