Test Failed
Pull Request — master (#56)
by
unknown
02:50
created

ResqueWorker::reserve()   B

Complexity

Conditions 9
Paths 7

Size

Total Lines 35
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 23
nc 7
nop 2
dl 0
loc 35
rs 8.0555
c 0
b 0
f 0
1
<?php
2
3
declare(ticks=1);
4
5
namespace Resque\Worker;
6
7
use \Resque\Logger;
8
use \Resque\Resque;
9
use \CredisException;
10
use \Psr\Log\LogLevel;
11
use \Resque\Job\PID;
12
use \Resque\Event;
13
use \Resque\Exceptions\DirtyExitException;
14
use \Resque\Job\Status;
15
use \Resque\JobHandler;
16
use \Resque\Stat;
17
use \Psr\Log\LoggerInterface;
18
use \Exception as CoreException;
19
use \Error;
20
21
/**
22
 * Resque worker that handles checking queues for jobs, fetching them
23
 * off the queues, running them and handling the result.
24
 *
25
 * @package		Resque/Worker
26
 * @author		Chris Boulton <[email protected]>
27
 * @license		http://www.opensource.org/licenses/mit-license.php
28
 */
29
class ResqueWorker
30
{
31
	/**
32
	 * @var string Prefix for the process name
33
	 */
34
	private static $processPrefix = 'resque';
35
36
	/**
37
	* @var \Psr\Log\LoggerInterface Logging object that impliments the PSR-3 LoggerInterface
38
	*/
39
	public $logger;
40
41
	/**
42
	 * @var bool Whether this worker is running in a forked child process.
43
	 */
44
	public $hasParent = false;
45
46
	/**
47
	 * @var array Array of all associated queues for this worker.
48
	 */
49
	private $queues = array();
50
51
	/**
52
	 * @var string The hostname of this worker.
53
	 */
54
	private $hostname;
55
56
	/**
57
	 * @var boolean True if on the next iteration, the worker should shutdown.
58
	 */
59
	private $shutdown = false;
60
61
	/**
62
	 * @var boolean True if this worker is paused.
63
	 */
64
	private $paused = false;
65
66
	/**
67
	 * @var string String identifying this worker.
68
	 */
69
	private $id;
70
71
	/**
72
	 * @var \Resque\JobHandler Current job, if any, being processed by this worker.
73
	 */
74
	private $currentJob = null;
75
76
	/**
77
	 * @var int Process ID of child worker processes.
78
	 */
79
	private $child = null;
80
81
	/**
82
	 * Instantiate a new worker, given a list of queues that it should be working
83
	 * on. The list of queues should be supplied in the priority that they should
84
	 * be checked for jobs (first come, first served)
85
	 *
86
	 * Passing a single '*' allows the worker to work on all queues in alphabetical
87
	 * order. You can easily add new queues dynamically and have them worked on using
88
	 * this method.
89
	 *
90
	 * @param string|array $queues String with a single queue name, array with multiple.
91
	 */
92
	public function __construct($queues)
93
	{
94
		$this->logger = new Logger();
95
96
		if (!is_array($queues)) {
97
			$queues = array($queues);
98
		}
99
100
		$this->queues = $queues;
101
		$this->hostname = php_uname('n');
102
103
		$this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues);
104
	}
105
106
	/**
107
	 * Set the process prefix of the workers to the given prefix string.
108
	 * @param string $prefix The new process prefix
109
	 */
110
	public static function setProcessPrefix($prefix)
111
	{
112
		self::$processPrefix = $prefix;
113
	}
114
115
	/**
116
	 * Return all workers known to Resque as instantiated instances.
117
	 * @return array
118
	 */
119
	public static function all()
120
	{
121
		$workers = Resque::redis()->smembers('workers');
0 ignored issues
show
Bug introduced by
The method smembers() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

121
		$workers = Resque::redis()->/** @scrutinizer ignore-call */ smembers('workers');
Loading history...
122
		if (!is_array($workers)) {
123
			$workers = array();
124
		}
125
126
		$instances = array();
127
		foreach ($workers as $workerId) {
128
			$instances[] = self::find($workerId);
129
		}
130
		return $instances;
131
	}
132
133
	/**
134
	 * Given a worker ID, check if it is registered/valid.
135
	 *
136
	 * @param string $workerId ID of the worker.
137
	 * @return boolean True if the worker exists, false if not.
138
	 */
139
	public static function exists($workerId)
140
	{
141
		return (bool)Resque::redis()->sismember('workers', $workerId);
0 ignored issues
show
Bug introduced by
The method sismember() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

141
		return (bool)Resque::redis()->/** @scrutinizer ignore-call */ sismember('workers', $workerId);
Loading history...
142
	}
143
144
	/**
145
	 * Given a worker ID, find it and return an instantiated worker class for it.
146
	 *
147
	 * @param string $workerId The ID of the worker.
148
	 * @return \Resque\Worker\ResqueWorker Instance of the worker. False if the worker does not exist.
149
	 */
150
	public static function find($workerId)
151
	{
152
		if (!self::exists($workerId) || false === strpos($workerId, ":")) {
153
			return false;
0 ignored issues
show
Bug Best Practice introduced by
The expression return false returns the type false which is incompatible with the documented return type Resque\Worker\ResqueWorker.
Loading history...
154
		}
155
156
		list($hostname, $pid, $queues) = explode(':', $workerId, 3);
157
		$queues = explode(',', $queues);
158
		$worker = new self($queues);
159
		$worker->setId($workerId);
160
		return $worker;
161
	}
162
163
	/**
164
	 * Set the ID of this worker to a given ID string.
165
	 *
166
	 * @param string $workerId ID for the worker.
167
	 */
168
	public function setId($workerId)
169
	{
170
		$this->id = $workerId;
171
	}
172
173
	/**
174
	 * The primary loop for a worker which when called on an instance starts
175
	 * the worker's life cycle.
176
	 *
177
	 * Queues are checked every $interval (seconds) for new jobs.
178
	 *
179
	 * @param int $interval How often to check for new jobs across the queues.
180
	 */
181
	public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
182
	{
183
		$this->updateProcLine('Starting');
184
		$this->startup();
185
186
		if (function_exists('pcntl_signal_dispatch')) {
187
			pcntl_signal_dispatch();
188
		}
189
190
		while (true) {
191
			if ($this->shutdown) {
192
				break;
193
			}
194
195
			// is redis still alive?
196
			try {
197
				if (!$this->paused && Resque::redis()->ping() === false) {
0 ignored issues
show
Bug introduced by
The method ping() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

197
				if (!$this->paused && Resque::redis()->/** @scrutinizer ignore-call */ ping() === false) {
Loading history...
198
					throw new CredisException('redis ping() failed');
199
				}
200
			} catch (CredisException $e) {
201
				$this->logger->log(LogLevel::ERROR, 'redis went away. trying to reconnect');
202
				Resque::$redis = null;
203
				usleep($interval * 1000000);
204
				continue;
205
			}
206
207
			// Attempt to find and reserve a job
208
			$job = false;
209
			if (!$this->paused) {
210
				if ($blocking === true) {
211
					$this->logger->log(LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
212
					$this->updateProcLine('Waiting with blocking timeout ' . $interval);
213
				} else {
214
					$this->updateProcLine('Waiting with interval ' . $interval);
215
				}
216
217
				$job = $this->reserve($blocking, $interval);
218
			}
219
220
			if (!$job) {
221
				// For an interval of 0, break now - helps with unit testing etc
222
				if ($interval == 0) {
223
					break;
224
				}
225
226
				if ($blocking === false) {
227
					// If no job was found, we sleep for $interval before continuing and checking again
228
					$this->logger->log(LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
229
					if ($this->paused) {
230
						$this->updateProcLine('Paused');
231
					} else {
232
						$this->updateProcLine('Waiting');
233
					}
234
235
					usleep($interval * 1000000);
236
				}
237
238
				continue;
239
			}
240
241
			$this->logger->log(LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
242
			Event::trigger('beforeFork', $job);
243
			$this->workingOn($job);
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type true; however, parameter $job of Resque\Worker\ResqueWorker::workingOn() does only seem to accept Resque\JobHandler, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

243
			$this->workingOn(/** @scrutinizer ignore-type */ $job);
Loading history...
244
245
			$this->child = Resque::fork();
246
247
			// Forked and we're the child. Or PCNTL is not installed. Run the job.
248
			if ($this->child === 0 || $this->child === false || $this->child === -1) {
249
				$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
250
				$this->updateProcLine($status);
251
				$this->logger->log(LogLevel::INFO, $status);
252
253
				if (!empty($job->payload['id'])) {
254
					PID::create($job->payload['id']);
255
				}
256
257
				$this->perform($job);
0 ignored issues
show
Bug introduced by
It seems like $job can also be of type true; however, parameter $job of Resque\Worker\ResqueWorker::perform() does only seem to accept Resque\JobHandler, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

257
				$this->perform(/** @scrutinizer ignore-type */ $job);
Loading history...
258
259
				if (!empty($job->payload['id'])) {
260
					PID::del($job->payload['id']);
261
				}
262
263
				if ($this->child === 0) {
264
					exit(0);
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
265
				}
266
			}
267
268
			if ($this->child > 0) {
269
				// Parent process, sit and wait
270
				$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
271
				$this->updateProcLine($status);
272
				$this->logger->log(LogLevel::INFO, $status);
273
274
				// Wait until the child process finishes before continuing
275
				while (pcntl_wait($status, WNOHANG) === 0) {
276
					if (function_exists('pcntl_signal_dispatch')) {
277
						pcntl_signal_dispatch();
278
					}
279
280
					// Pause for a half a second to conserve system resources
281
					usleep(500000);
282
				}
283
284
				if (pcntl_wifexited($status) !== true) {
285
					$job->fail(new DirtyExitException('Job exited abnormally'));
286
				} elseif (($exitStatus = pcntl_wexitstatus($status)) !== 0) {
287
					$job->fail(new DirtyExitException(
288
						'Job exited with exit code ' . $exitStatus
289
					));
290
				} else {
291
					if (in_array($job->getStatus(), array(Status::STATUS_WAITING, Status::STATUS_RUNNING))) {
292
						$job->updateStatus(Status::STATUS_COMPLETE);
293
						$this->logger->log(LogLevel::INFO, 'done ' . $job);
0 ignored issues
show
Bug introduced by
Are you sure $job of type object|true can be used in concatenation? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

293
						$this->logger->log(LogLevel::INFO, 'done ' . /** @scrutinizer ignore-type */ $job);
Loading history...
294
					}
295
				}
296
			}
297
298
			$this->child = null;
299
			$this->doneWorking();
300
		}
301
302
		$this->unregisterWorker();
303
	}
304
305
	/**
306
	 * Process a single job.
307
	 *
308
	 * @param \Resque\JobHandler $job The job to be processed.
309
	 */
310
	public function perform(JobHandler $job)
311
	{
312
		$result = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
313
		try {
314
			Event::trigger('afterFork', $job);
315
			$result = $job->perform();
316
		} catch (CoreException $e) {
317
			$this->logger->log(LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
318
			$job->fail($e);
319
			return;
320
		} catch (Error $e) {
321
			$this->logger->log(LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
322
			$job->fail($e);
323
			return;
324
		}
325
326
		$job->updateStatus(Status::STATUS_COMPLETE, $result);
327
		$this->logger->log(LogLevel::NOTICE, '{job} has finished', array('job' => $job));
328
	}
329
330
	/**
331
	 * @param  bool            $blocking
332
	 * @param  int             $timeout
333
	 * @return object|boolean               Instance of Resque\JobHandler if a job is found, false if not.
334
	 */
335
	public function reserve($blocking = false, $timeout = null)
336
	{
337
		if ($this->hasParent && !posix_kill(posix_getppid(), 0)) {
338
			$this->shutdown();
339
			return false;
340
		}
341
342
		$queues = $this->queues();
343
		if (!is_array($queues)) {
0 ignored issues
show
introduced by
The condition is_array($queues) is always true.
Loading history...
344
			return;
345
		}
346
347
		if ($blocking === true) {
348
			if (empty($queues)) {
349
				$this->logger->log(LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout));
350
				usleep($timeout * 1000000);
351
				return false;
352
			}
353
			$job = JobHandler::reserveBlocking($queues, $timeout);
354
			if ($job) {
0 ignored issues
show
introduced by
The condition $job is always false.
Loading history...
355
				$this->logger->log(LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
356
				return $job;
357
			}
358
		} else {
359
			foreach ($queues as $queue) {
360
				$this->logger->log(LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
361
				$job = JobHandler::reserve($queue);
362
				if ($job) {
363
					$this->logger->log(LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
364
					return $job;
365
				}
366
			}
367
		}
368
369
		return false;
370
	}
371
372
	/**
373
	 * Return an array containing all of the queues that this worker should use
374
	 * when searching for jobs.
375
	 *
376
	 * If * is found in the list of queues, every queue will be searched in
377
	 * alphabetic order. (@see $fetch)
378
	 *
379
	 * @param boolean $fetch If true, and the queue is set to *, will fetch
380
	 * all queue names from redis.
381
	 * @return array Array of associated queues.
382
	 */
383
	public function queues($fetch = true)
384
	{
385
		if (!in_array('*', $this->queues) || $fetch == false) {
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like you are loosely comparing two booleans. Considering using the strict comparison === instead.

When comparing two booleans, it is generally considered safer to use the strict comparison operator.

Loading history...
386
			return $this->queues;
387
		}
388
389
		$queues = Resque::queues();
390
		sort($queues);
391
		return $queues;
392
	}
393
394
	/**
395
	 * Perform necessary actions to start a worker.
396
	 */
397
	private function startup()
398
	{
399
		$this->registerSigHandlers();
400
		$this->pruneDeadWorkers();
401
		Event::trigger('beforeFirstFork', $this);
402
		$this->registerWorker();
403
	}
404
405
	/**
406
	 * On supported systems (with the PECL proctitle module installed), update
407
	 * the name of the currently running process to indicate the current state
408
	 * of a worker.
409
	 *
410
	 * @param string $status The updated process title.
411
	 */
412
	private function updateProcLine($status)
413
	{
414
		$processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status;
0 ignored issues
show
Bug introduced by
Since $processPrefix is declared private, accessing it with static will lead to errors in possible sub-classes; you can either use self, or increase the visibility of $processPrefix to at least protected.
Loading history...
415
		if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
416
			cli_set_process_title($processTitle);
417
		} elseif (function_exists('setproctitle')) {
418
			setproctitle($processTitle);
419
		}
420
	}
421
422
	/**
423
	 * Register signal handlers that a worker should respond to.
424
	 *
425
	 * TERM: Shutdown immediately and stop processing jobs.
426
	 * INT: Shutdown immediately and stop processing jobs.
427
	 * QUIT: Shutdown after the current job finishes processing.
428
	 * USR1: Kill the forked child immediately and continue processing jobs.
429
	 */
430
	private function registerSigHandlers()
431
	{
432
		if (!function_exists('pcntl_signal')) {
433
			return;
434
		}
435
436
		pcntl_signal(SIGTERM, array($this, 'shutDownNow'));
437
		pcntl_signal(SIGINT, array($this, 'shutDownNow'));
438
		pcntl_signal(SIGQUIT, array($this, 'shutdown'));
439
		pcntl_signal(SIGUSR1, array($this, 'killChild'));
440
		pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
441
		pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
442
		$this->logger->log(LogLevel::DEBUG, 'Registered signals');
443
	}
444
445
	/**
446
	 * Signal handler callback for USR2, pauses processing of new jobs.
447
	 */
448
	public function pauseProcessing()
449
	{
450
		$this->logger->log(LogLevel::NOTICE, 'USR2 received; pausing job processing');
451
		$this->paused = true;
452
	}
453
454
	/**
455
	 * Signal handler callback for CONT, resumes worker allowing it to pick
456
	 * up new jobs.
457
	 */
458
	public function unPauseProcessing()
459
	{
460
		$this->logger->log(LogLevel::NOTICE, 'CONT received; resuming job processing');
461
		$this->paused = false;
462
	}
463
464
	/**
465
	 * Schedule a worker for shutdown. Will finish processing the current job
466
	 * and when the timeout interval is reached, the worker will shut down.
467
	 */
468
	public function shutdown()
469
	{
470
		$this->shutdown = true;
471
		$this->logger->log(LogLevel::NOTICE, 'Shutting down');
472
	}
473
474
	/**
475
	 * Force an immediate shutdown of the worker, killing any child jobs
476
	 * currently running.
477
	 */
478
	public function shutdownNow()
479
	{
480
		$this->shutdown();
481
		$this->killChild();
482
	}
483
484
	/**
485
	 * @return int Child process PID.
486
	 */
487
	public function getChildPID()
488
	{
489
		return $this->child;
490
	}
491
492
	/**
493
	 * Kill a forked child job immediately. The job it is processing will not
494
	 * be completed.
495
	 */
496
	public function killChild()
497
	{
498
		if (!$this->child) {
499
			$this->logger->log(LogLevel::DEBUG, 'No child to kill.');
500
			return;
501
		}
502
503
		$this->logger->log(LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
504
		if (exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
505
			$this->logger->log(LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
506
			posix_kill($this->child, SIGKILL);
507
			$this->child = null;
508
		} else {
509
			$this->logger->log(LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
510
			$this->shutdown();
511
		}
512
	}
513
514
	/**
515
	 * Look for any workers which should be running on this server and if
516
	 * they're not, remove them from Redis.
517
	 *
518
	 * This is a form of garbage collection to handle cases where the
519
	 * server may have been killed and the Resque workers did not die gracefully
520
	 * and therefore leave state information in Redis.
521
	 */
522
	public function pruneDeadWorkers()
523
	{
524
		$workerPids = $this->workerPids();
525
		$workers = self::all();
526
		foreach ($workers as $worker) {
527
			if (is_object($worker)) {
528
				list($host, $pid, $queues) = explode(':', (string)$worker, 3);
529
				if ($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
530
					continue;
531
				}
532
				$this->logger->log(LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
533
				$worker->unregisterWorker();
534
			}
535
		}
536
	}
537
538
	/**
539
	 * Return an array of process IDs for all of the Resque workers currently
540
	 * running on this machine.
541
	 *
542
	 * @return array Array of Resque worker process IDs.
543
	 */
544
	public function workerPids()
545
	{
546
		$pids = array();
547
		if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
548
			exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
549
			foreach ($cmdOutput as $line) {
550
				$line = preg_replace('/\s+/m', ' ', $line);
551
				list(,,$pids[]) = explode(' ', trim($line), 3);
552
			}
553
		} else {
554
			exec('ps -A -o pid,args | grep [r]esque', $cmdOutput);
555
			foreach ($cmdOutput as $line) {
556
				list($pids[],) = explode(' ', trim($line), 2);
557
			}
558
		}
559
		return $pids;
560
	}
561
562
	/**
563
	 * Register this worker in Redis.
564
	 */
565
	public function registerWorker()
566
	{
567
		Resque::redis()->sadd('workers', (string)$this);
0 ignored issues
show
Bug introduced by
The method sadd() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

567
		Resque::redis()->/** @scrutinizer ignore-call */ sadd('workers', (string)$this);
Loading history...
568
		Resque::redis()->set('worker:' . (string)$this . ':started', date('c'));
0 ignored issues
show
Bug introduced by
The method set() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

568
		Resque::redis()->/** @scrutinizer ignore-call */ set('worker:' . (string)$this . ':started', date('c'));
Loading history...
569
	}
570
571
	/**
572
	 * Unregister this worker in Redis. (shutdown etc)
573
	 */
574
	public function unregisterWorker()
575
	{
576
		if (is_object($this->currentJob)) {
577
			$this->currentJob->fail(new DirtyExitException());
578
		}
579
580
		$id = (string)$this;
581
		Resque::redis()->srem('workers', $id);
0 ignored issues
show
Bug introduced by
The method srem() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

581
		Resque::redis()->/** @scrutinizer ignore-call */ srem('workers', $id);
Loading history...
582
		Resque::redis()->del('worker:' . $id);
0 ignored issues
show
Bug introduced by
The method del() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

582
		Resque::redis()->/** @scrutinizer ignore-call */ del('worker:' . $id);
Loading history...
583
		Resque::redis()->del('worker:' . $id . ':started');
584
		Stat::clear('processed:' . $id);
585
		Stat::clear('failed:' . $id);
586
	}
587
588
	/**
589
	 * Tell Redis which job we're currently working on.
590
	 *
591
	 * @param object $job \Resque\JobHandler instance containing the job we're working on.
592
	 */
593
	public function workingOn(JobHandler $job)
594
	{
595
		$job->worker = $this;
0 ignored issues
show
Documentation Bug introduced by
It seems like $this of type Resque\Worker\ResqueWorker is incompatible with the declared type Resque\Worker\Resque of property $worker.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
596
		$this->currentJob = $job;
597
		$job->updateStatus(Status::STATUS_RUNNING);
598
		$data = json_encode(array(
599
			'queue' => $job->queue,
600
			'run_at' => date('c'),
601
			'payload' => $job->payload
602
		));
603
		Resque::redis()->set('worker:' . $job->worker, $data);
604
	}
605
606
	/**
607
	 * Notify Redis that we've finished working on a job, clearing the working
608
	 * state and incrementing the job stats.
609
	 */
610
	public function doneWorking()
611
	{
612
		$this->currentJob = null;
613
		Stat::incr('processed');
614
		Stat::incr('processed:' . (string)$this);
615
		Resque::redis()->del('worker:' . (string)$this);
616
	}
617
618
	/**
619
	 * Generate a string representation of this worker.
620
	 *
621
	 * @return string String identifier for this worker instance.
622
	 */
623
	public function __toString()
624
	{
625
		return $this->id;
626
	}
627
628
	/**
629
	 * Return an object describing the job this worker is currently working on.
630
	 *
631
	 * @return object Object with details of current job.
632
	 */
633
	public function job()
634
	{
635
		$job = Resque::redis()->get('worker:' . $this);
0 ignored issues
show
Bug introduced by
The method get() does not exist on Resque\Redis. Since you implemented __call, consider adding a @method annotation. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

635
		$job = Resque::redis()->/** @scrutinizer ignore-call */ get('worker:' . $this);
Loading history...
636
		if (!$job) {
637
			return array();
0 ignored issues
show
Bug Best Practice introduced by
The expression return array() returns the type array which is incompatible with the documented return type object.
Loading history...
638
		} else {
639
			return json_decode($job, true);
640
		}
641
	}
642
643
	/**
644
	 * Get a statistic belonging to this worker.
645
	 *
646
	 * @param string $stat Statistic to fetch.
647
	 * @return int Statistic value.
648
	 */
649
	public function getStat($stat)
650
	{
651
		return Stat::get($stat . ':' . $this);
652
	}
653
654
	/**
655
	 * Inject the logging object into the worker
656
	 *
657
	 * @param \Psr\Log\LoggerInterface $logger
658
	 */
659
	public function setLogger(LoggerInterface $logger)
660
	{
661
		$this->logger = $logger;
662
	}
663
}
664