Resque_Worker::shutdownNow()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 4
ccs 0
cts 3
cp 0
crap 2
rs 10
1
<?php
2 1
3
declare(ticks=1);
4
5
/**
6
 * Resque worker that handles checking queues for jobs, fetching them
7
 * off the queues, running them and handling the result.
8
 *
9
 * @package		Resque/Worker
10
 * @author		Chris Boulton <[email protected]>
11
 * @license		http://www.opensource.org/licenses/mit-license.php
12
 */
13
class Resque_Worker
14
{
15
	/**
16
	 * @var string Prefix for the process name
17
	 */
18
	private static $processPrefix = 'resque';
19
20
	/**
21
	* @var Psr\Log\LoggerInterface Logging object that implements the PSR-3 LoggerInterface
22
	*/
23
	public $logger;
24
25
	/**
26
	 * @var bool Whether this worker is running in a forked child process.
27
	 */
28
	public $hasParent = false;
29
30
	/**
31
	 * @var array Array of all associated queues for this worker.
32
	 */
33
	private $queues = array();
34
35
	/**
36
	 * @var string The hostname of this worker.
37
	 */
38
	private $hostname;
39
40
	/**
41
	 * @var boolean True if on the next iteration, the worker should shutdown.
42
	 */
43
	private $shutdown = false;
44
45
	/**
46
	 * @var boolean True if this worker is paused.
47
	 */
48
	private $paused = false;
49
50
	/**
51
	 * @var string String identifying this worker.
52
	 */
53
	private $id;
54
55
	/**
56
	 * @var Resque_Job Current job, if any, being processed by this worker.
57
	 */
58
	private $currentJob = null;
59
60
	/**
61
	 * @var int Process ID of child worker processes.
62
	 */
63
	private $child = null;
64
65
	/**
	 * Create a worker bound to one or more queues.
	 *
	 * Queues are polled in the order given, so list higher-priority queues
	 * first. A single '*' entry makes the worker poll every known queue in
	 * alphabetical order (see queues()).
	 *
	 * The worker ID is built as "<hostname>:<pid>:<comma-separated queues>".
	 *
	 * @param string|array $queues A single queue name, or an array of queue names.
	 */
	public function __construct($queues)
	{
		$this->logger   = new Resque_Log();
		$this->queues   = is_array($queues) ? $queues : array($queues);
		$this->hostname = php_uname('n');

		$idParts  = array($this->hostname, getmypid(), implode(',', $this->queues));
		$this->id = implode(':', $idParts);
	}
89
90
	/**
	 * Change the prefix used when building worker process titles.
	 *
	 * The value is stored in a static property, so it is shared by every
	 * worker instance in the current process.
	 *
	 * @param string $prefix Prefix to use in place of the default 'resque'.
	 */
	public static function setProcessPrefix($prefix)
	{
		self::$processPrefix = $prefix;
	}
98
99
	/**
	 * Load every worker registered in the Redis 'workers' set.
	 *
	 * @return array One entry per registered ID, as produced by find(); an
	 *               entry is false when find() rejects that ID.
	 */
	public static function all()
	{
		$workerIds = Resque::redis()->smembers('workers');
		if (!is_array($workerIds)) {
			// Nothing usable came back from Redis; report no workers.
			return array();
		}

		$found = array();
		foreach ($workerIds as $workerId) {
			$found[] = self::find($workerId);
		}

		return $found;
	}
116
117
	/**
	 * Check whether a worker ID is a member of the Redis 'workers' set.
	 *
	 * @param string $workerId ID of the worker.
	 * @return boolean True if the ID is registered, false otherwise.
	 */
	public static function exists($workerId)
	{
		$isMember = Resque::redis()->sismember('workers', $workerId);

		return (bool)$isMember;
	}
127
128
	/**
	 * Build a worker instance for a registered worker ID.
	 *
	 * IDs have the form "<hostname>:<pid>:<comma-separated queues>". Only the
	 * queue list is needed to rebuild the instance; the original ID is then
	 * restored verbatim with setId().
	 *
	 * @param string $workerId The ID of the worker.
	 * @return Resque_Worker|false The worker, or false if the ID is not
	 *                             registered or contains no ':' separator.
	 */
	public static function find($workerId)
	{
		if (!self::exists($workerId) || false === strpos($workerId, ':')) {
			return false;
		}

		// Pad to three segments so an ID without a queue part yields an empty
		// queue string instead of an undefined-offset notice.
		$segments = array_pad(explode(':', $workerId, 3), 3, '');

		$worker = new self(explode(',', $segments[2]));
		$worker->setId($workerId);

		return $worker;
	}
146
147
	/**
	 * Override the identifier of this worker.
	 *
	 * Replaces the ID generated by the constructor; find() uses this to
	 * restore the exact ID stored in Redis.
	 *
	 * @param string $workerId ID for the worker.
	 */
	public function setId($workerId)
	{
		$this->id = $workerId;
	}
156
157
	/**
	 * The primary loop for a worker: start up, then repeatedly reserve and
	 * process jobs until a shutdown is requested.
	 *
	 * Each reserved job is handed to Resque::fork(). When the result is 0,
	 * false or -1 the job is performed in the current process (the forked
	 * child, or this process when no child was created); when it is a
	 * positive PID this process waits for that child and fails the job if
	 * the child did not exit cleanly. The worker unregisters itself when the
	 * loop ends.
	 *
	 * @param int  $interval How often (in seconds) to check for new jobs across
	 *                       the queues; also passed to reserve() as the timeout.
	 *                       With an interval of 0 the loop ends as soon as no
	 *                       job is found.
	 * @param bool $blocking When true, jobs are reserved through the blocking
	 *                       path of reserve() and the loop does not sleep
	 *                       between attempts.
	 */
	public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
	{
		$this->updateProcLine('Starting');
		$this->startup();

		if (function_exists('pcntl_signal_dispatch')) {
			pcntl_signal_dispatch();
		}

		while (true) {
			if ($this->shutdown) {
				break;
			}

			// is redis still alive?
			try {
				if (!$this->paused && Resque::redis()->ping() === false) {
					throw new CredisException('redis ping() failed');
				}
			} catch (CredisException $e) {
				$this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect');
				// Clear the shared connection; presumably Resque::redis() rebuilds it on next use.
				Resque::$redis = null;
				usleep($interval * 1000000);
				continue;
			}

			// Attempt to find and reserve a job
			$job = false;
			if (!$this->paused) {
				if ($blocking === true) {
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
					$this->updateProcLine('Waiting with blocking timeout ' . $interval);
				} else {
					$this->updateProcLine('Waiting with interval ' . $interval);
				}

				$job = $this->reserve($blocking, $interval);
			}

			if (!$job) {
				// For an interval of 0, break now - helps with unit testing etc
				if ($interval == 0) {
					break;
				}

				if ($blocking === false) {
					// If no job was found, we sleep for $interval before continuing and checking again
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
					if ($this->paused) {
						$this->updateProcLine('Paused');
					} else {
						$this->updateProcLine('Waiting');
					}

					usleep($interval * 1000000);
				}

				continue;
			}

			$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
			Resque_Event::trigger('beforeFork', $job);
			$this->workingOn($job);

			$this->child = Resque::fork();

			// Forked and we're the child. Or PCNTL is not installed. Run the job.
			if ($this->child === 0 || $this->child === false || $this->child === -1) {
				// date() replaces strftime('%F %T'), which is deprecated since PHP 8.1.
				$status = 'Processing ' . $job->queue . ' since ' . date('Y-m-d H:i:s');
				$this->updateProcLine($status);
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);

				if (!empty($job->payload['id'])) {
					Resque_Job_PID::create($job->payload['id']);
				}

				$this->perform($job);

				if (!empty($job->payload['id'])) {
					Resque_Job_PID::del($job->payload['id']);
				}

				// Only a real forked child terminates here; otherwise fall through and keep looping.
				if ($this->child === 0) {
					exit(0);
				}
			}

			if ($this->child > 0) {
				// Parent process, sit and wait
				$status = 'Forked ' . $this->child . ' at ' . date('Y-m-d H:i:s');
				$this->updateProcLine($status);
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);

				// Wait until the child process finishes before continuing.
				// $status is reused here: pcntl_wait() overwrites it with the wait status.
				while (pcntl_wait($status, WNOHANG) === 0) {
					if (function_exists('pcntl_signal_dispatch')) {
						pcntl_signal_dispatch();
					}

					// Pause for a half a second to conserve system resources
					usleep(500000);
				}

				if (pcntl_wifexited($status) !== true) {
					$job->fail(new Resque_Job_DirtyExitException('Job exited abnormally'));
				} elseif (($exitStatus = pcntl_wexitstatus($status)) !== 0) {
					$job->fail(new Resque_Job_DirtyExitException(
						'Job exited with exit code ' . $exitStatus
					));
				} else {
					// Clean exit: mark the job complete only if its status is still waiting/running.
					if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING))) {
						$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
						$this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . $job);
					}
				}
			}

			$this->child = null;
			$this->doneWorking();
		}

		$this->unregisterWorker();
	}
