Passed
Push — master ( fdca42...7dd147 )
by Hennik
03:23 queued 10s
created

Resque_Worker::work()   D

Complexity

Conditions 24
Paths 34

Size

Total Lines 127
Code Lines 69

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 37
CRAP Score 72.8655

Importance

Changes 14
Bugs 0 Features 0
Metric Value
cc 24
eloc 69
c 14
b 0
f 0
nc 34
nop 2
dl 0
loc 127
ccs 37
cts 66
cp 0.5606
crap 72.8655
rs 4.1666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2 1
declare(ticks = 1);
3
4
/**
5
 * Resque worker that handles checking queues for jobs, fetching them
6
 * off the queues, running them and handling the result.
7
 *
8
 * @package		Resque/Worker
9
 * @author		Chris Boulton <[email protected]>
10
 * @license		http://www.opensource.org/licenses/mit-license.php
11
 */
12
class Resque_Worker
13
{
14
	/**
15
	 * @var string Prefix for the process name
16
	 */
17
	private static $processPrefix = 'resque';
18
19
	/**
20
	* @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
0 ignored issues
show
Bug introduced by
The type LoggerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
21
	*/
22
	public $logger;
23
24
	/**
25
	 * @var bool Whether this worker is running in a forked child process.
26
	 */
27
	public $hasParent = false;
28
29
	/**
30
	 * @var array Array of all associated queues for this worker.
31
	 */
32
	private $queues = array();
33
34
	/**
35
	 * @var string The hostname of this worker.
36
	 */
37
	private $hostname;
38
39
	/**
40
	 * @var boolean True if on the next iteration, the worker should shutdown.
41
	 */
42
	private $shutdown = false;
43
44
	/**
45
	 * @var boolean True if this worker is paused.
46
	 */
47
	private $paused = false;
48
49
	/**
50
	 * @var string String identifying this worker.
51
	 */
52
	private $id;
53
54
	/**
55
	 * @var Resque_Job Current job, if any, being processed by this worker.
56
	 */
57
	private $currentJob = null;
58
59
	/**
60
	 * @var int Process ID of child worker processes.
61
	 */
62
	private $child = null;
63
64
    /**
65
     * Instantiate a new worker, given a list of queues that it should be working
66
     * on. The list of queues should be supplied in the priority that they should
67
     * be checked for jobs (first come, first served)
68
     *
69
     * Passing a single '*' allows the worker to work on all queues in alphabetical
70
     * order. You can easily add new queues dynamically and have them worked on using
71
     * this method.
72
     *
73
     * @param string|array $queues String with a single queue name, array with multiple.
74
     */
75 69
    public function __construct($queues)
76
    {
77 69
        $this->logger = new Resque_Log();
0 ignored issues
show
Documentation Bug introduced by
It seems like new Resque_Log() of type Resque_Log is incompatible with the declared type LoggerInterface of property $logger.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
78
79 69
        if(!is_array($queues)) {
80 67
            $queues = array($queues);
81
        }
82
83 69
        $this->queues = $queues;
84 69
        $this->hostname = php_uname('n');
85
86 69
        $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);
87 69
    }
88
89
	/**
90
	 * Set the process prefix of the workers to the given prefix string.
91
	 * @param string $prefix The new process prefix
92
	 */
93
	public static function setProcessPrefix($prefix)
94
	{
95
		self::$processPrefix = $prefix;
96
	}
97
98
	/**
99
	 * Return all workers known to Resque as instantiated instances.
100
	 * @return array
101
	 */
102 17
	public static function all()
103
	{
104 17
		$workers = Resque::redis()->smembers('workers');
0 ignored issues
show
Bug introduced by
The method smembers() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

104
		$workers = Resque::redis()->/** @scrutinizer ignore-call */ smembers('workers');
Loading history...
105 17
		if(!is_array($workers)) {
106
			$workers = array();
107
		}
108
109 17
		$instances = array();
110 17
		foreach($workers as $workerId) {
111 8
			$instances[] = self::find($workerId);
112
		}
113 17
		return $instances;
114
	}
115
116
	/**
117
	 * Given a worker ID, check if it is registered/valid.
118
	 *
119
	 * @param string $workerId ID of the worker.
120
	 * @return boolean True if the worker exists, false if not.
121
	 */
122 11
	public static function exists($workerId)
123
	{
124 11
		return (bool)Resque::redis()->sismember('workers', $workerId);
0 ignored issues
show
Bug introduced by
The method sismember() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

124
		return (bool)Resque::redis()->/** @scrutinizer ignore-call */ sismember('workers', $workerId);
Loading history...
125
	}
