Passed
Push — master ( fdca42...7dd147 )
by Hennik
03:23 queued 10s
created

Resque_Worker::getChildPID()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 2
ccs 0
cts 2
cp 0
crap 2
rs 10
1
<?php
2 1
declare(ticks = 1);
3
4
/**
5
 * Resque worker that handles checking queues for jobs, fetching them
6
 * off the queues, running them and handling the result.
7
 *
8
 * @package		Resque/Worker
9
 * @author		Chris Boulton <[email protected]>
10
 * @license		http://www.opensource.org/licenses/mit-license.php
11
 */
12
class Resque_Worker
13
{
14
	/**
15
	 * @var string Prefix for the process name
16
	 */
17
	private static $processPrefix = 'resque';
18
19
	/**
20
	* @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
0 ignored issues
show
Bug introduced by
The type LoggerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
21
	*/
22
	public $logger;
23
24
	/**
25
	 * @var bool Whether this worker is running in a forked child process.
26
	 */
27
	public $hasParent = false;
28
29
	/**
30
	 * @var array Array of all associated queues for this worker.
31
	 */
32
	private $queues = array();
33
34
	/**
35
	 * @var string The hostname of this worker.
36
	 */
37
	private $hostname;
38
39
	/**
40
	 * @var boolean True if on the next iteration, the worker should shutdown.
41
	 */
42
	private $shutdown = false;
43
44
	/**
45
	 * @var boolean True if this worker is paused.
46
	 */
47
	private $paused = false;
48
49
	/**
50
	 * @var string String identifying this worker.
51
	 */
52
	private $id;
53
54
	/**
55
	 * @var Resque_Job Current job, if any, being processed by this worker.
56
	 */
57
	private $currentJob = null;
58
59
	/**
60
	 * @var int Process ID of child worker processes.
61
	 */
62
	private $child = null;
63
64
    /**
65
     * Instantiate a new worker, given a list of queues that it should be working
66
     * on. The list of queues should be supplied in the priority that they should
67
     * be checked for jobs (first come, first served)
68
     *
69
     * Passing a single '*' allows the worker to work on all queues in alphabetical
70
     * order. You can easily add new queues dynamically and have them worked on using
71
     * this method.
72
     *
73
     * @param string|array $queues String with a single queue name, array with multiple.
74
     */
75 69
    public function __construct($queues)
76
    {
77 69
        $this->logger = new Resque_Log();
0 ignored issues
show
Documentation Bug introduced by
It seems like new Resque_Log() of type Resque_Log is incompatible with the declared type LoggerInterface of property $logger.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
78
79 69
        if(!is_array($queues)) {
80 67
            $queues = array($queues);
81
        }
82
83 69
        $this->queues = $queues;
84 69
        $this->hostname = php_uname('n');
85
86 69
        $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);
87 69
    }
88
89
	/**
90
	 * Set the process prefix of the workers to the given prefix string.
91
	 * @param string $prefix The new process prefix
92
	 */
93
	public static function setProcessPrefix($prefix)
94
	{
95
		self::$processPrefix = $prefix;
96
	}
97
98
	/**
99
	 * Return all workers known to Resque as instantiated instances.
100
	 * @return array
101
	 */
102 17
	public static function all()
103
	{
104 17
		$workers = Resque::redis()->smembers('workers');
0 ignored issues
show
Bug introduced by
The method smembers() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

104
		$workers = Resque::redis()->/** @scrutinizer ignore-call */ smembers('workers');
Loading history...
105 17
		if(!is_array($workers)) {
106
			$workers = array();
107
		}
108
109 17
		$instances = array();
110 17
		foreach($workers as $workerId) {
111 8
			$instances[] = self::find($workerId);
112
		}
113 17
		return $instances;
114
	}
115
116
	/**
117
	 * Given a worker ID, check if it is registered/valid.
118
	 *
119
	 * @param string $workerId ID of the worker.
120
	 * @return boolean True if the worker exists, false if not.
121
	 */
122 11
	public static function exists($workerId)
123
	{
124 11
		return (bool)Resque::redis()->sismember('workers', $workerId);
0 ignored issues
show
Bug introduced by
The method sismember() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

124
		return (bool)Resque::redis()->/** @scrutinizer ignore-call */ sismember('workers', $workerId);
Loading history...
125
	}
126
127
	/**
128
	 * Given a worker ID, find it and return an instantiated worker class for it.
129
	 *
130
	 * @param string $workerId The ID of the worker.
131
	 * @return Resque_Worker Instance of the worker. False if the worker does not exist.
132
	 */
