Test Failed
Pull Request — develop (#54)
by
unknown
02:26
created

ResqueWorker::shutdown()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(ticks=1);
4
5
namespace Resque\Worker;
6
7
use Resque\Logger;
8
use Resque\Resque;
9
use CredisException;
10
use Psr\Log\LogLevel;
11
use Resque\Job\PID;
12
use Resque\Event;
13
use Resque\Exceptions\DirtyExitException;
14
use Resque\Job\Status;
15
use Resque\JobHandler;
16
use Resque\Stat;
17
use Psr\Log\LoggerInterface;
18
use Exception;
19
use Error;
20
21
/**
22
 * Resque worker that handles checking queues for jobs, fetching them
23
 * off the queues, running them and handling the result.
24
 *
25
 * @package		Resque/Worker
26
 * @author		Chris Boulton <[email protected]>
27
 * @license		http://www.opensource.org/licenses/mit-license.php
28
 */
29
class ResqueWorker
30
{
31
	/**
32
	 * @var string Prefix for the process name
33
	 */
34
	private static $processPrefix = 'resque';
35
36
	/**
37
	* @var \Psr\Log\LoggerInterface Logging object that implements the PSR-3 LoggerInterface
38
	*/
39
	public $logger;
40
41
	/**
42
	 * @var bool Whether this worker is running in a forked child process.
43
	 */
44
	public $hasParent = false;
45
46
	/**
47
	 * @var array Array of all associated queues for this worker.
48
	 */
49
	private $queues = array();
50
51
	/**
52
	 * @var string The hostname of this worker.
53
	 */
54
	private $hostname;
55
56
	/**
57
	 * @var boolean True if on the next iteration, the worker should shutdown.
58
	 */
59
	private $shutdown = false;
60
61
	/**
62
	 * @var boolean True if this worker is paused.
63
	 */
64
	private $paused = false;
65
66
	/**
67
	 * @var string String identifying this worker.
68
	 */
69
	private $id;
70
71
	/**
72
	 * @var \Resque\JobHandler Current job, if any, being processed by this worker.
73
	 */
74
	private $currentJob = null;
75
76
	/**
77
	 * @var int Process ID of child worker processes.
78
	 */
79
	private $child = null;
80
81
	/**
82
	 * Instantiate a new worker, given a list of queues that it should be working
83
	 * on. The list of queues should be supplied in the priority that they should
84
	 * be checked for jobs (first come, first served)
85
	 *
86
	 * Passing a single '*' allows the worker to work on all queues in alphabetical
87
	 * order. You can easily add new queues dynamically and have them worked on using
88
	 * this method.
89
	 *
90
	 * @param string|array $queues String with a single queue name, array with multiple.
91
	 */
92
	public function __construct($queues)
93
	{
94
		$this->logger = new Logger();
95
96
		if (!is_array($queues)) {
97
			$queues = array($queues);
98
		}
99
100
		$this->queues = $queues;
101
		$this->hostname = php_uname('n');
102
103
		$this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues);
104
	}
105
106
	/**
	 * Change the prefix used when building worker process titles.
	 *
	 * The value is stored in a static property, so it is shared by every
	 * worker instance in the current process.
	 *
	 * @param string $prefix The new process prefix
	 */
	public static function setProcessPrefix($prefix)
	{
		self::$processPrefix = $prefix;
	}
114
115
	/**
	 * Build a worker instance for every ID stored in the Redis 'workers' set.
	 *
	 * Each ID is resolved through find(), so an entry is false when find()
	 * rejects the ID.
	 *
	 * @return array List of results from find(), one per registered worker ID.
	 */
	public static function all()
	{
		$workerIds = Resque::redis()->smembers('workers');
		if (!is_array($workerIds)) {
			return array();
		}

		// array_values() keeps the result a plain 0-indexed list.
		return array_map(array(__CLASS__, 'find'), array_values($workerIds));
	}
132
133
	/**
	 * Check whether a worker ID is a member of the Redis 'workers' set.
	 *
	 * @param string $workerId ID of the worker.
	 * @return boolean True if the worker is registered, false if not.
	 */
	public static function exists($workerId)
	{
		$isMember = Resque::redis()->sismember('workers', $workerId);

		return (bool)$isMember;
	}
143
144
	/**
	 * Given a worker ID, find it and return an instantiated worker class for it.
	 *
	 * Worker IDs have the form "<hostname>:<pid>:<comma-separated queues>".
	 * Only the queue list is needed to rebuild the instance; the original
	 * ID is then restored with setId() so it keeps the remote hostname/PID.
	 *
	 * @param string $workerId The ID of the worker.
	 * @return \Resque\Worker\ResqueWorker|false Instance of the worker, or false if the
	 *                                           ID is not registered or contains no ':'.
	 */
	public static function find($workerId)
	{
		if (!self::exists($workerId) || false === strpos($workerId, ':')) {
			return false;
		}

		// Pad to three segments so an ID with a missing queue segment does not
		// raise an undefined-offset notice or hand null to explode().
		$segments = array_pad(explode(':', $workerId, 3), 3, '');

		$worker = new self(explode(',', $segments[2]));
		$worker->setId($workerId);

		return $worker;
	}
162
163
	/**
	 * Override this worker's identifier.
	 *
	 * The ID is what __toString() returns and is used to build this
	 * worker's Redis keys.
	 *
	 * @param string $workerId ID for the worker.
	 */
	public function setId($workerId)
	{
		$this->id = $workerId;
	}
172
173
	/**
	 * The primary loop for a worker which when called on an instance starts
	 * the worker's life cycle.
	 *
	 * Each iteration: verify Redis is reachable, try to reserve a job (unless
	 * paused), fork, run the job in the child (or inline when forking is not
	 * available), and in the parent wait for the child and record the outcome.
	 * The loop ends when a shutdown has been requested, or when no job was
	 * found and $interval is 0. The worker is unregistered on exit.
	 *
	 * @param int  $interval How often (in seconds) to check for new jobs across the queues.
	 *                       In blocking mode it is passed to reserve() as the timeout.
	 * @param bool $blocking When true, reserve() is called in blocking mode and the
	 *                       loop does not sleep between attempts.
	 */
	public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
	{
		$this->updateProcLine('Starting');
		$this->startup();

		if (function_exists('pcntl_signal_dispatch')) {
			pcntl_signal_dispatch();
		}

		$readyStatuses = array(Status::STATUS_WAITING, Status::STATUS_RUNNING);

		while (true) {
			if ($this->shutdown) {
				break;
			}

			// is redis still alive?
			try {
				if (!$this->paused && Resque::redis()->ping() === false) {
					throw new CredisException('redis ping() failed');
				}
			} catch (CredisException $e) {
				$this->logger->log(LogLevel::ERROR, 'redis went away. trying to reconnect');
				// Drop the cached connection so the next Resque::redis() call reconnects.
				Resque::$redis = null;
				usleep((int)($interval * 1000000));
				continue;
			}

			// Attempt to find and reserve a job
			$job = false;
			if (!$this->paused) {
				if ($blocking === true) {
					$context = array('interval' => $interval);
					$message = 'Starting blocking with timeout of {interval}';
					$this->logger->log(LogLevel::INFO, $message, $context);
					$this->updateProcLine('Waiting with blocking timeout ' . $interval);
				} else {
					$this->updateProcLine('Waiting with interval ' . $interval);
				}

				$job = $this->reserve($blocking, $interval);
			}

			if (!$job) {
				// For an interval of 0, break now - helps with unit testing etc
				if ($interval == 0) {
					break;
				}

				if ($blocking === false) {
					// If no job was found, we sleep for $interval before continuing and checking again
					$context = array('interval' => $interval);
					$this->logger->log(LogLevel::INFO, 'Sleeping for {interval}', $context);
					if ($this->paused) {
						$this->updateProcLine('Paused');
					} else {
						$this->updateProcLine('Waiting');
					}

					usleep((int)($interval * 1000000));
				}

				continue;
			}

			$context = array('job' => $job);
			$this->logger->log(LogLevel::NOTICE, 'Starting work on {job}', $context);
			Event::trigger('beforeFork', $job);
			$this->workingOn($job);

			$this->child = Resque::fork();

			// Forked and we're the child. Or PCNTL is not installed. Run the job.
			if ($this->child === 0 || $this->child === false || $this->child === -1) {
				// date('Y-m-d H:i:s') yields the same text as the deprecated strftime('%F %T').
				$status = 'Processing ' . $job->queue . ' since ' . date('Y-m-d H:i:s');
				$this->updateProcLine($status);
				$this->logger->log(LogLevel::INFO, $status);

				if (!empty($job->payload['id'])) {
					PID::create($job->payload['id']);
				}

				$this->perform($job);

				if (!empty($job->payload['id'])) {
					PID::del($job->payload['id']);
				}

				// Only a real forked child exits; an inline run falls through to doneWorking().
				if ($this->child === 0) {
					exit(0);
				}
			}

			if ($this->child > 0) {
				// Parent process, sit and wait
				$status = 'Forked ' . $this->child . ' at ' . date('Y-m-d H:i:s');
				$this->updateProcLine($status);
				$this->logger->log(LogLevel::INFO, $status);

				// Wait until the child process finishes before continuing
				$exitInfo = 0;
				while (pcntl_wait($exitInfo, WNOHANG) === 0) {
					if (function_exists('pcntl_signal_dispatch')) {
						pcntl_signal_dispatch();
					}

					// Pause for a half a second to conserve system resources
					usleep(500000);
				}

				if (pcntl_wifexited($exitInfo) !== true) {
					$job->fail(new DirtyExitException('Job exited abnormally'));
				} elseif (($exitStatus = pcntl_wexitstatus($exitInfo)) !== 0) {
					$job->fail(new DirtyExitException(
						'Job exited with exit code ' . $exitStatus
					));
				} else {
					// Clean exit: mark complete only if the job is still waiting or running.
					if (in_array($job->getStatus(), $readyStatuses)) {
						$job->updateStatus(Status::STATUS_COMPLETE);
						$this->logger->log(LogLevel::INFO, 'done ' . $job);
					}
				}
			}

			$this->child = null;
			$this->doneWorking();
		}

		$this->unregisterWorker();
	}
310
311
	/**
	 * Run a single job and record its outcome.
	 *
	 * Fires the 'afterFork' event, then executes the job. Any Exception or
	 * Error thrown is logged at CRITICAL level and the job is marked as
	 * failed; otherwise the job is marked complete with its result.
	 *
	 * @param \Resque\JobHandler $job The job to be processed.
	 */
	public function perform(JobHandler $job)
	{
		try {
			Event::trigger('afterFork', $job);
			$outcome = $job->perform();
		} catch (Exception $failure) {
			$this->logger->log(
				LogLevel::CRITICAL,
				'{job} has failed {exception}',
				array('job' => $job, 'exception' => $failure)
			);
			$job->fail($failure);
			return;
		} catch (Error $failure) {
			$this->logger->log(
				LogLevel::CRITICAL,
				'{job} has failed {exception}',
				array('job' => $job, 'exception' => $failure)
			);
			$job->fail($failure);
			return;
		}

		$job->updateStatus(Status::STATUS_COMPLETE, $outcome);
		$this->logger->log(LogLevel::NOTICE, '{job} has finished', array('job' => $job));
	}
337
338
	/**
	 * Try to reserve the next job from this worker's queues.
	 *
	 * In non-blocking mode every queue is checked once, in order, and the
	 * first job found is returned. In blocking mode a single blocking reserve
	 * is issued across all queues with the given timeout; if there are no
	 * queues at all, the method sleeps for the timeout instead.
	 *
	 * When running as a forked child whose parent process has gone away, a
	 * shutdown is scheduled and no job is reserved.
	 *
	 * @param  bool     $blocking Whether to use a blocking reserve.
	 * @param  int|null $timeout  Blocking timeout / sleep duration in seconds.
	 * @return \Resque\JobHandler|false Reserved job if one is found, false if not.
	 */
	public function reserve($blocking = false, $timeout = null)
	{
		// Signal 0 performs no action; it is used here to check that the parent still exists.
		if ($this->hasParent && !posix_kill(posix_getppid(), 0)) {
			$this->shutdown();
			return false;
		}

		$queues = $this->queues();
		if (!is_array($queues)) {
			// Return false (not null) so every "no job" path matches the documented contract.
			return false;
		}

		if ($blocking === true) {
			if (empty($queues)) {
				$context = array('interval' => $timeout);
				$this->logger->log(LogLevel::INFO, 'No queue was found, sleeping for {interval}', $context);
				usleep($timeout * 1000000);
				return false;
			}

			$job = JobHandler::reserveBlocking($queues, $timeout);
			if ($job) {
				$context = array('queue' => $job->queue);
				$this->logger->log(LogLevel::INFO, 'Found job on {queue}', $context);
				return $job;
			}

			return false;
		}

		foreach ($queues as $queue) {
			$context = array('queue' => $queue);
			$this->logger->log(LogLevel::INFO, 'Checking {queue} for jobs', $context);
			$job = JobHandler::reserve($queue);
			if ($job) {
				$context = array('queue' => $job->queue);
				$this->logger->log(LogLevel::INFO, 'Found job on {queue}', $context);
				return $job;
			}
		}

		return false;
	}
383
384
	/**
	 * Return an array containing all of the queues that this worker should use
	 * when searching for jobs.
	 *
	 * If the literal string '*' is in the configured list and $fetch is truthy,
	 * the full queue list is fetched from Resque and returned sorted.
	 * Otherwise the configured list is returned unchanged.
	 *
	 * @param boolean $fetch If true, and the queue is set to *, will fetch
	 * all queue names from redis.
	 * @return array Array of associated queues.
	 */
	public function queues($fetch = true)
	{
		// Strict match: a loose comparison treats integer 0 as equal to '*' on PHP < 8.
		if (!$fetch || !in_array('*', $this->queues, true)) {
			return $this->queues;
		}

		$queues = Resque::queues();
		sort($queues);
		return $queues;
	}
405
406
	/**
	 * Prepare the worker before it enters its main loop.
	 *
	 * In order: installs signal handlers, prunes stale worker records,
	 * fires the 'beforeFirstFork' event, then registers this worker in Redis.
	 */
	private function startup()
	{
		$this->registerSigHandlers();
		$this->pruneDeadWorkers();

		Event::trigger('beforeFirstFork', $this);

		$this->registerWorker();
	}
416
417
	/**
	 * Update the name of the currently running process to indicate the
	 * current state of a worker.
	 *
	 * The title has the form "<prefix>-<version> (<queues>): <status>".
	 * cli_set_process_title() is used when it exists and the OS is not
	 * Darwin; otherwise setproctitle() is used if that function exists.
	 * When neither applies, nothing happens.
	 *
	 * @param string $status The updated process title.
	 */
	private function updateProcLine($status)
	{
		// self:: rather than static:: because the property is private; late
		// static binding would fail when called through a subclass.
		$processTitle = sprintf(
			'%s-%s (%s): %s',
			self::$processPrefix,
			Resque::VERSION,
			implode(',', $this->queues),
			$status
		);

		if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
			cli_set_process_title($processTitle);
		} elseif (function_exists('setproctitle')) {
			setproctitle($processTitle);
		}
	}