126
127
	/**
128
	 * Given a worker ID, find it and return an instantiated worker class for it.
129
	 *
130
	 * @param string $workerId The ID of the worker.
131
	 * @return Resque_Worker Instance of the worker. False if the worker does not exist.
132
	 */
133 9
	public static function find($workerId)
134
	{
135 9
		if(!self::exists($workerId) || false === strpos($workerId, ":")) {
136
			return false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return false returns the type false which is incompatible with the documented return type Resque_Worker.
Loading history...
137
		}
138
139 9
		list($hostname, $pid, $queues) = explode(':', $workerId, 3);
140 9
		$queues = explode(',', $queues);
141 9
		$worker = new self($queues);
142 9
		$worker->setId($workerId);
143 9
		return $worker;
144
	}
145
146
	/**
147
	 * Set the ID of this worker to a given ID string.
148
	 *
149
	 * @param string $workerId ID for the worker.
150
	 */
151 9
	public function setId($workerId)
152
	{
153 9
		$this->id = $workerId;
154 9
	}
155
156
	/**
157
	 * The primary loop for a worker which when called on an instance starts
158
	 * the worker's life cycle.
159
	 *
160
	 * Queues are checked every $interval (seconds) for new jobs.
161
	 *
162
	 * @param int $interval How often to check for new jobs across the queues.
163
	 */
164 13
	public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
165
	{
166 13
		$this->updateProcLine('Starting');
167 13
		$this->startup();
168
169 13
        if(function_exists('pcntl_signal_dispatch')) {
170 13
            pcntl_signal_dispatch();
171
        }
172
173 13
		while(true) {
174 13
			if($this->shutdown) {
175
				break;
176
			}
177
178
			// is redis still alive?
179
			try {
180 13
			    if (Resque::redis()->ping() === false) {
0 ignored issues
show
Bug introduced by
The method ping() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

180
			    if (Resque::redis()->/** @scrutinizer ignore-call */ ping() === false) {
Loading history...
181
			        throw new CredisException('redis ping() failed');
182
			    }
183
			} catch (CredisException $e) {
184
			    $this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect');
185
			    Resque::$redis = null;
186
			    usleep($interval * 1000000);
187
			    continue;
188
			}
189
190
			// Attempt to find and reserve a job
191 13
			$job = false;
192 13
			if(!$this->paused) {
193 12
				if($blocking === true) {
194
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
195
					$this->updateProcLine('Waiting with blocking timeout ' . $interval);
196
				} else {
197 12
					$this->updateProcLine('Waiting with interval ' . $interval);
198
				}
199
200 12
				$job = $this->reserve($blocking, $interval);
201
			}
202
203 13
			if(!$job) {
204
				// For an interval of 0, break now - helps with unit testing etc
205 13
				if($interval == 0) {
206 13
					break;
207
				}
208
209
				if($blocking === false)
210
				{
211
					// If no job was found, we sleep for $interval before continuing and checking again
212
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
213
					if($this->paused) {
214
						$this->updateProcLine('Paused');
215
					}
216
					else {
217
						$this->updateProcLine('Waiting');
218
					}
219
220
					usleep($interval * 1000000);
221
				}
222
223
				continue;
224
			}
225
226 9
			$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
227 9
			Resque_Event::trigger('beforeFork', $job);
228 9
			$this->workingOn($job);
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type true; however, parameter $job of Resque_Worker::workingOn() does only seem to accept Resque_Job, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

228
			$this->workingOn(/** @scrutinizer ignore-type */ $job);
Loading history...
229
230 9
			$this->child = Resque::fork();
231
232
			// Forked and we're the child. Or PCNTL is not installed. Run the job.
233 9
			if ($this->child === 0 || $this->child === false || $this->child === -1) {
234
				$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
235
				$this->updateProcLine($status);
236
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);
237
238
				if(!empty($job->payload['id'])) {
239
					Resque_Job_PID::create($job->payload['id']);
240
				}
241
242
				$this->perform($job);
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type true; however, parameter $job of Resque_Worker::perform() does only seem to accept Resque_Job, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

242
				$this->perform(/** @scrutinizer ignore-type */ $job);
Loading history...
243
244
				if(!empty($job->payload['id'])) {
245
					Resque_Job_PID::del($job->payload['id']);
246
				}
247
248
				if ($this->child === 0) {
249
					exit(0);
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
250
				}
251
			}
252
253 9
			if($this->child > 0) {
254
				// Parent process, sit and wait
255 9
				$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
256 9
				$this->updateProcLine($status);
257 9
				$this->logger->log(Psr\Log\LogLevel::INFO, $status);
258
259
				// Wait until the child process finishes before continuing
260 9
				while (pcntl_wait($status, WNOHANG) === 0) {
261 9
					if(function_exists('pcntl_signal_dispatch')) {
262 9
			            pcntl_signal_dispatch();
263
			        }
264
265
					// Pause for a half a second to conserve system resources
266 9
					usleep(500000);
267
				}
268
269 9
				if (pcntl_wifexited($status) !== true) {
270
					$job->fail(new Resque_Job_DirtyExitException('Job exited abnormally'));
271 9
				} elseif (($exitStatus = pcntl_wexitstatus($status)) !== 0) {
272
					$job->fail(new Resque_Job_DirtyExitException(
273
						'Job exited with exit code ' . $exitStatus
274
					));
275
				}
276
				else
277
				{
278 9
					if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING)))
279
					{
280 1
						$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
281 1
						$this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . $job);
0 ignored issues
show
Bug introduced by
Are you sure $job of type object|true can be used in concatenation? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

281
						$this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . /** @scrutinizer ignore-type */ $job);