133 9
	public static function find($workerId)
134
	{
135 9
		if(!self::exists($workerId) || false === strpos($workerId, ":")) {
136
			return false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return false returns the type false which is incompatible with the documented return type Resque_Worker.
Loading history...
137
		}
138
139 9
		list($hostname, $pid, $queues) = explode(':', $workerId, 3);
140 9
		$queues = explode(',', $queues);
141 9
		$worker = new self($queues);
142 9
		$worker->setId($workerId);
143 9
		return $worker;
144
	}
145
146
	/**
147
	 * Set the ID of this worker to a given ID string.
148
	 *
149
	 * @param string $workerId ID for the worker.
150
	 */
151 9
	public function setId($workerId)
152
	{
153 9
		$this->id = $workerId;
154 9
	}
155
156
	/**
157
	 * The primary loop for a worker which when called on an instance starts
158
	 * the worker's life cycle.
159
	 *
160
	 * Queues are checked every $interval (seconds) for new jobs.
161
	 *
162
	 * @param int $interval How often to check for new jobs across the queues.
163
	 */
164 13
	public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
165
	{
166 13
		$this->updateProcLine('Starting');
167 13
		$this->startup();
168
169 13
        if(function_exists('pcntl_signal_dispatch')) {
170 13
            pcntl_signal_dispatch();
171
        }
172
173 13
		while(true) {
174 13
			if($this->shutdown) {
175
				break;
176
			}
177
178
			// is redis still alive?
179
			try {
180 13
			    if (Resque::redis()->ping() === false) {
0 ignored issues
show
Bug introduced by
The method ping() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

180
			    if (Resque::redis()->/** @scrutinizer ignore-call */ ping() === false) {
Loading history...
181
			        throw new CredisException('redis ping() failed');
182
			    }
183
			} catch (CredisException $e) {
184
			    $this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect');
185
			    Resque::$redis = null;
186
			    usleep($interval * 1000000);
187
			    continue;
188
			}
189
190
			// Attempt to find and reserve a job
191 13
			$job = false;
192 13
			if(!$this->paused) {
193 12
				if($blocking === true) {
194
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
195
					$this->updateProcLine('Waiting with blocking timeout ' . $interval);
196
				} else {
197 12
					$this->updateProcLine('Waiting with interval ' . $interval);
198
				}
199
200 12
				$job = $this->reserve($blocking, $interval);
201
			}
202
203 13
			if(!$job) {
204
				// For an interval of 0, break now - helps with unit testing etc
205 13
				if($interval == 0) {
206 13
					break;
207
				}
208
209
				if($blocking === false)
210
				{
211
					// If no job was found, we sleep for $interval before continuing and checking again
212
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
213
					if($this->paused) {
214
						$this->updateProcLine('Paused');
215
					}
216
					else {
217
						$this->updateProcLine('Waiting');
218
					}
219
220
					usleep($interval * 1000000);
221
				}
222
223
				continue;
224
			}
225
226 9
			$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
227 9
			Resque_Event::trigger('beforeFork', $job);
228 9
			$this->workingOn($job);
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type true; however, parameter $job of Resque_Worker::workingOn() does only seem to accept Resque_Job, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

228
			$this->workingOn(/** @scrutinizer ignore-type */ $job);
Loading history...
229
230 9
			$this->child = Resque::fork();
231
232
			// Forked and we're the child. Or PCNTL is not installed. Run the job.
233 9
			if ($this->child === 0 || $this->child === false || $this->child === -1) {
234
				$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
235
				$this->updateProcLine($status);
236
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);
237
238
				if(!empty($job->payload['id'])) {
239
					Resque_Job_PID::create($job->payload['id']);
240
				}
241
242
				$this->perform($job);
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type true; however, parameter $job of Resque_Worker::perform() does only seem to accept Resque_Job, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

242
				$this->perform(/** @scrutinizer ignore-type */ $job);
Loading history...
243
244
				if(!empty($job->payload['id'])) {
245
					Resque_Job_PID::del($job->payload['id']);
246
				}
247
248
				if ($this->child === 0) {
249
					exit(0);
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
250
				}
251
			}
252
253 9
			if($this->child > 0) {
254
				// Parent process, sit and wait
255 9
				$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
256 9
				$this->updateProcLine($status);
257 9
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);
258
259
				// Wait until the child process finishes before continuing
260 9
				while (pcntl_wait($status, WNOHANG) === 0) {
261 9
					if(function_exists('pcntl_signal_dispatch')) {
262 9
			            pcntl_signal_dispatch();
263
			        }
264
265
					// Pause for a half a second to conserve system resources
266 9
					usleep(500000);
267
				}
268
269 9
				if (pcntl_wifexited($status) !== true) {
270
					$job->fail(new Resque_Job_DirtyExitException('Job exited abnormally'));
271 9
				} elseif (($exitStatus = pcntl_wexitstatus($status)) !== 0) {
272
					$job->fail(new Resque_Job_DirtyExitException(
273
						'Job exited with exit code ' . $exitStatus
274
					));
275
				}
276
				else
277
				{
278 9
					if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING)))