434
435
	/**
	 * Register signal handlers that a worker should respond to.
	 *
	 * TERM: Shutdown immediately and stop processing jobs.
	 * INT: Shutdown immediately and stop processing jobs.
	 * QUIT: Shutdown after the current job finishes processing.
	 * USR1: Kill the forked child immediately and continue processing jobs.
	 * USR2: Pause processing of new jobs.
	 * CONT: Resume processing of new jobs.
	 *
	 * Does nothing when the pcntl extension is not available.
	 */
	private function registerSigHandlers()
	{
		if (!function_exists('pcntl_signal')) {
			return;
		}

		// Handler names are spelled exactly as the methods are declared.
		$handlers = array(
			SIGTERM => 'shutdownNow',
			SIGINT  => 'shutdownNow',
			SIGQUIT => 'shutdown',
			SIGUSR1 => 'killChild',
			SIGUSR2 => 'pauseProcessing',
			SIGCONT => 'unPauseProcessing',
		);

		foreach ($handlers as $signal => $method) {
			pcntl_signal($signal, array($this, $method));
		}

		$this->logger->log(LogLevel::DEBUG, 'Registered signals');
	}
457
458
	/**
	 * Signal handler for USR2: stop picking up new jobs until resumed.
	 *
	 * Only sets the paused flag; the main loop checks it before reserving.
	 */
	public function pauseProcessing()
	{
		$this->logger->log(LogLevel::NOTICE, 'USR2 received; pausing job processing');

		$this->paused = true;
	}