Loading history...
282
					}
283
				}
284
			}
285
286 9
			$this->child = null;
287 9
			$this->doneWorking();
288
		}
289
290 13
		$this->unregisterWorker();
291 13
	}
292
293
	/**
294
	 * Process a single job.
295
	 *
296
	 * @param Resque_Job $job The job to be processed.
297
	 */
298 5
	public function perform(Resque_Job $job)
299
	{
300 5
		$result = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
301
		try {
302 5
			Resque_Event::trigger('afterFork', $job);
303 5
			$result = $job->perform();
304
		}
305 1
		catch(Exception $e) {
306 1
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
307 1
			$job->fail($e);
308 1
			return;
309
		}
310
		catch(Error $e) {
311
			$this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
312
			$job->fail($e);
313
			return;
314
		}
315
316 4
		$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result);
317 4
		$this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
318 4
	}
319
320
	/**
321
	 * @param  bool            $blocking
322
	 * @param  int             $timeout
323
	 * @return object|boolean               Instance of Resque_Job if a job is found, false if not.
324
	 */
325 22
	public function reserve($blocking = false, $timeout = null)
326
	{
327 22
		if ($this->hasParent && !posix_kill(posix_getppid(), 0))
328
		{
329
			$this->shutdown();
330
			return false;
331
		}
332
333 22
		$queues = $this->queues();
334 22
		if(!is_array($queues)) {
0 ignored issues
show
introduced by
The condition is_array($queues) is always true.
Loading history...
335
			return;
336
		}
337
338 22
		if($blocking === true) {
339 1
		    if(empty($queues)){
340
                $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout));
341
                usleep($timeout * 1000000);
342
                return false;
343
            }
344 1
			$job = Resque_Job::reserveBlocking($queues, $timeout);
345 1
			if($job) {
0 ignored issues
show
introduced by
The condition $job is always false.
Loading history...
346 1
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
347 1
				return $job;
348
			}
349
		} else {
350 21
			foreach($queues as $queue) {
351 21
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
352 21
				$job = Resque_Job::reserve($queue);
353 21
				if($job) {
354 17
					$this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
355 17
					return $job;
356
				}
357
			}
358
		}
359
360 13
		return false;
361
	}
362
363
	/**
364
	 * Return an array containing all of the queues that this worker should use
365
	 * when searching for jobs.
366
	 *
367
	 * If * is found in the list of queues, every queue will be searched in
368
	 * alphabetic order. (@see $fetch)
369
	 *
370
	 * @param boolean $fetch If true, and the queue is set to *, will fetch
371
	 * all queue names from redis.
372
	 * @return array Array of associated queues.
373
	 */
374 22
	public function queues($fetch = true)
375
	{
376 22
		if(!in_array('*', $this->queues) || $fetch == false) {
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like you are loosely comparing two booleans. Considering using the strict comparison === instead.

When comparing two booleans, it is generally considered safer to use the strict comparison operator.

Loading history...
377 20
			return $this->queues;
378
		}
379
380 2
		$queues = Resque::queues();
381 2
		sort($queues);
382 2
		return $queues;
383
	}
384
385
	/**
386
	 * Perform necessary actions to start a worker.
387
	 */
388 13
	private function startup()
389
	{
390 13
		$this->registerSigHandlers();
391 13
		$this->pruneDeadWorkers();
392 13
		Resque_Event::trigger('beforeFirstFork', $this);
393 13
		$this->registerWorker();
394 13
	}