288
289
	/**
	 * Process a single job.
	 *
	 * Triggers the 'afterFork' event, runs the job and marks it complete with
	 * the value returned by the job. Any Exception or Error raised along the
	 * way is logged at CRITICAL level and recorded through $job->fail(); the
	 * job is then not marked complete.
	 *
	 * @param Resque_Job $job The job to be processed.
	 */
	public function perform(Resque_Job $job)
	{
		try {
			Resque_Event::trigger('afterFork', $job);
			$result = $job->perform();
		} catch (Exception $e) {
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
			$job->fail($e);
			return;
		} catch (Error $e) {
			// Kept as a separate catch: Error does not extend Exception.
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
			$job->fail($e);
			return;
		}

		$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result);
		$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
	}
313
314
	/**
	 * Try to reserve the next job from this worker's queues.
	 *
	 * When the worker is flagged as having a parent and that parent process
	 * can no longer be signalled, a shutdown is scheduled and no job is
	 * reserved.
	 *
	 * In blocking mode all queues are handed to Resque_Job::reserveBlocking()
	 * at once; with no queues available the call sleeps for $timeout seconds
	 * instead. In non-blocking mode the queues are tried one by one in order.
	 *
	 * @param  bool            $blocking Use the blocking reservation path.
	 * @param  int             $timeout  Blocking timeout / sleep time in seconds.
	 * @return object|boolean            Instance of Resque_Job if a job is found, false if not.
	 */
	public function reserve($blocking = false, $timeout = null)
	{
		// Signal 0 only probes for the parent's existence; nothing is delivered.
		if ($this->hasParent && !posix_kill(posix_getppid(), 0)) {
			$this->shutdown();
			return false;
		}

		$queues = $this->queues();
		if (!is_array($queues)) {
			// Return false (not null) so every "no job" path honours the documented contract.
			return false;
		}

		if ($blocking === true) {
			if (empty($queues)) {
				$this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout));
				usleep($timeout * 1000000);
				return false;
			}

			$job = Resque_Job::reserveBlocking($queues, $timeout);
			if ($job) {
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
				return $job;
			}
		} else {
			foreach ($queues as $queue) {
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
				$job = Resque_Job::reserve($queue);
				if ($job) {
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
					return $job;
				}
			}
		}

		return false;
	}
