@@ -1,5 +1,5 @@ discard block |
||
| 1 | 1 | <?php |
| 2 | -declare(ticks = 1); |
|
| 2 | +declare(ticks=1); |
|
| 3 | 3 | |
| 4 | 4 | /** |
| 5 | 5 | * Resque worker that handles checking queues for jobs, fetching them |
@@ -76,14 +76,14 @@ discard block |
||
| 76 | 76 | { |
| 77 | 77 | $this->logger = new Resque_Log(); |
| 78 | 78 | |
| 79 | - if(!is_array($queues)) { |
|
| 79 | + if (!is_array($queues)) { |
|
| 80 | 80 | $queues = array($queues); |
| 81 | 81 | } |
| 82 | 82 | |
| 83 | 83 | $this->queues = $queues; |
| 84 | 84 | $this->hostname = php_uname('n'); |
| 85 | 85 | |
| 86 | - $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); |
|
| 86 | + $this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues); |
|
| 87 | 87 | } |
| 88 | 88 | |
| 89 | 89 | /** |
@@ -102,12 +102,12 @@ discard block |
||
| 102 | 102 | public static function all() |
| 103 | 103 | { |
| 104 | 104 | $workers = Resque::redis()->smembers('workers'); |
| 105 | - if(!is_array($workers)) { |
|
| 105 | + if (!is_array($workers)) { |
|
| 106 | 106 | $workers = array(); |
| 107 | 107 | } |
| 108 | 108 | |
| 109 | 109 | $instances = array(); |
| 110 | - foreach($workers as $workerId) { |
|
| 110 | + foreach ($workers as $workerId) { |
|
| 111 | 111 | $instances[] = self::find($workerId); |
| 112 | 112 | } |
| 113 | 113 | return $instances; |
@@ -132,7 +132,7 @@ discard block |
||
| 132 | 132 | */ |
| 133 | 133 | public static function find($workerId) |
| 134 | 134 | { |
| 135 | - if(!self::exists($workerId) || false === strpos($workerId, ":")) { |
|
| 135 | + if (!self::exists($workerId) || false === strpos($workerId, ":")) { |
|
| 136 | 136 | return false; |
| 137 | 137 | } |
| 138 | 138 | |
@@ -166,12 +166,12 @@ discard block |
||
| 166 | 166 | $this->updateProcLine('Starting'); |
| 167 | 167 | $this->startup(); |
| 168 | 168 | |
| 169 | - if(function_exists('pcntl_signal_dispatch')) { |
|
| 169 | + if (function_exists('pcntl_signal_dispatch')) { |
|
| 170 | 170 | pcntl_signal_dispatch(); |
| 171 | 171 | } |
| 172 | 172 | |
| 173 | - while(true) { |
|
| 174 | - if($this->shutdown) { |
|
| 173 | + while (true) { |
|
| 174 | + if ($this->shutdown) { |
|
| 175 | 175 | break; |
| 176 | 176 | } |
| 177 | 177 | |
@@ -189,8 +189,8 @@ discard block |
||
| 189 | 189 | |
| 190 | 190 | // Attempt to find and reserve a job |
| 191 | 191 | $job = false; |
| 192 | - if(!$this->paused) { |
|
| 193 | - if($blocking === true) { |
|
| 192 | + if (!$this->paused) { |
|
| 193 | + if ($blocking === true) { |
|
| 194 | 194 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); |
| 195 | 195 | $this->updateProcLine('Waiting with blocking timeout ' . $interval); |
| 196 | 196 | } else { |
@@ -200,17 +200,17 @@ discard block |
||
| 200 | 200 | $job = $this->reserve($blocking, $interval); |
| 201 | 201 | } |
| 202 | 202 | |
| 203 | - if(!$job) { |
|
| 203 | + if (!$job) { |
|
| 204 | 204 | // For an interval of 0, break now - helps with unit testing etc |
| 205 | - if($interval == 0) { |
|
| 205 | + if ($interval == 0) { |
|
| 206 | 206 | break; |
| 207 | 207 | } |
| 208 | 208 | |
| 209 | - if($blocking === false) |
|
| 209 | + if ($blocking === false) |
|
| 210 | 210 | { |
| 211 | 211 | // If no job was found, we sleep for $interval before continuing and checking again |
| 212 | 212 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); |
| 213 | - if($this->paused) { |
|
| 213 | + if ($this->paused) { |
|
| 214 | 214 | $this->updateProcLine('Paused'); |
| 215 | 215 | } |
| 216 | 216 | else { |
@@ -235,13 +235,13 @@ discard block |
||
| 235 | 235 | $this->updateProcLine($status); |
| 236 | 236 | $this->logger->log(Psr\Log\LogLevel::INFO, $status); |
| 237 | 237 | |
| 238 | - if(!empty($job->payload['id'])) { |
|
| 238 | + if (!empty($job->payload['id'])) { |
|
| 239 | 239 | Resque_Job_PID::create($job->payload['id']); |
| 240 | 240 | } |
| 241 | 241 | |
| 242 | 242 | $this->perform($job); |
| 243 | 243 | |
| 244 | - if(!empty($job->payload['id'])) { |
|
| 244 | + if (!empty($job->payload['id'])) { |
|
| 245 | 245 | Resque_Job_PID::del($job->payload['id']); |
| 246 | 246 | } |
| 247 | 247 | |
@@ -250,7 +250,7 @@ discard block |
||
| 250 | 250 | } |
| 251 | 251 | } |
| 252 | 252 | |
| 253 | - if($this->child > 0) { |
|
| 253 | + if ($this->child > 0) { |
|
| 254 | 254 | // Parent process, sit and wait |
| 255 | 255 | $status = 'Forked ' . $this->child . ' at ' . date('Y-m-d H:i:s'); |
| 256 | 256 | $this->updateProcLine($status); |
@@ -258,7 +258,7 @@ discard block |
||
| 258 | 258 | |
| 259 | 259 | // Wait until the child process finishes before continuing |
| 260 | 260 | while (pcntl_wait($status, WNOHANG) === 0) { |
| 261 | - if(function_exists('pcntl_signal_dispatch')) { |
|
| 261 | + if (function_exists('pcntl_signal_dispatch')) { |
|
| 262 | 262 | pcntl_signal_dispatch(); |
| 263 | 263 | } |
| 264 | 264 | |
@@ -302,12 +302,12 @@ discard block |
||
| 302 | 302 | Resque_Event::trigger('afterFork', $job); |
| 303 | 303 | $result = $job->perform(); |
| 304 | 304 | } |
| 305 | - catch(Exception $e) { |
|
| 305 | + catch (Exception $e) { |
|
| 306 | 306 | $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); |
| 307 | 307 | $job->fail($e); |
| 308 | 308 | return; |
| 309 | 309 | } |
| 310 | - catch(Error $e) { |
|
| 310 | + catch (Error $e) { |
|
| 311 | 311 | $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); |
| 312 | 312 | $job->fail($e); |
| 313 | 313 | return; |
@@ -331,26 +331,26 @@ discard block |
||
| 331 | 331 | } |
| 332 | 332 | |
| 333 | 333 | $queues = $this->queues(); |
| 334 | - if(!is_array($queues)) { |
|
| 334 | + if (!is_array($queues)) { |
|
| 335 | 335 | return; |
| 336 | 336 | } |
| 337 | 337 | |
| 338 | - if($blocking === true) { |
|
| 339 | - if(empty($queues)){ |
|
| 338 | + if ($blocking === true) { |
|
| 339 | + if (empty($queues)) { |
|
| 340 | 340 | $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout)); |
| 341 | 341 | usleep($timeout * 1000000); |
| 342 | 342 | return false; |
| 343 | 343 | } |
| 344 | 344 | $job = Resque_Job::reserveBlocking($queues, $timeout); |
| 345 | - if($job) { |
|
| 345 | + if ($job) { |
|
| 346 | 346 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); |
| 347 | 347 | return $job; |
| 348 | 348 | } |
| 349 | 349 | } else { |
| 350 | - foreach($queues as $queue) { |
|
| 350 | + foreach ($queues as $queue) { |
|
| 351 | 351 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); |
| 352 | 352 | $job = Resque_Job::reserve($queue); |
| 353 | - if($job) { |
|
| 353 | + if ($job) { |
|
| 354 | 354 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); |
| 355 | 355 | return $job; |
| 356 | 356 | } |
@@ -373,7 +373,7 @@ discard block |
||
| 373 | 373 | */ |
| 374 | 374 | public function queues($fetch = true) |
| 375 | 375 | { |
| 376 | - if(!in_array('*', $this->queues) || $fetch == false) { |
|
| 376 | + if (!in_array('*', $this->queues) || $fetch == false) { |
|
| 377 | 377 | return $this->queues; |
| 378 | 378 | } |
| 379 | 379 | |
@@ -403,10 +403,10 @@ discard block |
||
| 403 | 403 | private function updateProcLine($status) |
| 404 | 404 | { |
| 405 | 405 | $processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status; |
| 406 | - if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { |
|
| 406 | + if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { |
|
| 407 | 407 | cli_set_process_title($processTitle); |
| 408 | 408 | } |
| 409 | - else if(function_exists('setproctitle')) { |
|
| 409 | + else if (function_exists('setproctitle')) { |
|
| 410 | 410 | setproctitle($processTitle); |
| 411 | 411 | } |
| 412 | 412 | } |
@@ -421,7 +421,7 @@ discard block |
||
| 421 | 421 | */ |
| 422 | 422 | private function registerSigHandlers() |
| 423 | 423 | { |
| 424 | - if(!function_exists('pcntl_signal')) { |
|
| 424 | + if (!function_exists('pcntl_signal')) { |
|
| 425 | 425 | return; |
| 426 | 426 | } |
| 427 | 427 | |
@@ -486,13 +486,13 @@ discard block |
||
| 486 | 486 | */ |
| 487 | 487 | public function killChild() |
| 488 | 488 | { |
| 489 | - if(!$this->child) { |
|
| 489 | + if (!$this->child) { |
|
| 490 | 490 | $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); |
| 491 | 491 | return; |
| 492 | 492 | } |
| 493 | 493 | |
| 494 | 494 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); |
| 495 | - if(exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { |
|
| 495 | + if (exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { |
|
| 496 | 496 | $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); |
| 497 | 497 | posix_kill($this->child, SIGKILL); |
| 498 | 498 | $this->child = null; |
@@ -515,10 +515,10 @@ discard block |
||
| 515 | 515 | { |
| 516 | 516 | $workerPids = $this->workerPids(); |
| 517 | 517 | $workers = self::all(); |
| 518 | - foreach($workers as $worker) { |
|
| 518 | + foreach ($workers as $worker) { |
|
| 519 | 519 | if (is_object($worker)) { |
| 520 | 520 | list($host, $pid, $queues) = explode(':', (string)$worker, 3); |
| 521 | - if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { |
|
| 521 | + if ($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { |
|
| 522 | 522 | continue; |
| 523 | 523 | } |
| 524 | 524 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); |
@@ -538,14 +538,14 @@ discard block |
||
| 538 | 538 | $pids = array(); |
| 539 | 539 | if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { |
| 540 | 540 | exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput); |
| 541 | - foreach($cmdOutput as $line) { |
|
| 541 | + foreach ($cmdOutput as $line) { |
|
| 542 | 542 | $line = preg_replace('/\s+/m', ' ', $line); |
| 543 | - list(,,$pids[]) = explode(' ', trim($line), 3); |
|
| 543 | + list(,, $pids[]) = explode(' ', trim($line), 3); |
|
| 544 | 544 | } |
| 545 | 545 | } |
| 546 | 546 | else { |
| 547 | 547 | exec('ps -A -o pid,args | grep [r]esque', $cmdOutput); |
| 548 | - foreach($cmdOutput as $line) { |
|
| 548 | + foreach ($cmdOutput as $line) { |
|
| 549 | 549 | list($pids[],) = explode(' ', trim($line), 2); |
| 550 | 550 | } |
| 551 | 551 | } |
@@ -566,7 +566,7 @@ discard block |
||
| 566 | 566 | */ |
| 567 | 567 | public function unregisterWorker() |
| 568 | 568 | { |
| 569 | - if(is_object($this->currentJob)) { |
|
| 569 | + if (is_object($this->currentJob)) { |
|
| 570 | 570 | $this->currentJob->fail(new Resque_Job_DirtyExitException); |
| 571 | 571 | } |
| 572 | 572 | |
@@ -626,7 +626,7 @@ discard block |
||
| 626 | 626 | public function job() |
| 627 | 627 | { |
| 628 | 628 | $job = Resque::redis()->get('worker:' . $this); |
| 629 | - if(!$job) { |
|
| 629 | + if (!$job) { |
|
| 630 | 630 | return array(); |
| 631 | 631 | } |
| 632 | 632 | else { |
@@ -1,5 +1,5 @@ discard block |
||
| 1 | 1 | <?php |
| 2 | -declare(ticks = 1); |
|
| 2 | +declare(ticks=1); |
|
| 3 | 3 | |
| 4 | 4 | /** |
| 5 | 5 | * ResqueScheduler worker to handle scheduling of delayed tasks. |
@@ -53,10 +53,10 @@ discard block |
||
| 53 | 53 | $this->registerSigHandlers(); |
| 54 | 54 | |
| 55 | 55 | while (true) { |
| 56 | - if($this->shutdown) { |
|
| 56 | + if ($this->shutdown) { |
|
| 57 | 57 | break; |
| 58 | 58 | } |
| 59 | - if(!$this->paused) { |
|
| 59 | + if (!$this->paused) { |
|
| 60 | 60 | $this->handleDelayedItems(); |
| 61 | 61 | } |
| 62 | 62 | $this->sleep(); |
@@ -91,7 +91,7 @@ discard block |
||
| 91 | 91 | { |
| 92 | 92 | $item = null; |
| 93 | 93 | while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) { |
| 94 | - $this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] .' [delayed]'); |
|
| 94 | + $this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] . ' [delayed]'); |
|
| 95 | 95 | |
| 96 | 96 | Resque_Event::trigger('beforeDelayedEnqueue', array( |
| 97 | 97 | 'queue' => $item['queue'], |
@@ -123,7 +123,7 @@ discard block |
||
| 123 | 123 | */ |
| 124 | 124 | private function updateProcLine($status) |
| 125 | 125 | { |
| 126 | - if(function_exists('setproctitle')) { |
|
| 126 | + if (function_exists('setproctitle')) { |
|
| 127 | 127 | setproctitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status); |
| 128 | 128 | } |
| 129 | 129 | } |
@@ -135,10 +135,10 @@ discard block |
||
| 135 | 135 | */ |
| 136 | 136 | public function log($message) |
| 137 | 137 | { |
| 138 | - if($this->logLevel == self::LOG_NORMAL) { |
|
| 138 | + if ($this->logLevel == self::LOG_NORMAL) { |
|
| 139 | 139 | fwrite(STDOUT, "*** " . $message . "\n"); |
| 140 | 140 | } |
| 141 | - else if($this->logLevel == self::LOG_VERBOSE) { |
|
| 141 | + else if ($this->logLevel == self::LOG_VERBOSE) { |
|
| 142 | 142 | fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n"); |
| 143 | 143 | } |
| 144 | 144 | } |
@@ -152,7 +152,7 @@ discard block |
||
| 152 | 152 | */ |
| 153 | 153 | private function registerSigHandlers() |
| 154 | 154 | { |
| 155 | - if(!function_exists('pcntl_signal')) { |
|
| 155 | + if (!function_exists('pcntl_signal')) { |
|
| 156 | 156 | return; |
| 157 | 157 | } |
| 158 | 158 | |
@@ -137,8 +137,7 @@ |
||
| 137 | 137 | { |
| 138 | 138 | if($this->logLevel == self::LOG_NORMAL) { |
| 139 | 139 | fwrite(STDOUT, "*** " . $message . "\n"); |
| 140 | - } |
|
| 141 | - else if($this->logLevel == self::LOG_VERBOSE) { |
|
| 140 | + } else if($this->logLevel == self::LOG_VERBOSE) { |
|
| 142 | 141 | fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n"); |
| 143 | 142 | } |
| 144 | 143 | } |