279
					{
280 1
						$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
281 1
						$this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . $job);
0 ignored issues
show
Bug introduced by
Are you sure $job of type object|true can be used in concatenation? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

281
						$this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . /** @scrutinizer ignore-type */ $job);
Loading history...
282
					}
283
				}
284
			}
285
286 9
			$this->child = null;
287 9
			$this->doneWorking();
288
		}
289
290 13
		$this->unregisterWorker();
291 13
	}
292
293
	/**
294
	 * Process a single job.
295
	 *
296
	 * @param Resque_Job $job The job to be processed.
297
	 */
298 5
	public function perform(Resque_Job $job)
299
	{
300 5
		$result = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
301
		try {
302 5
			Resque_Event::trigger('afterFork', $job);
303 5
			$result = $job->perform();
304
		}
305 1
		catch(Exception $e) {
306 1
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
307 1
			$job->fail($e);
308 1
			return;
309
		}
310
		catch(Error $e) {
311
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
312
			$job->fail($e);
313
			return;
314
		}
315
316 4
		$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result);
317 4
		$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
318 4
	}
319
320
	/**
321
	 * @param  bool            $blocking
322
	 * @param  int             $timeout
323
	 * @return object|boolean               Instance of Resque_Job if a job is found, false if not.
324
	 */
325 22
	public function reserve($blocking = false, $timeout = null)
326
	{
327 22
		if ($this->hasParent && !posix_kill(posix_getppid(), 0))
328
		{
329
			$this->shutdown();
330
			return false;
331
		}
332
333 22
		$queues = $this->queues();
334 22
		if(!is_array($queues)) {
0 ignored issues
show
introduced by
The condition is_array($queues) is always true.
Loading history...
335
			return;
336
		}
337
338 22
		if($blocking === true) {
339 1
		    if(empty($queues)){
340
                $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout));
341
                usleep($timeout * 1000000);
342
                return false;
343
            }
344 1
			$job = Resque_Job::reserveBlocking($queues, $timeout);
345 1
			if($job) {
0 ignored issues
show
introduced by
The condition $job is always false.
Loading history...
346 1
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
347 1
				return $job;
348
			}
349
		} else {
350 21
			foreach($queues as $queue) {
351 21
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
352 21
				$job = Resque_Job::reserve($queue);
353 21
				if($job) {
354 17
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
355 17
					return $job;
356
				}
357
			}
358
		}
359
360 13
		return false;
361
	}
362
363
	/**
364
	 * Return an array containing all of the queues that this worker should use
365
	 * when searching for jobs.
366
	 *
367
	 * If * is found in the list of queues, every queue will be searched in
368
	 * alphabetic order. (@see $fetch)
369
	 *
370
	 * @param boolean $fetch If true, and the queue is set to *, will fetch
371
	 * all queue names from redis.
372
	 * @return array Array of associated queues.
373
	 */
374 22
	public function queues($fetch = true)
375
	{
376 22
		if(!in_array('*', $this->queues) || $fetch == false) {
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like you are loosely comparing two booleans. Considering using the strict comparison === instead.

When comparing two booleans, it is generally considered safer to use the strict comparison operator.

Loading history...
377 20
			return $this->queues;
378
		}
379
380 2
		$queues = Resque::queues();
381 2
		sort($queues);
382 2
		return $queues;
383
	}
384
385
	/**
386
	 * Perform necessary actions to start a worker.
387
	 */
388 13
	private function startup()
389
	{
390 13
		$this->registerSigHandlers();
391 13
		$this->pruneDeadWorkers();
392 13
		Resque_Event::trigger('beforeFirstFork', $this);
393 13
		$this->registerWorker();
394 13
	}
395
396
	/**
397
	 * On supported systems (with the PECL proctitle module installed), update
398
	 * the name of the currently running process to indicate the current state
399
	 * of a worker.
400
	 *
401
	 * @param string $status The updated process title.
402
	 */