466
467
	/**
	 * Signal handler for CONT: clear the paused flag so the worker picks
	 * up new jobs again.
	 */
	public function unPauseProcessing()
	{
		$this->logger->log(LogLevel::NOTICE, 'CONT received; resuming job processing');

		$this->paused = false;
	}
476
477
	/**
	 * Request a graceful shutdown.
	 *
	 * Only raises the shutdown flag; the main loop in work() checks it at
	 * the start of each iteration, so a job already in progress finishes first.
	 */
	public function shutdown()
	{
		$this->shutdown = true;

		$this->logger->log(LogLevel::NOTICE, 'Shutting down');
	}
486
487
	/**
	 * Shut down immediately: schedule the shutdown, then kill the forked
	 * child (if any) so its job does not run to completion.
	 */
	public function shutdownNow()
	{
		$this->shutdown();

		$this->killChild();
	}
496
497
	/**
	 * Return the stored result of the most recent fork.
	 *
	 * @return int|null Child process PID, or null when no child is being tracked.
	 *                  NOTE(review): work() also compares this value against false
	 *                  and -1, so those may appear here too; confirm against Resque::fork().
	 */
	public function getChildPID()
	{
		return $this->child;
	}
504
505
	/**
	 * Kill the forked child immediately; its job will not be completed.
	 *
	 * The child is looked up with `ps` first. If it is found it receives
	 * SIGKILL and the stored PID is cleared; if not, a shutdown of this
	 * worker is scheduled instead.
	 */
	public function killChild()
	{
		if (!$this->child) {
			$this->logger->log(LogLevel::DEBUG, 'No child to kill.');
			return;
		}

		$context = array('child' => $this->child);
		$this->logger->log(LogLevel::INFO, 'Killing child at {child}', $context);

		$lastLine = exec('ps -o pid,s -p ' . $this->child, $psOutput, $exitCode);
		$childFound = $lastLine && $exitCode != 1;

		if (!$childFound) {
			$this->logger->log(LogLevel::INFO, 'Child {child} not found, restarting.', $context);
			$this->shutdown();
			return;
		}

		$this->logger->log(LogLevel::DEBUG, 'Child {child} found, killing.', $context);
		posix_kill($this->child, SIGKILL);
		$this->child = null;
	}
