Completed
Pull Request — develop (#32)
by
unknown
07:18 queued 04:53
created

ResqueScheduler_Worker::shutdown()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
declare(ticks = 1);

/**
 * ResqueScheduler worker to handle scheduling of delayed tasks.
 *
 * The worker polls the delayed-job schedule at a fixed interval and pushes
 * every job that has become due onto its Resque queue.
 *
 * @package		ResqueScheduler
 * @author		Chris Boulton <[email protected]>
 * @copyright	(c) 2012 Chris Boulton
 * @license		http://www.opensource.org/licenses/mit-license.php
 */
class ResqueScheduler_Worker
{
	const LOG_NONE = 0;
	const LOG_NORMAL = 1;
	const LOG_VERBOSE = 2;

	/**
	 * @var int Current log level of this worker (one of the LOG_* constants).
	 */
	public $logLevel = 0;

	/**
	 * @var int Interval (in seconds) to sleep for between checking schedules.
	 */
	protected $interval = 5;

	/**
	 * @var boolean True if on the next iteration, the worker should shutdown.
	 */
	private $shutdown = false;

	/**
	 * The primary loop for a worker.
	 *
	 * Every $interval (seconds), the scheduled queue will be checked for jobs
	 * that should be pushed to Resque. The loop ends once the shutdown flag
	 * has been set; the flag is only inspected at the top of each iteration.
	 *
	 * @param int $interval How often to check schedules. When null, the
	 *                      currently configured interval is kept.
	 */
	public function work($interval = null)
	{
		if ($interval !== null) {
			$this->interval = $interval;
		}

		$this->updateProcLine('Starting');
		$this->registerSigHandlers();

		while (!$this->shutdown) {
			$this->handleDelayedItems();
			$this->sleep();
		}
	}

	/**
	 * Handle delayed items for the next scheduled timestamp.
	 *
	 * Searches for any items that are due to be scheduled in Resque
	 * and adds them to the appropriate job queue in Resque. Keeps going
	 * until ResqueScheduler::nextDelayedTimestamp() returns false.
	 *
	 * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
	 */
	public function handleDelayedItems($timestamp = null)
	{
		while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) {
			$this->updateProcLine('Processing Delayed Items');
			$this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);
		}
	}

	/**
	 * Schedule all of the delayed jobs for a given timestamp.
	 *
	 * Searches for all items for a given timestamp, pulls them off the list of
	 * delayed jobs and pushes them across to Resque. A 'beforeDelayedEnqueue'
	 * event is triggered for every item before it is enqueued.
	 *
	 * @param DateTime|int $timestamp Timestamp whose delayed items should be enqueued.
	 */
	public function enqueueDelayedItemsForTimestamp($timestamp)
	{
		while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) {
			$this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] .' [delayed]');

			Resque_Event::trigger('beforeDelayedEnqueue', array(
				'queue' => $item['queue'],
				'class' => $item['class'],
				'args'  => $item['args'],
			));

			// Resque::enqueue() is called as enqueue($queue, $class, ...$args).
			// NOTE(review): on PHP 8+, string keys in $item['args'] would be
			// passed as named arguments by call_user_func_array() - confirm
			// that the stored args are always a plain list.
			$payload = array_merge(array($item['queue'], $item['class']), $item['args']);
			call_user_func_array('Resque::enqueue', $payload);
		}
	}

	/**
	 * Sleep for the defined interval.
	 */
	protected function sleep()
	{
		sleep($this->interval);
	}

	/**
	 * Update the status of the current worker process.
	 *
	 * On supported systems (with the PECL proctitle module installed), update
	 * the name of the currently running process to indicate the current state
	 * of a worker. Does nothing when setproctitle() is unavailable.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		if (function_exists('setproctitle')) {
			setproctitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status);
		}
	}

	/**
	 * Output a given log message to STDOUT.
	 *
	 * Nothing is written unless the log level is LOG_NORMAL or LOG_VERBOSE;
	 * the verbose format additionally carries a time/date prefix.
	 *
	 * @param string $message Message to output.
	 */
	public function log($message)
	{
		if ($this->logLevel == self::LOG_NORMAL) {
			fwrite(STDOUT, "*** " . $message . "\n");
		}
		elseif ($this->logLevel == self::LOG_VERBOSE) {
			// date() replaces strftime('%T %Y-%m-%d'), which is deprecated
			// as of PHP 8.1; the rendered text is the same.
			fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
		}
	}

	/**
	 * Register signal handlers that a worker should respond to.
	 *
	 * TERM, INT and QUIT all request a graceful shutdown: the handler only
	 * sets a flag, and the main loop in work() exits the next time it checks
	 * that flag at the top of an iteration.
	 *
	 * Does nothing when the pcntl extension is not available.
	 */
	private function registerSigHandlers()
	{
		if (!function_exists('pcntl_signal')) {
			return;
		}

		pcntl_signal(SIGTERM, array($this, 'shutdown'));
		pcntl_signal(SIGINT, array($this, 'shutdown'));
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
	}

	/**
	 * Request that the worker stops.
	 *
	 * Only flags the worker for shutdown; work() leaves its loop the next
	 * time it checks the flag. Also used as the signal handler callback.
	 */
	public function shutdown()
	{
		$this->shutdown = true;
	}
}