395
396
	/**
397
	 * On supported systems (with the PECL proctitle module installed), update
398
	 * the name of the currently running process to indicate the current state
399
	 * of a worker.
400
	 *
401
	 * @param string $status The updated process title.
402
	 */
403 13
	private function updateProcLine($status)
404
	{
405 13
		$processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status;
0 ignored issues
show
Bug introduced by
Since $processPrefix is declared private, accessing it with static will lead to errors in possible sub-classes; you can either use self, or increase the visibility of $processPrefix to at least protected.
Loading history...
406 13
		if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
407 13
			cli_set_process_title($processTitle);
408
		}
409
		else if(function_exists('setproctitle')) {
410
			setproctitle($processTitle);
411
		}
412 13
	}
413
414
	/**
415
	 * Register signal handlers that a worker should respond to.
416
	 *
417
	 * TERM: Shutdown immediately and stop processing jobs.
418
	 * INT: Shutdown immediately and stop processing jobs.
419
	 * QUIT: Shutdown after the current job finishes processing.
420
	 * USR1: Kill the forked child immediately and continue processing jobs.
421
	 */
422 13
	private function registerSigHandlers()
423
	{
424 13
		if(!function_exists('pcntl_signal')) {
425
			return;
426
		}
427
428 13
		pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
429 13
		pcntl_signal(SIGINT, array($this, 'shutDownNow'));
430 13
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
431 13
		pcntl_signal(SIGUSR1, array($this, 'killChild'));
432 13
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
433 13
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
434 13
		$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
435 13
	}
436
437
	/**
438
	 * Signal handler callback for USR2, pauses processing of new jobs.
439
	 */
440 2
	public function pauseProcessing()
441
	{
442 2
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
443 2
		$this->paused = true;
444 2
	}
445
446
	/**
447
	 * Signal handler callback for CONT, resumes worker allowing it to pick
448
	 * up new jobs.
449
	 */
450 1
	public function unPauseProcessing()
451
	{
452 1
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
453 1
		$this->paused = false;
454 1
	}
455
456
	/**
457
	 * Schedule a worker for shutdown. Will finish processing the current job
458
	 * and when the timeout interval is reached, the worker will shut down.
459
	 */
460
	public function shutdown()
461
	{
462
		$this->shutdown = true;
463
		$this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
464
	}
465
466
	/**
467
	 * Force an immediate shutdown of the worker, killing any child jobs
468
	 * currently running.
469
	 */
470
	public function shutdownNow()
471
	{
472
		$this->shutdown();
473
		$this->killChild();
474
	}
475
476
	/**
477
	 * @return int Child process PID.
478
	 */
479
	public function getChildPID() {
480
		return $this->child;
481
	}
482
483
	/**
484
	 * Kill a forked child job immediately. The job it is processing will not
485
	 * be completed.
486
	 */
487
	public function killChild()
488
	{
489
		if(!$this->child) {
490
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
491
			return;
492
		}
493
494
		$this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
495
		if(exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
496
			$this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
497
			posix_kill($this->child, SIGKILL);
498
			$this->child = null;
499
		}
500
		else {
501
			$this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
502
			$this->shutdown();
503
		}
504
	}
505
506
	/**
507
	 * Look for any workers which should be running on this server and if
508
	 * they're not, remove them from Redis.
509
	 *
510
	 * This is a form of garbage collection to handle cases where the
511
	 * server may have been killed and the Resque workers did not die gracefully
512
	 * and therefore leave state information in Redis.
513
	 */
514 15
	public function pruneDeadWorkers()
515
	{
516 15
		$workerPids = $this->workerPids();
517 15
		$workers = self::all();
518 15
		foreach($workers as $worker) {
519 7
			if (is_object($worker)) {
520 7
				list($host, $pid, $queues) = explode(':', (string)$worker, 3);
521 7
				if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
522 7
					continue;
523
				}
524 2
				$this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
525 2
				$worker->unregisterWorker();
526
			}
527
		}
528 15
	}
529
530
	/**
531
	 * Return an array of process IDs for all of the Resque workers currently
532
	 * running on this machine.
533
	 *
534
	 * @return array Array of Resque worker process IDs.
535
	 */
536 15
	public function workerPids()
537
	{
538 15
		$pids = array();
539 15
		if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
540
			exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
541
			foreach($cmdOutput as $line) {
542
				$line = preg_replace('/\s+/m', ' ', $line);
543
				list(,,$pids[]) = explode(' ', trim($line), 3);
544
			}
545
		}
546
		else {
547 15
			exec('ps -A -o pid,args | grep [r]esque', $cmdOutput);
548 15
			foreach($cmdOutput as $line) {
549 15
				list($pids[],) = explode(' ', trim($line), 2);
550
			}
551
		}