529
530
	/**
	 * Look for any workers which should be running on this server and if
	 * they're not, remove them from Redis.
	 *
	 * This is a form of garbage collection to handle cases where the
	 * server may have been killed and the Resque workers did not die gracefully
	 * and therefore leave state information in Redis.
	 *
	 * A registered worker is pruned only when its ID names this host, its PID
	 * is not among the PIDs reported by workerPids(), and it is not this process.
	 */
	public function pruneDeadWorkers()
	{
		$livePids = $this->workerPids();

		foreach (self::all() as $worker) {
			// all() can contain false for IDs that find() rejects.
			if (!is_object($worker)) {
				continue;
			}

			// Only host and PID are needed; the queue segment is left unparsed.
			list($host, $pid) = explode(':', (string)$worker, 3);
			if ($host != $this->hostname || in_array($pid, $livePids) || $pid == getmypid()) {
				continue;
			}

			$context = array('worker' => (string)$worker);
			$this->logger->log(LogLevel::INFO, 'Pruning dead worker: {worker}', $context);
			$worker->unregisterWorker();
		}
	}
554
555
	/**
	 * Return an array of process IDs for all of the Resque workers currently
	 * running on this machine.
	 *
	 * Processes are found by searching the process list for the text "resque":
	 * via WMIC/findstr on Windows, via ps/grep elsewhere.
	 *
	 * @return array Array of Resque worker process IDs (as strings taken from the command output).
	 */
	public function workerPids()
	{
		$pids = array();
		$cmdOutput = array();

		if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
			exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
			foreach ($cmdOutput as $line) {
				// Collapse whitespace runs, then take everything after the second space.
				$line = preg_replace('/\s+/m', ' ', $line);
				list(,,$pids[]) = explode(' ', trim($line), 3);
			}

			return $pids;
		}

		// The pattern is quoted so the shell cannot glob-expand [r]esque against a
		// file named "resque" in the working directory. The bracket form keeps grep
		// from matching its own command line.
		exec("ps -A -o pid,args | grep '[r]esque'", $cmdOutput);
		foreach ($cmdOutput as $line) {
			// The PID is the first space-separated field of each line.
			list($pids[],) = explode(' ', trim($line), 2);
		}

		return $pids;
	}
