@@ -255,7 +255,7 @@ |
||
| 255 | 255 | * @param mixed $args |
| 256 | 256 | * @param string $tag |
| 257 | 257 | * @param \Ackintosh\Snidel\Token $token |
| 258 | - * @return void |
|
| 258 | + * @return integer |
|
| 259 | 259 | * @throws \RuntimeException |
| 260 | 260 | */ |
| 261 | 261 | public function forkSimply($callable, $args = array(), $tag = null, Token $token = null) |
@@ -91,8 +91,8 @@ discard block |
||
| 91 | 91 | foreach ($this->signals as $sig) { |
| 92 | 92 | $this->pcntl->signal( |
| 93 | 93 | $sig, |
| 94 | - function ($sig) { |
|
| 95 | - $this->log->info('received signal. signo: ' . $sig); |
|
| 94 | + function($sig) { |
|
| 95 | + $this->log->info('received signal. signo: '.$sig); |
|
| 96 | 96 | $this->receivedSignal = $sig; |
| 97 | 97 | |
| 98 | 98 | $this->log->info('--> sending a signal to children.'); |
@@ -108,7 +108,7 @@ discard block |
||
| 108 | 108 | ); |
| 109 | 109 | } |
| 110 | 110 | |
| 111 | - $this->log->info('parent pid: ' . $this->ownerPid); |
|
| 111 | + $this->log->info('parent pid: '.$this->ownerPid); |
|
| 112 | 112 | } |
| 113 | 113 | |
| 114 | 114 | /** |
@@ -148,7 +148,7 @@ discard block |
||
| 148 | 148 | throw $e; |
| 149 | 149 | } |
| 150 | 150 | |
| 151 | - $this->log->info('queued task #' . $this->taskQueue->queuedCount()); |
|
| 151 | + $this->log->info('queued task #'.$this->taskQueue->queuedCount()); |
|
| 152 | 152 | } |
| 153 | 153 | |
| 154 | 154 | /** |
@@ -164,17 +164,17 @@ discard block |
||
| 164 | 164 | |
| 165 | 165 | if ($pid) { |
| 166 | 166 | // owner |
| 167 | - $this->log->info('pid: ' . getmypid()); |
|
| 167 | + $this->log->info('pid: '.getmypid()); |
|
| 168 | 168 | } elseif ($pid === -1) { |
| 169 | 169 | // error |
| 170 | 170 | } else { |
| 171 | 171 | // master |
| 172 | - $this->log->info('pid: ' . $this->masterProcessId); |
|
| 172 | + $this->log->info('pid: '.$this->masterProcessId); |
|
| 173 | 173 | foreach ($this->signals as $sig) { |
| 174 | 174 | $this->pcntl->signal($sig, SIG_DFL, true); |
| 175 | 175 | } |
| 176 | 176 | while ($task = $this->taskQueue->dequeue()) { |
| 177 | - $this->log->info('dequeued task #' . $this->taskQueue->dequeuedCount()); |
|
| 177 | + $this->log->info('dequeued task #'.$this->taskQueue->dequeuedCount()); |
|
| 178 | 178 | if ($this->token->accept()) { |
| 179 | 179 | $this->forkWorker($task['callable'], $task['args'], $task['tag']); |
| 180 | 180 | } |
@@ -211,17 +211,17 @@ discard block |
||
| 211 | 211 | |
| 212 | 212 | if (getmypid() === $this->masterProcessId) { |
| 213 | 213 | // master |
| 214 | - $this->log->info('forked worker. pid: ' . $fork->getPid()); |
|
| 214 | + $this->log->info('forked worker. pid: '.$fork->getPid()); |
|
| 215 | 215 | } else { |
| 216 | 216 | // worker |
| 217 | - $this->log->info('has forked. pid: ' . getmypid()); |
|
| 217 | + $this->log->info('has forked. pid: '.getmypid()); |
|
| 218 | 218 | // @codeCoverageIgnoreStart |
| 219 | 219 | foreach ($this->signals as $sig) { |
| 220 | 220 | $this->pcntl->signal($sig, SIG_DFL, true); |
| 221 | 221 | } |
| 222 | 222 | |
| 223 | 223 | $resultHasQueued = false; |
| 224 | - register_shutdown_function(function () use ($fork, &$resultHasQueued) { |
|
| 224 | + register_shutdown_function(function() use ($fork, &$resultHasQueued) { |
|
| 225 | 225 | if ($fork->hasNoResult() || $resultHasQueued === false) { |
| 226 | 226 | $result = new Result(); |
| 227 | 227 | $result->setFailure(); |
@@ -277,7 +277,7 @@ discard block |
||
| 277 | 277 | |
| 278 | 278 | if (getmypid() === $this->ownerPid) { |
| 279 | 279 | // parent |
| 280 | - $this->log->info('created child process. pid: ' . $fork->getPid()); |
|
| 280 | + $this->log->info('created child process. pid: '.$fork->getPid()); |
|
| 281 | 281 | $this->childPids[] = $fork->getPid(); |
| 282 | 282 | } else { |
| 283 | 283 | // @codeCoverageIgnoreStart |
@@ -293,7 +293,7 @@ discard block |
||
| 293 | 293 | $dataRepository = $this->dataRepository; |
| 294 | 294 | $log = $this->log; |
| 295 | 295 | $processToken = $this->processToken; |
| 296 | - register_shutdown_function(function () use ($result, $dataRepository, $log, $processToken) { |
|
| 296 | + register_shutdown_function(function() use ($result, $dataRepository, $log, $processToken) { |
|
| 297 | 297 | $data = $dataRepository->load(getmypid()); |
| 298 | 298 | try { |
| 299 | 299 | $data->write($result); |
@@ -344,7 +344,7 @@ discard block |
||
| 344 | 344 | $childPid = $fork->getPid(); |
| 345 | 345 | $result = $fork->getResult(); |
| 346 | 346 | if ($fork->hasNotFinishedSuccessfully()) { |
| 347 | - $message = 'an error has occurred in child process. pid: ' . $childPid; |
|
| 347 | + $message = 'an error has occurred in child process. pid: '.$childPid; |
|
| 348 | 348 | $this->log->error($message); |
| 349 | 349 | $this->error[$childPid] = array( |
| 350 | 350 | 'status' => $fork->getStatus(), |
@@ -411,7 +411,7 @@ discard block |
||
| 411 | 411 | } |
| 412 | 412 | |
| 413 | 413 | if (!$this->forkContainer->hasTag($tag)) { |
| 414 | - throw new \InvalidArgumentException('unknown tag: ' . $tag); |
|
| 414 | + throw new \InvalidArgumentException('unknown tag: '.$tag); |
|
| 415 | 415 | } |
| 416 | 416 | |
| 417 | 417 | return $this->forkContainer->getCollection($tag); |
@@ -438,12 +438,12 @@ discard block |
||
| 438 | 438 | return new ForkCollection($this->forks); |
| 439 | 439 | } |
| 440 | 440 | |
| 441 | - $filtered = array_filter($this->forks, function ($fork) use ($tag) { |
|
| 441 | + $filtered = array_filter($this->forks, function($fork) use ($tag) { |
|
| 442 | 442 | return $fork->getTag() === $tag; |
| 443 | 443 | }); |
| 444 | 444 | |
| 445 | 445 | if (count($filtered) === 0) { |
| 446 | - throw new \InvalidArgumentException('unknown tag: ' . $tag); |
|
| 446 | + throw new \InvalidArgumentException('unknown tag: '.$tag); |
|
| 447 | 447 | } |
| 448 | 448 | |
| 449 | 449 | return new ForkCollection($filtered); |
@@ -458,7 +458,7 @@ discard block |
||
| 458 | 458 | private function sendSignalToChildren($sig) |
| 459 | 459 | { |
| 460 | 460 | foreach ($this->childPids as $pid) { |
| 461 | - $this->log->info('----> sending a signal to child. pid: ' . $pid); |
|
| 461 | + $this->log->info('----> sending a signal to child. pid: '.$pid); |
|
| 462 | 462 | posix_kill($pid, $sig); |
| 463 | 463 | } |
| 464 | 464 | } |
@@ -555,7 +555,7 @@ discard block |
||
| 555 | 555 | |
| 556 | 556 | $childPid = $fork->getPid(); |
| 557 | 557 | if ($fork->hasNotFinishedSuccessfully()) { |
| 558 | - $message = 'an error has occurred in child process. pid: ' . $childPid; |
|
| 558 | + $message = 'an error has occurred in child process. pid: '.$childPid; |
|
| 559 | 559 | $this->log->error($message); |
| 560 | 560 | throw new \RuntimeException($message); |
| 561 | 561 | } |
@@ -11,6 +11,9 @@ |
||
| 11 | 11 | |
| 12 | 12 | private $dequeuedCount = 0; |
| 13 | 13 | |
| 14 | + /** |
|
| 15 | + * @param integer $ownerPid |
|
| 16 | + */ |
|
| 14 | 17 | public function __construct($ownerPid) |
| 15 | 18 | { |
| 16 | 19 | $this->ownerPid = $ownerPid; |
@@ -2,7 +2,6 @@ |
||
| 2 | 2 | namespace Ackintosh\Snidel; |
| 3 | 3 | |
| 4 | 4 | use Ackintosh\Snidel\IpcKey; |
| 5 | -use Opis\Closure\SerializableClosure; |
|
| 6 | 5 | |
| 7 | 6 | class ResultQueue |
| 8 | 7 | { |
@@ -14,7 +14,7 @@ |
||
| 14 | 14 | public function __construct($ownerPid) |
| 15 | 15 | { |
| 16 | 16 | $this->ownerPid = $ownerPid; |
| 17 | - $this->ipcKey = new IpcKey($ownerPid, 'snidel_result_' . uniqid((string) mt_rand(1, 100), true)); |
|
| 17 | + $this->ipcKey = new IpcKey($ownerPid, 'snidel_result_'.uniqid((string) mt_rand(1, 100), true)); |
|
| 18 | 18 | $this->id = msg_get_queue($this->ipcKey->generate()); |
| 19 | 19 | } |
| 20 | 20 | |
@@ -11,6 +11,9 @@ |
||
| 11 | 11 | private $queuedCount = 0; |
| 12 | 12 | private $dequeuedCount = 0; |
| 13 | 13 | |
| 14 | + /** |
|
| 15 | + * @param integer $ownerPid |
|
| 16 | + */ |
|
| 14 | 17 | public function __construct($ownerPid) |
| 15 | 18 | { |
| 16 | 19 | $this->ownerPid = $ownerPid; |
@@ -14,7 +14,7 @@ |
||
| 14 | 14 | public function __construct($ownerPid) |
| 15 | 15 | { |
| 16 | 16 | $this->ownerPid = $ownerPid; |
| 17 | - $this->ipcKey = new IpcKey($ownerPid, 'snidel_task_' . uniqid((string) mt_rand(1, 100), true)); |
|
| 17 | + $this->ipcKey = new IpcKey($ownerPid, 'snidel_task_'.uniqid((string) mt_rand(1, 100), true)); |
|
| 18 | 18 | $this->id = msg_get_queue($this->ipcKey->generate()); |
| 19 | 19 | } |
| 20 | 20 | |
@@ -50,15 +50,15 @@ |
||
| 50 | 50 | } |
| 51 | 51 | $pid = getmypid(); |
| 52 | 52 | switch (true) { |
| 53 | - case $this->ownerPid === $pid: |
|
| 54 | - $role = 'owner'; |
|
| 55 | - break; |
|
| 56 | - case $this->masterPid !== null && $this->masterPid === $pid: |
|
| 57 | - $role = 'master'; |
|
| 58 | - break; |
|
| 59 | - default: |
|
| 60 | - $role = 'worker'; |
|
| 61 | - break; |
|
| 53 | + case $this->ownerPid === $pid: |
|
| 54 | + $role = 'owner'; |
|
| 55 | + break; |
|
| 56 | + case $this->masterPid !== null && $this->masterPid === $pid: |
|
| 57 | + $role = 'master'; |
|
| 58 | + break; |
|
| 59 | + default: |
|
| 60 | + $role = 'worker'; |
|
| 61 | + break; |
|
| 62 | 62 | } |
| 63 | 63 | fputs( |
| 64 | 64 | $this->destination, |
@@ -98,8 +98,8 @@ |
||
| 98 | 98 | */ |
| 99 | 99 | private function getCollectionWithTag($tag) |
| 100 | 100 | { |
| 101 | - $collection = array_filter($this->forks, function ($fork) use ($tag) { |
|
| 102 | - return $fork->getTag() === $tag; |
|
| 101 | + $collection = array_filter($this->forks, function($fork) use ($tag) { |
|
| 102 | + return $fork->getTag() === $tag; |
|
| 103 | 103 | }); |
| 104 | 104 | |
| 105 | 105 | return new ForkCollection($collection); |