355 17
356
	/**
	 * Return an array containing all of the queues that this worker should use
	 * when searching for jobs.
	 *
	 * If '*' is one of the configured queues and $fetch is truthy, the list of
	 * queue names is taken from Resque::queues() and sorted alphabetically.
	 * Otherwise the configured list is returned unchanged.
	 *
	 * @param boolean $fetch If true, and the queue is set to *, will fetch
	 * all queue names from redis.
	 * @return array Array of associated queues.
	 */
	public function queues($fetch = true)
	{
		// Strict search: a non-string entry such as 0 must not be mistaken for the '*' wildcard.
		if (!$fetch || !in_array('*', $this->queues, true)) {
			return $this->queues;
		}

		$queues = Resque::queues();
		sort($queues);
		return $queues;
	}
377 20
378
	/**
	 * Prepare the worker before it enters its main loop.
	 *
	 * In order: install signal handlers, prune workers that are no longer
	 * running, fire the 'beforeFirstFork' event, and register this worker.
	 */
	private function startup()
	{
		$this->registerSigHandlers();
		$this->pruneDeadWorkers();

		Resque_Event::trigger('beforeFirstFork', $this);

		$this->registerWorker();
	}
388 13
389
	/**
	 * Update the name of the currently running process to indicate the
	 * current state of the worker, where the platform allows it.
	 *
	 * The title has the form "<prefix>-<version> (<queues>): <status>".
	 * cli_set_process_title() is used when available (but not on Darwin);
	 * otherwise setproctitle() is used if it exists. With neither, this is a
	 * no-op.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		// self:: (not static::) because $processPrefix is private and is written
		// through self:: in setProcessPrefix(); late static binding could
		// resolve to a subclass property and fail or read a different value.
		$processTitle = self::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status;
		if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
			cli_set_process_title($processTitle);
		} elseif (function_exists('setproctitle')) {
			setproctitle($processTitle);
		}
	}
405 13
406 13
	/**
	 * Install the signal handlers this worker reacts to. Does nothing when
	 * pcntl_signal() is unavailable.
	 *
	 * TERM: Shutdown immediately and stop processing jobs.
	 * INT: Shutdown immediately and stop processing jobs.
	 * QUIT: Shutdown after the current job finishes processing.
	 * USR1: Kill the forked child immediately and continue processing jobs.
	 * USR2: Pause processing of new jobs.
	 * CONT: Resume processing of new jobs.
	 */
	private function registerSigHandlers()
	{
		if (!function_exists('pcntl_signal')) {
			return;
		}

		// Signal => name of the handler method on this worker.
		$handlers = array(
			SIGTERM => 'shutDownNow',
			SIGINT  => 'shutDownNow',
			SIGQUIT => 'shutdown',
			SIGUSR1 => 'killChild',
			SIGUSR2 => 'pauseProcessing',
			SIGCONT => 'unPauseProcessing',
		);

		foreach ($handlers as $signal => $method) {
			pcntl_signal($signal, array($this, $method));
		}

		$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
	}
