Resque_Worker::work()   D

Complexity

Conditions 25
Paths 34

Size

Total Lines 122
Code Lines 69

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 34
CRAP Score 85.9541

Importance

Changes 14
Bugs 0 Features 0
Metric Value
cc 25
eloc 69
c 14
b 0
f 0
nc 34
nop 2
dl 0
loc 122
ccs 34
cts 63
cp 0.5397
crap 85.9541
rs 4.1666
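The CRAP score can be reproduced from the other metrics in this table. A minimal sketch, assuming the commonly cited Change Risk Anti-Patterns formula, comp^2 * (1 - cov)^3 + comp, and assuming the rounded coverage ratio `cp` is what gets plugged in; the function name `crapScore` is hypothetical:

```php
<?php

/**
 * Hypothetical helper: CRAP score from cyclomatic complexity and coverage.
 *
 * @param int   $complexity Cyclomatic complexity (the "cc" metric).
 * @param float $coverage   Covered fraction between 0 and 1 (the "cp" metric).
 */
function crapScore(int $complexity, float $coverage): float
{
    // The uncovered share is cubed, so low coverage on a complex method is penalised heavily.
    return $complexity ** 2 * (1 - $coverage) ** 3 + $complexity;
}

// Metrics reported for Resque_Worker::work(): cc = 25, cp = 0.5397 (ccs 34 / cts 63).
echo round(crapScore(25, 0.5397), 4), PHP_EOL; // 85.9541
```

With full coverage the same formula yields 25, the complexity itself, so lowering the number of conditions matters as much as adding tests.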

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
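As an illustration of that advice applied to work(): the branch near the end of the method that inspects the child's exit status could move into its own method. A minimal sketch, assuming PHP 7.1 or later; the function name `childFailureMessage` is hypothetical and not part of php-resque, and it takes plain values rather than the raw pcntl status so that it runs without forking:

```php
<?php

/**
 * Hypothetical helper extracted from the exit-status branch of work().
 *
 * @param bool $exitedNormally Result of pcntl_wifexited() for the child.
 * @param int  $exitCode       Result of pcntl_wexitstatus(); only meaningful
 *                             when $exitedNormally is true.
 * @return string|null Failure message, or null when the child exited cleanly.
 */
function childFailureMessage(bool $exitedNormally, int $exitCode): ?string
{
    if (!$exitedNormally) {
        return 'Job exited abnormally';
    }

    if ($exitCode !== 0) {
        return 'Job exited with exit code ' . $exitCode;
    }

    // Clean exit: nothing to report.
    return null;
}
```

Inside work(), the parent branch could then call this helper once and fail the job only when a message comes back, which removes two of the nested conditions from the loop.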

Commonly applied refactorings include:

<?php

declare(ticks=1);

/**
 * Resque worker that handles checking queues for jobs, fetching them
 * off the queues, running them and handling the result.
 *
 * @package		Resque/Worker
 * @author		Chris Boulton <[email protected]>
 * @license		http://www.opensource.org/licenses/mit-license.php
 */
class Resque_Worker
{
	/**
	 * @var string Prefix for the process name
	 */
	private static $processPrefix = 'resque';

	/**
	 * @var Psr\Log\LoggerInterface Logging object that implements the PSR-3 LoggerInterface
	 */
	public $logger;

	/**
	 * @var bool Whether this worker is running in a forked child process.
	 */
	public $hasParent = false;

	/**
	 * @var array Array of all associated queues for this worker.
	 */
	private $queues = array();

	/**
	 * @var string The hostname of this worker.
	 */
	private $hostname;

	/**
	 * @var boolean True if on the next iteration, the worker should shutdown.
	 */
	private $shutdown = false;

	/**
	 * @var boolean True if this worker is paused.
	 */
	private $paused = false;

	/**
	 * @var string String identifying this worker.
	 */
	private $id;

	/**
	 * @var Resque_Job Current job, if any, being processed by this worker.
	 */
	private $currentJob = null;

	/**
	 * @var int Process ID of child worker processes.
	 */
	private $child = null;

	/**
	 * Instantiate a new worker, given a list of queues that it should be working
	 * on. The list of queues should be supplied in the priority that they should
	 * be checked for jobs (first come, first served)
	 *
	 * Passing a single '*' allows the worker to work on all queues in alphabetical
	 * order. You can easily add new queues dynamically and have them worked on using
	 * this method.
	 *
	 * @param string|array $queues String with a single queue name, array with multiple.
	 */
	public function __construct($queues)
	{
		$this->logger = new Resque_Log();

		if (!is_array($queues)) {
			$queues = array($queues);
		}

		$this->queues = $queues;
		$this->hostname = php_uname('n');

		$this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues);
	}

	/**
	 * Set the process prefix of the workers to the given prefix string.
	 * @param string $prefix The new process prefix
	 */
	public static function setProcessPrefix($prefix)
	{
		self::$processPrefix = $prefix;
	}

	/**
	 * Return all workers known to Resque as instantiated instances.
	 * @return array
	 */
	public static function all()
	{
		// Scrutinizer (Bug): The method smembers() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		$workers = Resque::redis()->smembers('workers');
		if (!is_array($workers)) {
			$workers = array();
		}

		$instances = array();
		foreach ($workers as $workerId) {
			$instances[] = self::find($workerId);
		}
		return $instances;
	}

	/**
	 * Given a worker ID, check if it is registered/valid.
	 *
	 * @param string $workerId ID of the worker.
	 * @return boolean True if the worker exists, false if not.
	 */
	public static function exists($workerId)
	{
		// Scrutinizer (Bug): The method sismember() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		return (bool)Resque::redis()->sismember('workers', $workerId);
	}

	/**
	 * Given a worker ID, find it and return an instantiated worker class for it.
	 *
	 * @param string $workerId The ID of the worker.
	 * @return Resque_Worker Instance of the worker. False if the worker does not exist.
	 */
	public static function find($workerId)
	{
		if (!self::exists($workerId) || false === strpos($workerId, ":")) {
			// Scrutinizer (Bug, Best Practice): The expression return false returns the type false which is incompatible with the documented return type Resque_Worker.
			return false;
		}

		list($hostname, $pid, $queues) = explode(':', $workerId, 3);
		$queues = explode(',', $queues);
		$worker = new self($queues);
		$worker->setId($workerId);
		return $worker;
	}

	/**
	 * Set the ID of this worker to a given ID string.
	 *
	 * @param string $workerId ID for the worker.
	 */
	public function setId($workerId)
	{
		$this->id = $workerId;
	}

	/**
	 * The primary loop for a worker which when called on an instance starts
	 * the worker's life cycle.
	 *
	 * Queues are checked every $interval (seconds) for new jobs.
	 *
	 * @param int $interval How often to check for new jobs across the queues.
	 * @param bool $blocking Whether to reserve jobs with a blocking call that uses $interval as its timeout.
	 */
	public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
	{
		$this->updateProcLine('Starting');
		$this->startup();

		if (function_exists('pcntl_signal_dispatch')) {
			pcntl_signal_dispatch();
		}

		while (true) {
			if ($this->shutdown) {
				break;
			}

			// is redis still alive?
			try {
				// Scrutinizer (Bug): The method ping() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
				if (!$this->paused && Resque::redis()->ping() === false) {
					throw new CredisException('redis ping() failed');
				}
			} catch (CredisException $e) {
				$this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect');
				Resque::$redis = null;
				usleep($interval * 1000000);
				continue;
			}

			// Attempt to find and reserve a job
			$job = false;
			if (!$this->paused) {
				if ($blocking === true) {
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
					$this->updateProcLine('Waiting with blocking timeout ' . $interval);
				} else {
					$this->updateProcLine('Waiting with interval ' . $interval);
				}

				$job = $this->reserve($blocking, $interval);
			}

			if (!$job) {
				// For an interval of 0, break now - helps with unit testing etc
				if ($interval == 0) {
					break;
				}

				if ($blocking === false) {
					// If no job was found, we sleep for $interval before continuing and checking again
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
					if ($this->paused) {
						$this->updateProcLine('Paused');
					} else {
						$this->updateProcLine('Waiting');
					}

					usleep($interval * 1000000);
				}

				continue;
			}

			$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
			Resque_Event::trigger('beforeFork', $job);
			// Scrutinizer (Bug): It seems like $job can also be of type true; however, parameter $job of Resque_Worker::workingOn() does only seem to accept Resque_Job, maybe add an additional type check?
			$this->workingOn($job);

			$this->child = Resque::fork();

			// Forked and we're the child. Or PCNTL is not installed. Run the job.
			if ($this->child === 0 || $this->child === false || $this->child === -1) {
				$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
				$this->updateProcLine($status);
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);

				if (!empty($job->payload['id'])) {
					Resque_Job_PID::create($job->payload['id']);
				}

				// Scrutinizer (Bug): It seems like $job can also be of type true; however, parameter $job of Resque_Worker::perform() does only seem to accept Resque_Job, maybe add an additional type check?
				$this->perform($job);

				if (!empty($job->payload['id'])) {
					Resque_Job_PID::del($job->payload['id']);
				}

				if ($this->child === 0) {
					// Scrutinizer (Best Practice): Using exit here is not recommended. In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.
					exit(0);
				}
			}

			if ($this->child > 0) {
				// Parent process, sit and wait
				$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
				$this->updateProcLine($status);
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);

				// Wait until the child process finishes before continuing
				while (pcntl_wait($status, WNOHANG) === 0) {
					if (function_exists('pcntl_signal_dispatch')) {
						pcntl_signal_dispatch();
					}

					// Pause for half a second to conserve system resources
					usleep(500000);
				}

				if (pcntl_wifexited($status) !== true) {
					$job->fail(new Resque_Job_DirtyExitException('Job exited abnormally'));
				} elseif (($exitStatus = pcntl_wexitstatus($status)) !== 0) {
					$job->fail(new Resque_Job_DirtyExitException(
						'Job exited with exit code ' . $exitStatus
					));
				} else {
					if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING))) {
						$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
						// Scrutinizer (Bug): Are you sure $job of type object|true can be used in concatenation?
						$this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . $job);
					}
				}
			}

			$this->child = null;
			$this->doneWorking();
		}

		$this->unregisterWorker();
	}

	/**
	 * Process a single job.
	 *
	 * @param Resque_Job $job The job to be processed.
	 */
	public function perform(Resque_Job $job)
	{
		// Scrutinizer (Unused Code): The assignment to $result is dead and can be removed.
		$result = null;
		try {
			Resque_Event::trigger('afterFork', $job);
			$result = $job->perform();
		} catch (Exception $e) {
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
			$job->fail($e);
			return;
		} catch (Error $e) {
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
			$job->fail($e);
			return;
		}

		$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result);
		$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
	}

	/**
	 * @param  bool            $blocking
	 * @param  int             $timeout
	 * @return object|boolean               Instance of Resque_Job if a job is found, false if not.
	 */
	public function reserve($blocking = false, $timeout = null)
	{
		if ($this->hasParent && !posix_kill(posix_getppid(), 0)) {
			$this->shutdown();
			return false;
		}

		$queues = $this->queues();
		// Scrutinizer: The condition is_array($queues) is always true.
		if (!is_array($queues)) {
			return;
		}

		if ($blocking === true) {
			if (empty($queues)) {
				$this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout));
				usleep($timeout * 1000000);
				return false;
			}
			$job = Resque_Job::reserveBlocking($queues, $timeout);
			// Scrutinizer: The condition $job is always false.
			if ($job) {
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
				return $job;
			}
		} else {
			foreach ($queues as $queue) {
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
				$job = Resque_Job::reserve($queue);
				if ($job) {
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
					return $job;
				}
			}
		}

		return false;
	}

	/**
	 * Return an array containing all of the queues that this worker should use
	 * when searching for jobs.
	 *
	 * If * is found in the list of queues, every queue will be searched in
	 * alphabetic order. (@see $fetch)
	 *
	 * @param boolean $fetch If true, and the queue is set to *, will fetch
	 * all queue names from redis.
	 * @return array Array of associated queues.
	 */
	public function queues($fetch = true)
	{
		// Scrutinizer (Coding Style, Best Practice): It seems like you are loosely comparing two booleans. Consider using the strict comparison === instead. When comparing two booleans, it is generally considered safer to use the strict comparison operator.
		if (!in_array('*', $this->queues) || $fetch == false) {
			return $this->queues;
		}

		$queues = Resque::queues();
		sort($queues);
		return $queues;
	}

	/**
	 * Perform necessary actions to start a worker.
	 */
	private function startup()
	{
		$this->registerSigHandlers();
		$this->pruneDeadWorkers();
		Resque_Event::trigger('beforeFirstFork', $this);
		$this->registerWorker();
	}

	/**
	 * On supported systems (with the PECL proctitle module installed), update
	 * the name of the currently running process to indicate the current state
	 * of a worker.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		// Scrutinizer (Bug): Since $processPrefix is declared private, accessing it with static will lead to errors in possible sub-classes; you can either use self, or increase the visibility of $processPrefix to at least protected.
		$processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status;
		if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
			cli_set_process_title($processTitle);
		} elseif (function_exists('setproctitle')) {
			setproctitle($processTitle);
		}
	}

	/**
	 * Register signal handlers that a worker should respond to.
	 *
	 * TERM: Shutdown immediately and stop processing jobs.
	 * INT: Shutdown immediately and stop processing jobs.
	 * QUIT: Shutdown after the current job finishes processing.
	 * USR1: Kill the forked child immediately and continue processing jobs.
	 * USR2: Pause processing of new jobs.
	 * CONT: Resume processing, allowing the worker to pick up new jobs.
	 */
	private function registerSigHandlers()
	{
		if (!function_exists('pcntl_signal')) {
			return;
		}

		pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
		pcntl_signal(SIGINT, array($this, 'shutDownNow'));
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
		pcntl_signal(SIGUSR1, array($this, 'killChild'));
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
		$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
	}

	/**
	 * Signal handler callback for USR2, pauses processing of new jobs.
	 */
	public function pauseProcessing()
	{
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
		$this->paused = true;
	}

	/**
	 * Signal handler callback for CONT, resumes worker allowing it to pick
	 * up new jobs.
	 */
	public function unPauseProcessing()
	{
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
		$this->paused = false;
	}

	/**
	 * Schedule a worker for shutdown. Will finish processing the current job
	 * and when the timeout interval is reached, the worker will shut down.
	 */
	public function shutdown()
	{
		$this->shutdown = true;
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
	}

	/**
	 * Force an immediate shutdown of the worker, killing any child jobs
	 * currently running.
	 */
	public function shutdownNow()
	{
		$this->shutdown();
		$this->killChild();
	}

	/**
	 * @return int Child process PID.
	 */
	public function getChildPID()
	{
		return $this->child;
	}

	/**
	 * Kill a forked child job immediately. The job it is processing will not
	 * be completed.
	 */
	public function killChild()
	{
		if (!$this->child) {
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
			return;
		}

		$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
		if (exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
			posix_kill($this->child, SIGKILL);
			$this->child = null;
		} else {
			$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
			$this->shutdown();
		}
	}

	/**
	 * Look for any workers which should be running on this server and if
	 * they're not, remove them from Redis.
	 *
	 * This is a form of garbage collection to handle cases where the
	 * server may have been killed and the Resque workers did not die gracefully
	 * and therefore leave state information in Redis.
	 */
	public function pruneDeadWorkers()
	{
		$workerPids = $this->workerPids();
		$workers = self::all();
		foreach ($workers as $worker) {
			if (is_object($worker)) {
				list($host, $pid, $queues) = explode(':', (string)$worker, 3);
				if ($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
					continue;
				}
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
				$worker->unregisterWorker();
			}
		}
	}

	/**
	 * Return an array of process IDs for all of the Resque workers currently
	 * running on this machine.
	 *
	 * @return array Array of Resque worker process IDs.
	 */
	public function workerPids()
	{
		$pids = array();
		if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
			exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
			foreach ($cmdOutput as $line) {
				$line = preg_replace('/\s+/m', ' ', $line);
				list(,,$pids[]) = explode(' ', trim($line), 3);
			}
		} else {
			exec('ps -A -o pid,args | grep [r]esque', $cmdOutput);
			foreach ($cmdOutput as $line) {
				list($pids[],) = explode(' ', trim($line), 2);
			}
		}
		return $pids;
	}

	/**
	 * Register this worker in Redis.
	 */
	public function registerWorker()
	{
		// Scrutinizer (Bug): The method sadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		Resque::redis()->sadd('workers', (string)$this);
		// Scrutinizer (Bug): The method set() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		Resque::redis()->set('worker:' . (string)$this . ':started', date('c'));
	}

	/**
	 * Unregister this worker in Redis. (shutdown etc)
	 */
	public function unregisterWorker()
	{
		if (is_object($this->currentJob)) {
			$this->currentJob->fail(new Resque_Job_DirtyExitException());
		}

		$id = (string)$this;
		// Scrutinizer (Bug): The method srem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		Resque::redis()->srem('workers', $id);
		// Scrutinizer (Bug): The method del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		Resque::redis()->del('worker:' . $id);
		Resque::redis()->del('worker:' . $id . ':started');
		Resque_Stat::clear('processed:' . $id);
		Resque_Stat::clear('failed:' . $id);
	}

	/**
	 * Tell Redis which job we're currently working on.
	 *
	 * @param object $job Resque_Job instance containing the job we're working on.
	 */
	public function workingOn(Resque_Job $job)
	{
		$job->worker = $this;
		$this->currentJob = $job;
		$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
		$data = json_encode(array(
			'queue' => $job->queue,
			'run_at' => date('c'),
			'payload' => $job->payload
		));
		Resque::redis()->set('worker:' . $job->worker, $data);
	}

	/**
	 * Notify Redis that we've finished working on a job, clearing the working
	 * state and incrementing the job stats.
	 */
	public function doneWorking()
	{
		$this->currentJob = null;
		Resque_Stat::incr('processed');
		Resque_Stat::incr('processed:' . (string)$this);
		Resque::redis()->del('worker:' . (string)$this);
	}

	/**
	 * Generate a string representation of this worker.
	 *
	 * @return string String identifier for this worker instance.
	 */
	public function __toString()
	{
		return $this->id;
	}

	/**
	 * Return an object describing the job this worker is currently working on.
	 *
	 * @return object Object with details of current job.
	 */
	public function job()
	{
		// Scrutinizer (Bug): The method get() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
		$job = Resque::redis()->get('worker:' . $this);
		if (!$job) {
			// Scrutinizer (Bug, Best Practice): The expression return array() returns the type array which is incompatible with the documented return type object.
			return array();
		} else {
			return json_decode($job, true);
		}
	}

	/**
	 * Get a statistic belonging to this worker.
	 *
	 * @param string $stat Statistic to fetch.
	 * @return int Statistic value.
	 */
	public function getStat($stat)
	{
		return Resque_Stat::get($stat . ':' . $this);
	}

	/**
	 * Inject the logging object into the worker
	 *
	 * @param Psr\Log\LoggerInterface $logger
	 */
	public function setLogger(Psr\Log\LoggerInterface $logger)
	{
		$this->logger = $logger;
	}
}
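Several of the findings in this listing ask for @method annotations on a class that forwards calls through __call. A minimal sketch of that pattern, assuming a hypothetical in-memory class `AnnotatedRedisProxy` that stands in for Resque_Redis (whose real implementation is not shown here); the parameter and return types in the tags are illustrative only:

```php
<?php

/**
 * Hypothetical in-memory proxy, standing in for Resque_Redis.
 *
 * The @method tags declare the commands that __call() forwards, so that
 * static analysers and IDEs can resolve them.
 *
 * @method int   sadd(string $key, string $member)
 * @method bool  sismember(string $key, string $member)
 * @method array smembers(string $key)
 */
class AnnotatedRedisProxy
{
    /** @var array Set contents, keyed by set name and then by member. */
    private $sets = array();

    public function __call($name, $args)
    {
        switch (strtolower($name)) {
            case 'sadd':
                list($key, $member) = $args;
                $isNew = !isset($this->sets[$key][$member]);
                $this->sets[$key][$member] = true;
                return $isNew ? 1 : 0;
            case 'sismember':
                list($key, $member) = $args;
                return isset($this->sets[$key][$member]);
            case 'smembers':
                list($key) = $args;
                return isset($this->sets[$key]) ? array_keys($this->sets[$key]) : array();
        }

        throw new BadMethodCallException('Unknown command: ' . $name);
    }
}

$redis = new AnnotatedRedisProxy();
$redis->sadd('workers', 'host:123:default');
```

The runtime behaviour is unchanged by the tags; they only describe the magic methods, which is why the report marks these findings as ignorable by annotation.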