ResqueScheduler_Worker   A
last analyzed

Complexity

Total Complexity 20

Size/Duplication

Total Lines 177
Duplicated Lines 0 %

Importance

Changes 8
Bugs 1 Features 1
Metric Value
eloc 51
c 8
b 1
f 1
dl 0
loc 177
rs 10
wmc 20

10 Methods

Rating   Name   Duplication   Size   Complexity  
A pauseProcessing() 0 4 1
A registerSigHandlers() 0 13 2
A shutdown() 0 4 1
A unPauseProcessing() 0 4 1
A updateProcLine() 0 4 2
A work() 0 17 5
A handleDelayedItems() 0 5 2
A enqueueDelayedItemsForTimestamp() 0 14 2
A sleep() 0 3 1
A log() 0 6 3
1
<?php
2
3
declare(ticks=1);
4
5
/**
6
 * ResqueScheduler worker to handle scheduling of delayed tasks.
7
 *
8
 * @package		ResqueScheduler
9
 * @author		Chris Boulton <[email protected]>
10
 * @copyright	(c) 2012 Chris Boulton
11
 * @license		http://www.opensource.org/licenses/mit-license.php
12
 */
13
class ResqueScheduler_Worker
14
{
15
	const LOG_NONE = 0;
16
	const LOG_NORMAL = 1;
17
	const LOG_VERBOSE = 2;
18
19
	/**
20
	 * @var int Current log level of this worker.
21
	 */
22
	public $logLevel = 0;
23
24
	/**
25
	 * @var int Interval to sleep for between checking schedules.
26
	 */
27
	protected $interval = 5;
28
29
	/**
30
	 * @var boolean True if on the next iteration, the worker should shutdown.
31
	 */
32
	private $shutdown = false;
33
34
	/**
35
	 * @var boolean True if this worker is paused.
36
	 */
37
	private $paused = false;
38
39
	/**
40
	* The primary loop for a worker.
41
	*
42
	* Every $interval (seconds), the scheduled queue will be checked for jobs
43
	* that should be pushed to Resque.
44
	*
45
	* @param int $interval How often to check schedules.
46
	*/
47
	public function work($interval = null)
48
	{
49
		if ($interval !== null) {
50
			$this->interval = $interval;
51
		}
52
53
		$this->updateProcLine('Starting');
54
		$this->registerSigHandlers();
55
56
		while (true) {
57
			if ($this->shutdown) {
58
				break;
59
			}
60
			if (!$this->paused) {
61
				$this->handleDelayedItems();
62
			}
63
			$this->sleep();
64
		}
65
	}
66
67
	/**
68
	 * Handle delayed items for the next scheduled timestamp.
69
	 *
70
	 * Searches for any items that are due to be scheduled in Resque
71
	 * and adds them to the appropriate job queue in Resque.
72
	 *
73
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
74
	 */
75
	public function handleDelayedItems($timestamp = null)
76
	{
77
		while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) {
78
			$this->updateProcLine('Processing Delayed Items');
79
			$this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);
80
		}
81
	}
82
83
	/**
84
	 * Schedule all of the delayed jobs for a given timestamp.
85
	 *
86
	 * Searches for all items for a given timestamp, pulls them off the list of
87
	 * delayed jobs and pushes them across to Resque.
88
	 *
89
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
90
	 */
91
	public function enqueueDelayedItemsForTimestamp($timestamp)
92
	{
93
		$item = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $item is dead and can be removed.
Loading history...
94
		while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) {
95
			$this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] . ' [delayed]');
96
97
			Resque_Event::trigger('beforeDelayedEnqueue', array(
98
				'queue' => $item['queue'],
99
				'class' => $item['class'],
100
				'args'  => $item['args'],
101
			));
102
103
			$payload = array_merge(array($item['queue'], $item['class']), $item['args']);
104
			call_user_func_array('Resque::enqueue', $payload);
105
		}
106
	}
107
108
	/**
109
	 * Sleep for the defined interval.
110
	 */
111
	protected function sleep()
112
	{
113
		sleep($this->interval);
114
	}
115
116
	/**
117
	 * Update the status of the current worker process.
118
	 *
119
	 * On supported systems (with the PECL proctitle module installed), update
120
	 * the name of the currently running process to indicate the current state
121
	 * of a worker.
122
	 *
123
	 * @param string $status The updated process title.
124
	 */
125
	private function updateProcLine($status)
126
	{
127
		if (function_exists('setproctitle')) {
128
			setproctitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status);
129
		}
130
	}
131
132
	/**
133
	 * Output a given log message to STDOUT.
134
	 *
135
	 * @param string $message Message to output.
136
	 */
137
	public function log($message)
138
	{
139
		if ($this->logLevel == self::LOG_NORMAL) {
140
			fwrite(STDOUT, "*** " . $message . "\n");
141
		} elseif ($this->logLevel == self::LOG_VERBOSE) {
142
			fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n");
143
		}
144
	}
145
146
	/**
147
	 * Register signal handlers that a worker should respond to.
148
	 *
149
	 * TERM: Shutdown after the current timestamp was processed.
150
	 * INT: Shutdown after the current timestamp was processed.
151
	 * QUIT: Shutdown after the current timestamp was processed.
152
	 */
153
	private function registerSigHandlers()
154
	{
155
		if (!function_exists('pcntl_signal')) {
156
			return;
157
		}
158
159
		pcntl_signal(SIGTERM, array($this, 'shutdown'));
160
		pcntl_signal(SIGINT, array($this, 'shutdown'));
161
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
162
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
163
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
164
165
		$this->log('Registered signals');
166
	}
167
168
	public function shutdown()
169
	{
170
		$this->log('Shutting down');
171
		$this->shutdown = true;
172
	}
173
174
	/**
175
	 * Signal handler callback for USR2, pauses processing.
176
	 */
177
	public function pauseProcessing()
178
	{
179
		$this->log('USR2 received; pausing processing');
180
		$this->paused = true;
181
	}
182
183
	/**
184
	 * Signal handler callback for CONT, resume processing.
185
	 */
186
	public function unPauseProcessing()
187
	{
188
		$this->log('CONT received; resuming processing');
189
		$this->paused = false;
190
	}
191
}
192