428 13
429 13
	/**
	 * USR2 signal handler: stop picking up new jobs.
	 *
	 * Only sets the paused flag; the main loop checks it before reserving.
	 */
	public function pauseProcessing()
	{
		$this->logger->log(
			Psr\Log\LogLevel::NOTICE,
			'USR2 received; pausing job processing'
		);

		$this->paused = true;
	}
437
438
	/**
	 * CONT signal handler: clear the paused flag so the worker picks up new
	 * jobs again.
	 */
	public function unPauseProcessing()
	{
		$this->logger->log(
			Psr\Log\LogLevel::NOTICE,
			'CONT received; resuming job processing'
		);

		$this->paused = false;
	}
447
448
	/**
	 * Request a graceful shutdown.
	 *
	 * Only raises the shutdown flag; the main loop in work() leaves at the
	 * start of its next iteration, after any job in progress has been handled.
	 */
	public function shutdown()
	{
		$this->shutdown = true;

		$this->logger->log(
			Psr\Log\LogLevel::NOTICE,
			'Shutting down'
		);
	}
457
458
	/**
	 * Shut the worker down immediately: schedule the shutdown, then kill
	 * any child process currently running a job.
	 */
	public function shutdownNow()
	{
		// Flag first so the main loop stops, then take down the running child.
		$this->shutdown();
		$this->killChild();
	}
467
468
	/**
	 * Expose the child-process field tracked by this worker.
	 *
	 * @return int Child process PID as last returned by Resque::fork(); null
	 *             when no child is currently tracked.
	 */
	public function getChildPID()
	{
		return $this->child;
	}
475
476
	/**
	 * Kill the forked child immediately; the job it is running will not be
	 * completed.
	 *
	 * The child is looked up with `ps` first. If it is found it receives
	 * SIGKILL and the tracked PID is cleared; if it is not found the worker
	 * schedules its own shutdown instead.
	 */
	public function killChild()
	{
		if (!$this->child) {
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
			return;
		}

		$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));

		// exec() returns the last output line; both it and the exit code must indicate a hit.
		$lastLine = exec('ps -o pid,s -p ' . $this->child, $output, $returnCode);
		$childFound = $lastLine && $returnCode != 1;

		if (!$childFound) {
			$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
			$this->shutdown();
			return;
		}

		$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
		posix_kill($this->child, SIGKILL);
		$this->child = null;
	}
497
498
	/**
	 * Remove stale workers of this host from Redis.
	 *
	 * A registered worker is unregistered when its ID names this host, its
	 * PID is not among the PIDs reported by workerPids(), and it is not the
	 * current process. This cleans up state left behind by workers that did
	 * not exit gracefully.
	 */
	public function pruneDeadWorkers()
	{
		$livePids = $this->workerPids();

		foreach (self::all() as $worker) {
			// all() may contain false for IDs that find() could not resolve.
			if (!is_object($worker)) {
				continue;
			}

			$workerId = (string)$worker;
			// The queue segment is split off but not needed here.
			list($host, $pid, $queueList) = explode(':', $workerId, 3);

			$isForeign = $host != $this->hostname;
			if ($isForeign || in_array($pid, $livePids) || $pid == getmypid()) {
				continue;
			}

			$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => $workerId));
			$worker->unregisterWorker();
		}
	}