578
579
	/**
	 * Record this worker in Redis: add its ID to the 'workers' set and store
	 * the start time (ISO 8601) under "worker:<id>:started".
	 */
	public function registerWorker()
	{
		$id = (string)$this;

		Resque::redis()->sadd('workers', $id);
		Resque::redis()->set('worker:' . $id . ':started', date('c'));
	}
587
588
	/**
	 * Remove every trace of this worker from Redis (used on shutdown and
	 * when pruning dead workers).
	 *
	 * If a job is still attached to the worker it is failed with a
	 * DirtyExitException first. Then the worker is removed from the 'workers'
	 * set, its "worker:<id>" and "worker:<id>:started" keys are deleted, and
	 * its per-worker processed/failed stats are cleared.
	 */
	public function unregisterWorker()
	{
		if (is_object($this->currentJob)) {
			$this->currentJob->fail(new DirtyExitException());
		}

		$id = (string)$this;
		$workerKey = 'worker:' . $id;

		Resque::redis()->srem('workers', $id);
		Resque::redis()->del($workerKey);
		Resque::redis()->del($workerKey . ':started');

		Stat::clear('processed:' . $id);
		Stat::clear('failed:' . $id);
	}
604
605
	/**
	 * Mark a job as the one this worker is currently processing.
	 *
	 * Links the job and the worker to each other, sets the job status to
	 * running, and stores a JSON snapshot (queue, start time, payload) in
	 * Redis under "worker:<id>".
	 *
	 * @param \Resque\JobHandler $job The job we're working on.
	 */
	public function workingOn(JobHandler $job)
	{
		$job->worker = $this;
		$this->currentJob = $job;
		$job->updateStatus(Status::STATUS_RUNNING);

		$snapshot = array(
			'queue' => $job->queue,
			'run_at' => date('c'),
			'payload' => $job->payload,
		);

		Resque::redis()->set('worker:' . $job->worker, json_encode($snapshot));
	}
