Test Failed
Pull Request — develop (#56)
by
unknown
02:24
created

SchedulerWorker::log()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 4
nc 3
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(ticks=1);
4
5
namespace Resque\Worker;
6
7
use Resque\Scheduler;
8
use Resque\Event;
9
10
/**
11
 * ResqueScheduler worker to handle scheduling of delayed tasks.
12
 *
13
 * @package		ResqueScheduler
14
 * @author		Chris Boulton <[email protected]>
15
 * @copyright	(c) 2012 Chris Boulton
16
 * @license		http://www.opensource.org/licenses/mit-license.php
17
 */
18
class SchedulerWorker
{
	const LOG_NONE = 0;
	const LOG_NORMAL = 1;
	const LOG_VERBOSE = 2;

	/**
	 * @var int Current log level of this worker (one of the LOG_* constants).
	 */
	public $logLevel = self::LOG_NONE;

	/**
	 * @var int Interval (in seconds) to sleep for between checking schedules.
	 */
	protected $interval = 5;

	/**
	 * @var boolean True if on the next iteration, the worker should shutdown.
	 */
	private $shutdown = false;

	/**
	 * @var boolean True if this worker is paused.
	 */
	private $paused = false;

	/**
	 * The primary loop for a worker.
	 *
	 * Every $interval (seconds), the scheduled queue will be checked for jobs
	 * that should be pushed to Resque. The loop ends once shutdown() has been
	 * called; while the worker is paused it keeps sleeping without checking
	 * the schedule.
	 *
	 * @param int $interval How often to check schedules. When null, the
	 *                      currently configured interval is kept.
	 */
	public function work($interval = null)
	{
		if ($interval !== null) {
			$this->interval = $interval;
		}

		$this->updateProcLine('Starting');
		$this->registerSigHandlers();

		while (!$this->shutdown) {
			if (!$this->paused) {
				$this->handleDelayedItems();
			}

			// shutdown() may have been invoked (e.g. by a signal handler) while
			// delayed items were being processed; stop right away instead of
			// sleeping for a whole interval first.
			if ($this->shutdown) {
				break;
			}

			$this->sleep();
		}
	}

	/**
	 * Handle delayed items for the next scheduled timestamp.
	 *
	 * Searches for any items that are due to be scheduled in Resque
	 * and adds them to the appropriate job queue in Resque. Keeps going until
	 * Scheduler::nextDelayedTimestamp() returns false.
	 *
	 * @param \DateTime|int $timestamp Search for any items up to this timestamp to schedule.
	 *                                 Passed through unchanged (including null) to
	 *                                 Scheduler::nextDelayedTimestamp().
	 */
	public function handleDelayedItems($timestamp = null)
	{
		while (($oldestJobTimestamp = Scheduler::nextDelayedTimestamp($timestamp)) !== false) {
			$this->updateProcLine('Processing Delayed Items');
			$this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);
		}
	}

	/**
	 * Schedule all of the delayed jobs for a given timestamp.
	 *
	 * Searches for all items for a given timestamp, pulls them off the list of
	 * delayed jobs and pushes them across to Resque. A 'beforeDelayedEnqueue'
	 * event is triggered for every item before it is enqueued.
	 *
	 * @param \DateTime|int $timestamp Timestamp whose delayed items should be enqueued.
	 */
	public function enqueueDelayedItemsForTimestamp($timestamp)
	{
		// The loop stops on the first falsy value returned by the scheduler.
		while ($item = Scheduler::nextItemForTimestamp($timestamp)) {
			$this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] . ' [delayed]');

			Event::trigger('beforeDelayedEnqueue', array(
				'queue' => $item['queue'],
				'class' => $item['class'],
				'args'  => $item['args'],
			));

			// Resque::enqueue is called as (queue, class, ...args).
			$payload = array_merge(array($item['queue'], $item['class']), $item['args']);
			call_user_func_array('Resque::enqueue', $payload);
		}
	}

	/**
	 * Sleep for the defined interval.
	 */
	protected function sleep()
	{
		sleep($this->interval);
	}

	/**
	 * Update the status of the current worker process.
	 *
	 * On supported systems, update the name of the currently running process
	 * to indicate the current state of a worker. The PECL proctitle function
	 * setproctitle() is preferred when installed; otherwise the built-in
	 * cli_set_process_title() is used when it exists. The built-in is skipped
	 * on macOS, where the PHP manual documents it as requiring superuser
	 * privileges. When neither is usable this is a no-op.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		$usePecl = function_exists('setproctitle');
		$useBuiltin = !$usePecl
			&& function_exists('cli_set_process_title')
			&& PHP_OS !== 'Darwin';

		if (!$usePecl && !$useBuiltin) {
			return;
		}

		$title = 'resque-scheduler-' . Scheduler::VERSION . ': ' . $status;

		if ($usePecl) {
			setproctitle($title);
		} else {
			cli_set_process_title($title);
		}
	}

	/**
	 * Output a given log message to STDOUT.
	 *
	 * Nothing is written unless the log level is LOG_NORMAL or LOG_VERBOSE;
	 * the verbose format additionally carries a "HH:MM:SS YYYY-MM-DD" stamp.
	 *
	 * @param string $message Message to output.
	 */
	public function log($message)
	{
		// Loose comparison is kept on purpose so a numeric-string log level
		// continues to behave as it did before.
		if ($this->logLevel == self::LOG_NORMAL) {
			fwrite(STDOUT, "*** " . $message . "\n");
		} elseif ($this->logLevel == self::LOG_VERBOSE) {
			// date() replaces strftime('%T %Y-%m-%d'), deprecated since PHP 8.1.
			fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
		}
	}

	/**
	 * Register signal handlers that a worker should respond to.
	 *
	 * Does nothing when the pcntl extension is unavailable.
	 *
	 * TERM: Flag the worker for shutdown.
	 * INT: Flag the worker for shutdown.
	 * QUIT: Flag the worker for shutdown.
	 * USR2: Pause processing of delayed items.
	 * CONT: Resume processing of delayed items.
	 */
	private function registerSigHandlers()
	{
		if (!function_exists('pcntl_signal')) {
			return;
		}

		pcntl_signal(SIGTERM, array($this, 'shutdown'));
		pcntl_signal(SIGINT, array($this, 'shutdown'));
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));

		$this->log('Registered signals');
	}

	/**
	 * Signal handler callback for TERM, INT and QUIT; flags the worker so the
	 * main loop in work() stops.
	 */
	public function shutdown()
	{
		$this->log('Shutting down');
		$this->shutdown = true;
	}

	/**
	 * Signal handler callback for USR2, pauses processing.
	 */
	public function pauseProcessing()
	{
		$this->log('USR2 received; pausing processing');
		$this->paused = true;
	}

	/**
	 * Signal handler callback for CONT, resume processing.
	 */
	public function unPauseProcessing()
	{
		$this->log('CONT received; resuming processing');
		$this->paused = false;
	}
}
197