521 7
522 7
	/**
	 * Return an array of process IDs for all of the Resque workers currently
	 * running on this machine.
	 *
	 * Processes are found by searching the process list for "resque": via
	 * WMIC/findstr on Windows, via ps/grep elsewhere.
	 *
	 * @return array Array of Resque worker process IDs (as strings taken from
	 *               the command output).
	 */
	public function workerPids()
	{
		$pids = array();
		// exec() appends to its output array, so start from a known-empty one.
		$cmdOutput = array();

		if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
			exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
			foreach ($cmdOutput as $line) {
				// Collapse whitespace runs, then take the third space-separated field.
				// NOTE(review): assumes the PID lands in that field - confirm against real WMIC output.
				$line = preg_replace('/\s+/m', ' ', $line);
				list(,,$pids[]) = explode(' ', trim($line), 3);
			}
		} else {
			// The pattern is quoted so the shell cannot glob-expand [r]esque
			// against a file named "resque" in the working directory, which
			// would make grep match its own process.
			exec("ps -A -o pid,args | grep '[r]esque'", $cmdOutput);
			foreach ($cmdOutput as $line) {
				// First field of each line is the PID.
				list($pids[],) = explode(' ', trim($line), 2);
			}
		}

		return $pids;
	}
545
546
	/**
	 * Register this worker in Redis: add its ID to the 'workers' set and
	 * store the start time under "worker:<id>:started".
	 */
	public function registerWorker()
	{
		$id = (string)$this;

		Resque::redis()->sadd('workers', $id);
		Resque::redis()->set('worker:' . $id . ':started', date('c'));
	}
554
555
	/**
	 * Remove this worker from Redis (on shutdown, pruning, etc.).
	 *
	 * A job still marked as current is failed with a dirty-exit exception
	 * first. Then the worker's set membership, its state and start-time keys,
	 * and its per-worker processed/failed statistics are cleared.
	 */
	public function unregisterWorker()
	{
		$unfinishedJob = $this->currentJob;
		if (is_object($unfinishedJob)) {
			$unfinishedJob->fail(new Resque_Job_DirtyExitException());
		}

		$id = (string)$this;

		Resque::redis()->srem('workers', $id);
		foreach (array('worker:' . $id, 'worker:' . $id . ':started') as $key) {
			Resque::redis()->del($key);
		}

		foreach (array('processed:', 'failed:') as $statPrefix) {
			Resque_Stat::clear($statPrefix . $id);
		}
	}
571
572
	/**
	 * Record in Redis which job this worker has started.
	 *
	 * Links job and worker to each other, marks the job as running, and
	 * stores a JSON document (queue, start time, payload) under "worker:<id>".
	 *
	 * @param Resque_Job $job The job this worker is about to process.
	 */
	public function workingOn(Resque_Job $job)
	{
		$job->worker = $this;
		$this->currentJob = $job;
		$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);

		$state = array(
			'queue' => $job->queue,
			'run_at' => date('c'),
			'payload' => $job->payload
		);
		$data = json_encode($state);

		Resque::redis()->set('worker:' . $job->worker, $data);
	}
589 14
590 14
	/**
	 * Mark the current job as finished: forget it locally, bump the global
	 * and per-worker 'processed' counters, and delete this worker's state key
	 * from Redis.
	 */
	public function doneWorking()
	{
		$this->currentJob = null;

		$id = (string)$this;

		Resque_Stat::incr('processed');
		Resque_Stat::incr('processed:' . $id);
		Resque::redis()->del('worker:' . $id);
	}
601
602
	/**
	 * String form of the worker, which is simply its ID.
	 *
	 * @return string String identifier for this worker instance.
	 */
	public function __toString()
	{
		return $this->id;
	}
611
612
	/**
	 * Return the details of the job this worker is currently working on, as
	 * stored in Redis under "worker:<id>".
	 *
	 * @return array Decoded job details (JSON decoded as an associative
	 *               array), or an empty array when nothing is stored.
	 */
	public function job()
	{
		$encoded = Resque::redis()->get('worker:' . $this);

		return $encoded ? json_decode($encoded, true) : array();
	}
626 2
627
	/**
	 * Fetch one of this worker's statistics.
	 *
	 * The lookup key is "<stat>:<worker id>".
	 *
	 * @param string $stat Statistic to fetch.
	 * @return int Statistic value.
	 */
	public function getStat($stat)
	{
		$statKey = $stat . ':' . $this;

		return Resque_Stat::get($statKey);
	}
637
638
	/**
	 * Replace the logger used by this worker.
	 *
	 * @param Psr\Log\LoggerInterface $logger PSR-3 logger that will receive all worker messages.
	 */
	public function setLogger(Psr\Log\LoggerInterface $logger)
	{
		$this->logger = $logger;
	}
647
}
648