403 13
	private function updateProcLine($status)
404
	{
405 13
		$processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status;
0 ignored issues
show
Bug introduced by
Since $processPrefix is declared private, accessing it with static will lead to errors in possible sub-classes; you can either use self, or increase the visibility of $processPrefix to at least protected.
Loading history...
406 13
		if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
407 13
			cli_set_process_title($processTitle);
408
		}
409
		else if(function_exists('setproctitle')) {
410
			setproctitle($processTitle);
411
		}
412 13
	}
413
414
	/**
415
	 * Register signal handlers that a worker should respond to.
416
	 *
417
	 * TERM: Shutdown immediately and stop processing jobs.
418
	 * INT: Shutdown immediately and stop processing jobs.
419
	 * QUIT: Shutdown after the current job finishes processing.
420
	 * USR1: Kill the forked child immediately and continue processing jobs.
421
	 */
422 13
	private function registerSigHandlers()
423
	{
424 13
		if(!function_exists('pcntl_signal')) {
425
			return;
426
		}
427
428 13
		pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
429 13
		pcntl_signal(SIGINT, array($this, 'shutDownNow'));
430 13
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
431 13
		pcntl_signal(SIGUSR1, array($this, 'killChild'));
432 13
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
433 13
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
434 13
		$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
435 13
	}
436
437
	/**
438
	 * Signal handler callback for USR2, pauses processing of new jobs.
439
	 */
440 2
	public function pauseProcessing()
441
	{
442 2
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
443 2
		$this->paused = true;
444 2
	}
445
446
	/**
447
	 * Signal handler callback for CONT, resumes worker allowing it to pick
448
	 * up new jobs.
449
	 */
450 1
	public function unPauseProcessing()
451
	{
452 1
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
453 1
		$this->paused = false;
454 1
	}
455
456
	/**
457
	 * Schedule a worker for shutdown. Will finish processing the current job
458
	 * and when the timeout interval is reached, the worker will shut down.
459
	 */
460
	public function shutdown()
461
	{
462
		$this->shutdown = true;
463
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
464
	}
465
466
	/**
467
	 * Force an immediate shutdown of the worker, killing any child jobs
468
	 * currently running.
469
	 */
470
	public function shutdownNow()
471
	{
472
		$this->shutdown();
473
		$this->killChild();
474
	}
475
476
	/**
477
	 * @return int Child process PID.
478
	 */
479
	public function getChildPID() {
480
		return $this->child;
481
	}
482
483
	/**
484
	 * Kill a forked child job immediately. The job it is processing will not
485
	 * be completed.
486
	 */
487
	public function killChild()
488
	{
489
		if(!$this->child) {
490
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
491
			return;
492
		}
493
494
		$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
495
		if(exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
496
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
497
			posix_kill($this->child, SIGKILL);
498
			$this->child = null;
499
		}
500
		else {
501
			$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
502
			$this->shutdown();
503
		}
504
	}
505
506
	/**
507
	 * Look for any workers which should be running on this server and if
508
	 * they're not, remove them from Redis.
509
	 *
510
	 * This is a form of garbage collection to handle cases where the
511
	 * server may have been killed and the Resque workers did not die gracefully
512
	 * and therefore leave state information in Redis.
513
	 */
514 15
	public function pruneDeadWorkers()
515
	{
516 15
		$workerPids = $this->workerPids();
517 15
		$workers = self::all();
518 15
		foreach($workers as $worker) {
519 7
			if (is_object($worker)) {
520 7
				list($host, $pid, $queues) = explode(':', (string)$worker, 3);
521 7
				if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
522 7
					continue;
523
				}
524 2
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
525 2
				$worker->unregisterWorker();
526
			}
527
		}
528 15
	}
529
530
	/**
531
	 * Return an array of process IDs for all of the Resque workers currently
532
	 * running on this machine.
533
	 *
534
	 * @return array Array of Resque worker process IDs.
535
	 */
536 15
	public function workerPids()
537
	{
538 15
		$pids = array();
539 15
		if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
540
			exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
541
			foreach($cmdOutput as $line) {
542
				$line = preg_replace('/\s+/m', ' ', $line);
543
				list(,,$pids[]) = explode(' ', trim($line), 3);
544
			}
545
		}
546
		else {
547 15
			exec('ps -A -o pid,args | grep [r]esque', $cmdOutput);
548 15
			foreach($cmdOutput as $line) {
549 15
				list($pids[],) = explode(' ', trim($line), 2);
550
			}
551
		}