552 15
		return $pids;
553
	}
554
555
	/**
556
	 * Register this worker in Redis.
557
	 */
558 60
	public function registerWorker()
559
	{
560 60
		Resque::redis()->sadd('workers', (string)$this);
0 ignored issues
show
Bug introduced by
The method sadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

560
		Resque::redis()->/** @scrutinizer ignore-call */ sadd('workers', (string)$this);
Loading history...
561 60
		Resque::redis()->set('worker:' . (string)$this . ':started', date('c'));
0 ignored issues
show
Bug introduced by
The method set() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

561
		Resque::redis()->/** @scrutinizer ignore-call */ set('worker:' . (string)$this . ':started', date('c'));
Loading history...
562 60
	}
563
564
	/**
565
	 * Unregister this worker in Redis. (shutdown etc)
566
	 */
567 17
	public function unregisterWorker()
568
	{
569 17
		if(is_object($this->currentJob)) {
570 1
			$this->currentJob->fail(new Resque_Job_DirtyExitException);
571
		}
572
573 17
		$id = (string)$this;
574 17
		Resque::redis()->srem('workers', $id);
0 ignored issues
show
Bug introduced by
The method srem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

574
		Resque::redis()->/** @scrutinizer ignore-call */ srem('workers', $id);
Loading history...
575 17
		Resque::redis()->del('worker:' . $id);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

575
		Resque::redis()->/** @scrutinizer ignore-call */ del('worker:' . $id);
Loading history...
576 17
		Resque::redis()->del('worker:' . $id . ':started');
577 17
		Resque_Stat::clear('processed:' . $id);
578 17
		Resque_Stat::clear('failed:' . $id);
579 17
	}
580
581
	/**
582
	 * Tell Redis which job we're currently working on.
583
	 *
584
	 * @param object $job Resque_Job instance containing the job we're working on.
585
	 */
586 14
	public function workingOn(Resque_Job $job)
587
	{
588 14
		$job->worker = $this;
589 14
		$this->currentJob = $job;
590 14
		$job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
591 14
		$data = json_encode(array(
592 14
			'queue' => $job->queue,
593 14
			'run_at' => date('c'),
594 14
			'payload' => $job->payload
595
		));
596 14
		Resque::redis()->set('worker:' . $job->worker, $data);
597 14
	}
598
599
	/**
600
	 * Notify Redis that we've finished working on a job, clearing the working
601
	 * state and incrementing the job stats.
602
	 */
603 10
	public function doneWorking()
604
	{
605 10
		$this->currentJob = null;
606 10
		Resque_Stat::incr('processed');
607 10
		Resque_Stat::incr('processed:' . (string)$this);
608 10
		Resque::redis()->del('worker:' . (string)$this);
609 10
	}
610
611
	/**
612
	 * Generate a string representation of this worker.
613
	 *
614
	 * @return string String identifier for this worker instance.
615
	 */
616 63
	public function __toString()
617
	{
618 63
		return $this->id;
619
	}
620
621
	/**
622
	 * Return an object describing the job this worker is currently working on.
623
	 *
624
	 * @return object Object with details of current job.
625
	 */
626 2
	public function job()
627
	{
628 2
		$job = Resque::redis()->get('worker:' . $this);
0 ignored issues
show
Bug introduced by
The method get() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

628
		$job = Resque::redis()->/** @scrutinizer ignore-call */ get('worker:' . $this);
Loading history...
629 2
		if(!$job) {
630 1
			return array();
0 ignored issues
show
Bug Best Practice introduced by
The expression return array() returns the type array which is incompatible with the documented return type object.
Loading history...
631
		}
632
		else {
633 1
			return json_decode($job, true);
634
		}
635
	}
636
637
	/**
638
	 * Get a statistic belonging to this worker.
639
	 *
640
	 * @param string $stat Statistic to fetch.
641
	 * @return int Statistic value.
642
	 */
643 1
	public function getStat($stat)
644
	{
645 1
		return Resque_Stat::get($stat . ':' . $this);
646
	}
647
648
	/**
649
	 * Inject the logging object into the worker
650
	 *
651
	 * @param Psr\Log\LoggerInterface $logger
652
	 */
653 69
	public function setLogger(Psr\Log\LoggerInterface $logger)
654
	{
655 69
		$this->logger = $logger;
0 ignored issues
show
Documentation Bug introduced by
It seems like $logger of type Psr\Log\LoggerInterface is incompatible with the declared type LoggerInterface of property $logger.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
656 69
	}
657
}
658