Completed
Push — master ( 7dd147...b6a920 )
by Hennik
03:55 queued 12s
created

ResqueScheduler_Worker::sleep()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 1
c 1
b 0
f 1
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
// Tick after every statement. NOTE(review): presumably this is what lets the
// pcntl signal handlers registered by the worker below be dispatched - confirm.
declare(ticks = 1);
3
4
/**
5
 * ResqueScheduler worker to handle scheduling of delayed tasks.
6
 *
7
 * @package		ResqueScheduler
8
 * @author		Chris Boulton <[email protected]>
9
 * @copyright	(c) 2012 Chris Boulton
10
 * @license		http://www.opensource.org/licenses/mit-license.php
11
 */
12
class ResqueScheduler_Worker
{
	const LOG_NONE = 0;
	const LOG_NORMAL = 1;
	const LOG_VERBOSE = 2;

	/**
	 * @var int Current log level of this worker (compared against the LOG_* constants).
	 */
	public $logLevel = 0;

	/**
	 * @var int Interval, in seconds, to sleep for between checking schedules.
	 */
	protected $interval = 5;

	/**
	 * @var boolean True if on the next iteration, the worker should shutdown.
	 */
	private $shutdown = false;

	/**
	 * @var boolean True if this worker is paused.
	 */
	private $paused = false;

	/**
	 * The primary loop for a worker.
	 *
	 * Sets the process title, registers the signal handlers and then loops
	 * until a shutdown has been requested. On every iteration the delayed
	 * items are handled (unless the worker is paused) and the worker then
	 * sleeps for $interval seconds.
	 *
	 * @param int $interval How often to check schedules, in seconds. When null,
	 *                      the currently configured interval is kept.
	 */
	public function work($interval = null)
	{
		if ($interval !== null) {
			$this->interval = $interval;
		}

		$this->updateProcLine('Starting');
		$this->registerSigHandlers();

		// The flag is flipped by shutdown(), so it is re-read on every pass.
		while (!$this->shutdown) {
			if (!$this->paused) {
				$this->handleDelayedItems();
			}
			$this->sleep();
		}
	}

	/**
	 * Handle delayed items for the next scheduled timestamp.
	 *
	 * Keeps asking ResqueScheduler for the next delayed timestamp and enqueues
	 * the items stored for it, until ResqueScheduler reports false.
	 *
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
	 */
	public function handleDelayedItems($timestamp = null)
	{
		while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) {
			$this->updateProcLine('Processing Delayed Items');
			$this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);
		}
	}

	/**
	 * Schedule all of the delayed jobs for a given timestamp.
	 *
	 * Pulls items for the timestamp from ResqueScheduler one at a time until a
	 * falsy value is returned. For each item a 'beforeDelayedEnqueue' event is
	 * triggered and the item is then handed to Resque::enqueue().
	 *
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
	 */
	public function enqueueDelayedItemsForTimestamp($timestamp)
	{
		while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) {
			$this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] .' [delayed]');

			Resque_Event::trigger('beforeDelayedEnqueue', array(
				'queue' => $item['queue'],
				'class' => $item['class'],
				'args'  => $item['args'],
			));

			// array_values() keeps the job arguments positional: as of PHP 8,
			// call_user_func_array() treats string keys as named parameters and
			// would fail on a key that is not a parameter of Resque::enqueue().
			$payload = array_merge(
				array($item['queue'], $item['class']),
				array_values($item['args'])
			);
			call_user_func_array('Resque::enqueue', $payload);
		}
	}

	/**
	 * Sleep for the defined interval.
	 */
	protected function sleep()
	{
		sleep($this->interval);
	}

	/**
	 * Update the status of the current worker process.
	 *
	 * On supported systems (with the PECL proctitle module installed), update
	 * the name of the currently running process to indicate the current state
	 * of a worker. Does nothing when setproctitle() is not available.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		if (function_exists('setproctitle')) {
			setproctitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status);
		}
	}

	/**
	 * Output a given log message to STDOUT.
	 *
	 * Nothing is written unless the log level is LOG_NORMAL or LOG_VERBOSE;
	 * the verbose format additionally carries the current time and date.
	 *
	 * @param string $message Message to output.
	 */
	public function log($message)
	{
		if ($this->logLevel == self::LOG_NORMAL) {
			fwrite(STDOUT, "*** " . $message . "\n");
		} elseif ($this->logLevel == self::LOG_VERBOSE) {
			// date() instead of strftime() (deprecated as of PHP 8.1);
			// 'H:i:s Y-m-d' renders the same text as '%T %Y-%m-%d'.
			fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
		}
	}

	/**
	 * Register signal handlers that a worker should respond to.
	 *
	 * TERM: Request shutdown; the main loop exits on its next iteration.
	 * INT: Request shutdown; the main loop exits on its next iteration.
	 * QUIT: Request shutdown; the main loop exits on its next iteration.
	 * USR2: Pause processing of delayed items.
	 * CONT: Resume processing of delayed items.
	 *
	 * Does nothing when the pcntl extension is not available.
	 */
	private function registerSigHandlers()
	{
		if (!function_exists('pcntl_signal')) {
			return;
		}

		pcntl_signal(SIGTERM, array($this, 'shutdown'));
		pcntl_signal(SIGINT, array($this, 'shutdown'));
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));

		$this->log('Registered signals');
	}

	/**
	 * Signal handler callback for TERM, INT and QUIT; flags the worker to
	 * stop on the next iteration of the main loop.
	 */
	public function shutdown()
	{
		$this->log('Shutting down');
		$this->shutdown = true;
	}

	/**
	 * Signal handler callback for USR2, pauses processing.
	 */
	public function pauseProcessing()
	{
		$this->log('USR2 received; pausing processing');
		$this->paused = true;
	}

	/**
	 * Signal handler callback for CONT, resume processing.
	 */
	public function unPauseProcessing()
	{
		$this->log('CONT received; resuming processing');
		$this->paused = false;
	}
}
192