552 15
		return $pids;
553
	}
554
555
	/**
556
	 * Register this worker in Redis.
557
	 */
558 60
	public function registerWorker()
559
	{
560 60
		Resque::redis()->sadd('workers', (string)$this);
0 ignored issues
show
Bug introduced by
The method sadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

560
		Resque::redis()->/** @scrutinizer ignore-call */ sadd('workers', (string)$this);
Loading history...
561 60
		Resque::redis()->set('worker:' . (string)$this . ':started', date('c'));
0 ignored issues
show
Bug introduced by
The method set() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

561
		Resque::redis()->/** @scrutinizer ignore-call */ set('worker:' . (string)$this . ':started', date('c'));
Loading history...
562 60
	}
563
564
	/**
565
	 * Unregister this worker in Redis. (shutdown etc)
566
	 */
567 17
	public function unregisterWorker()
568
	{
569 17
		if(is_object($this->currentJob)) {
570 1
			$this->currentJob->fail(new Resque_Job_DirtyExitException);
571
		}
572
573 17
		$id = (string)$this;
574 17
		Resque::redis()->srem('workers', $id);
0 ignored issues
show
Bug introduced by
The method srem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

574
		Resque::redis()->/** @scrutinizer ignore-call */ srem('workers', $id);
Loading history...
575 17
		Resque::redis()->del('worker:' . $id);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

575
		Resque::redis()->/** @scrutinizer ignore-call */ del('worker:' . $id);
Loading history...
576 17
		Resque::redis()->del('worker:' . $id . ':started');
577 17
		Resque_Stat::clear('processed:' . $id);
578 17
		Resque_Stat::clear('failed:' . $id);
579 17
	}
580
581
	/**
582
	 * Tell Redis which job we're currently working on.
583
	 *
584
	 * @param object $job Resque_Job instance containing the job we're working on.
585
	 */
586 14
	public function workingOn(Resque_Job $job)
587
	{
588 14
		$job->worker = $this;
589 14
		$this->currentJob = $job;
590 14
		$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
591 14
		$data = json_encode(array(
592 14
			'queue' => $job->queue,
593 14
			'run_at' => date('c'),
594 14
			'payload' => $job->payload
595
		));
596 14
		Resque::redis()->set('worker:' . $job->worker, $data);
597 14
	}
598
599
	/**
600
	 * Notify Redis that we've finished working on a job, clearing the working
601
	 * state and incrementing the job stats.
602
	 */
603 10
	public function doneWorking()
604
	{
605 10
		$this->currentJob = null;
606 10
		Resque_Stat::incr('processed');
607 10
		Resque_Stat::incr('processed:' . (string)$this);
608 10
		Resque::redis()->del('worker:' . (string)$this);
609 10
	}
610
611
	/**
612
	 * Generate a string representation of this worker.
613
	 *
614
	 * @return string String identifier for this worker instance.
615
	 */
616 63
	public function __toString()
617
	{
618 63
		return $this->id;
619
	}
620
621
	/**
622
	 * Return an object describing the job this worker is currently working on.
623
	 *
624
	 * @return object Object with details of current job.
625
	 */
626 2
	public function job()
627
	{
628 2
		$job = Resque::redis()->get('worker:' . $this);
0 ignored issues
show
Bug introduced by
The method get() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

628
		$job = Resque::redis()->/** @scrutinizer ignore-call */ get('worker:' . $this);
Loading history...
629 2
		if(!$job) {
630 1
			return array();
0 ignored issues
show
Bug Best Practice introduced by
The expression return array() returns the type array which is incompatible with the documented return type object.
Loading history...
631
		}
632
		else {
633 1
			return json_decode($job, true);
634
		}
635
	}
636
637
	/**
638
	 * Get a statistic belonging to this worker.
639
	 *
640
	 * @param string $stat Statistic to fetch.
641
	 * @return int Statistic value.
642
	 */
643 1
	public function getStat($stat)
644
	{
645 1
		return Resque_Stat::get($stat . ':' . $this);
646
	}
647
648
	/**
649
	 * Inject the logging object into the worker
650
	 *
651
	 * @param Psr\Log\LoggerInterface $logger
652
	 */
653 69
	public function setLogger(Psr\Log\LoggerInterface $logger)
654
	{
655 69
		$this->logger = $logger;
0 ignored issues
show
Documentation Bug introduced by
It seems like $logger of type Psr\Log\LoggerInterface is incompatible with the declared type LoggerInterface of property $logger.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
656 69
	}
657
}
658