622
623
	/**
	 * Record that the current job is finished.
	 *
	 * Detaches the job from the worker, increments both the global and the
	 * per-worker 'processed' counters, and deletes the "worker:<id>" key.
	 */
	public function doneWorking()
	{
		$id = (string)$this;

		$this->currentJob = null;
		Stat::incr('processed');
		Stat::incr('processed:' . $id);
		Resque::redis()->del('worker:' . $id);
	}
634
635
	/**
	 * Cast the worker to a string, yielding its ID.
	 *
	 * The ID is the one built in the constructor unless setId() replaced it.
	 *
	 * @return string String identifier for this worker instance.
	 */
	public function __toString()
	{
		return $this->id;
	}
644
645
	/**
	 * Return the details of the job this worker is currently working on.
	 *
	 * Reads the value stored under "worker:<id>" in Redis and decodes it as
	 * JSON into an associative array.
	 *
	 * @return array Decoded job details, or an empty array when nothing is stored.
	 */
	public function job()
	{
		$encoded = Resque::redis()->get('worker:' . $this);

		return $encoded ? json_decode($encoded, true) : array();
	}
659
660
	/**
	 * Fetch one of this worker's own statistics.
	 *
	 * The stat is looked up under the key "<stat>:<worker id>".
	 *
	 * @param string $stat Statistic to fetch.
	 * @return int Statistic value.
	 */
	public function getStat($stat)
	{
		$key = $stat . ':' . $this;

		return Stat::get($key);
	}
670
671
	/**
	 * Replace the logger this worker writes to.
	 *
	 * The constructor installs a default Logger; call this to use another
	 * PSR-3 implementation.
	 *
	 * @param \Psr\Log\LoggerInterface $logger
	 */
	public function setLogger(LoggerInterface $logger)
	{
		$this->logger = $logger;
	}
680
}
681