Completed
Push — master ( cfbc00...1384b1 )
by Akihito
02:24
created

Snidel::forkChild()   C

Complexity

Conditions 8
Paths 5

Size

Total Lines 65
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 65
rs 6.7651
cc 8
eloc 41
nc 5
nop 4

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Pcntl;
11
use Ackintosh\Snidel\DataRepository;
12
use Ackintosh\Snidel\MapContainer;
13
use Ackintosh\Snidel\Task\Task;
14
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
15
16
class Snidel
17
{
18
    /** @var string */
19
    const VERSION = '0.6.3';
20
21
    /** @var \Ackintosh\Snidel\ForkContainer */
22
    private $forkContainer;
23
24
    /** @var \Ackintosh\Snidel\Pcntl */
25
    private $pcntl;
26
27
    /** @var int */
28
    private $concurrency;
29
30
    /** @var \Ackintosh\Snidel\Log */
31
    private $log;
32
33
    /** @var bool */
34
    private $joined = false;
35
36
    /** @var int */
37
    private $ownerPid;
38
39
    /** @var array */
40
    private $signals = array(
41
        SIGTERM,
42
        SIGINT,
43
    );
44
45
    /** @var int */
46
    private $receivedSignal;
47
48
    /** @var \Ackintosh\Snidel\Token */
49
    private $processToken;
0 ignored issues
show
Unused Code introduced by
The property $processToken is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
50
51
    /** @var bool */
52
    private $exceptionHasOccured = false;
53
54
    /**
     * Records the owner pid, builds the collaborators and installs a handler
     * for every signal listed in $this->signals.
     *
     * The handler logs the signal, remembers it, forwards it to the child
     * processes and then terminates the current process.
     *
     * @param   int     $concurrency    stored and handed to the ForkContainer
     */
    public function __construct($concurrency = 5)
    {
        $this->ownerPid         = getmypid();
        $this->concurrency      = $concurrency;
        $this->log              = new Log($this->ownerPid);
        $this->pcntl            = new Pcntl();
        $this->forkContainer    = new ForkContainer($this->ownerPid, $this->log, $this->concurrency);

        // in php5.3, we can not use $this in anonymous functions,
        // so the logger and the instance are captured as plain locals.
        $logger = $this->log;
        $snidel = $this;
        $onSignal = function ($signo) use ($logger, $snidel) {
            $logger->info('received signal. signo: ' . $signo);
            $snidel->setReceivedSignal($signo);

            $logger->info('--> sending a signal to children.');
            $snidel->sendSignalToChildren($signo);
            $logger->info('<-- signal handling has been completed successfully.');
            exit;
        };

        // NOTE(review): the meaning of the third argument is defined by
        // Pcntl::signal(), which is not visible in this file.
        foreach ($this->signals as $signo) {
            $this->pcntl->signal($signo, $onSignal, false);
        }

        $this->log->info('parent pid: ' . $this->ownerPid);
    }
82
83
    /**
     * Hands the given resource to the logger as its destination.
     *
     * @param   resource    $resource
     * @return  void
     * @codeCoverageIgnore
     */
    public function setLoggingDestination($resource)
    {
        $logger = $this->log;
        $logger->setDestination($resource);
    }
94
95
    /**
     * Queues a task (master / worker model).
     *
     * The master process is forked first when the fork container reports
     * that none exists yet. Calling this method marks the instance as
     * "not joined" again.
     *
     * @param   callable    $callable
     * @param   mixed       $args
     * @param   string      $tag
     * @return  void
     * @throws  \RuntimeException
     */
    public function fork($callable, $args = array(), $tag = null)
    {
        $this->joined = false;

        $container = $this->forkContainer;
        if (!$container->existsMaster()) {
            $container->forkMaster();
        }

        // a \RuntimeException raised while enqueuing propagates to the caller as is
        $container->enqueue(new Task($callable, $args, $tag));

        $this->log->info('queued task #' . $container->queuedCount());
    }
120
121
    /**
     * Forks a child process which runs the callable once it has accepted the token.
     *
     * In the owner process this logs the pid of the new child and returns it.
     * In the child process the task is run by runTaskInChild(), which ends the
     * process, so the return statement is not reached there.
     *
     * @param   \Ackintosh\Snidel\Token     $token
     * @param   callable                    $callable
     * @param   mixed                       $args
     * @param   string                      $tag
     * @return  int     pid of the forked child, as reported by $fork->getPid()
     * @throws  \RuntimeException
     */
    private function forkChild(Token $token, $callable, $args = array(), $tag = null)
    {
        $task = new Task($callable, $args, $tag);

        try {
            $fork = $this->forkContainer->fork($task);
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        if (getmypid() === $this->ownerPid) {
            // parent
            $this->log->info('created child process. pid: ' . $fork->getPid());
        } else {
            // @codeCoverageIgnoreStart
            // child
            $this->runTaskInChild($fork, $token);
            // @codeCoverageIgnoreEnd
        }

        return $fork->getPid();
    }

    /**
     * Child-side half of forkChild(): waits for the token, executes the task,
     * writes the result and terminates the process through _exit().
     *
     * A shutdown function is registered first. If the result has not been
     * written by the time the process shuts down, it writes a failure result
     * instead; in every case it hands the token back.
     *
     * @param   mixed                       $fork   object returned by ForkContainer::fork()
     * @param   \Ackintosh\Snidel\Token     $token
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     * @codeCoverageIgnore
     */
    private function runTaskInChild($fork, Token $token)
    {
        // replace the handlers installed by the constructor with the default action
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        /**
         * in php5.3, we can not use $this in anonymous functions
         */
        $log = $this->log;
        $resultHasWritten = false;
        register_shutdown_function(function () use (&$resultHasWritten, $fork, $log, $token) {
            if (!$resultHasWritten) {
                $dataRepository = new DataRepository();
                $data = $dataRepository->load(getmypid());
                $result = new Result();
                $result->setFailure();
                $result->setFork($fork);
                $data->write($result);
            }

            $log->info('<-- return token.');
            $token->back();
        });

        $log->info('--> waiting for the token come around.');
        if ($token->accept()) {
            $log->info('----> started the function.');
            $result = $fork->executeTask();
            $log->info('<---- completed the function.');
            $dataRepository = new DataRepository();
            $data = $dataRepository->load(getmypid());
            $data->write($result);
            // not a dead store: the shutdown function above captured this
            // variable by reference and reads it when the process ends.
            $resultHasWritten = true;
        }

        $this->_exit();
    }
197
198
    /**
     * Waits until all tasks queued by Snidel::fork() are completed,
     * then marks this instance as joined.
     *
     * @return  void
     */
    public function wait()
    {
        $container = $this->forkContainer;
        $container->wait();

        $this->joined = true;
    }
208
209
    /**
     * Asks the fork container whether it holds an error.
     *
     * @return  bool
     */
    public function hasError()
    {
        $container = $this->forkContainer;

        return $container->hasError();
    }
216
217
    /**
     * Returns whatever the fork container reports as its error.
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $container = $this->forkContainer;

        return $container->getError();
    }
224
225
    /**
     * Returns the collection of forks, optionally restricted to one tag.
     *
     * When the instance has not been joined yet, wait() is called first.
     * A tag is only validated when one is given (not null).
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     * @throws  \InvalidArgumentException   when the given tag is unknown to the fork container
     */
    public function get($tag = null)
    {
        if (!$this->joined) {
            $this->wait();
        }

        $container = $this->forkContainer;
        if ($tag !== null) {
            if (!$container->hasTag($tag)) {
                throw new \InvalidArgumentException('unknown tag: ' . $tag);
            }
        }

        return $container->getCollection($tag);
    }
243
244
    /**
     * Sends the signal to every child pid known to the fork container.
     *
     * The return value of posix_kill() is not inspected.
     *
     * @param   int     $sig
     * @return  void
     */
    public function sendSignalToChildren($sig)
    {
        $childPids = $this->forkContainer->getChildPids();

        foreach ($childPids as $childPid) {
            $this->log->info('----> sending a signal to child. pid: ' . $childPid);
            posix_kill($childPid, $sig);
        }
    }
257
258
    /**
     * Remembers the signal number that was received.
     *
     * The stored value is checked by __destruct(): the "not joined" clean-up
     * only runs while no signal has been recorded.
     *
     * @param   int     $sig
     * @return  void
     */
    public function setReceivedSignal($sig)
    {
        $this->receivedSignal = $sig;
    }
262
263
    /**
     * Deletes all data through a fresh DataRepository (shared memory).
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    private function deleteAllData()
    {
        // a SharedMemoryControlException propagates to the caller as is
        $repository = new DataRepository();
        $repository->deleteAll();
    }
278
279
    /**
     * Creates a map container for the given arguments and callable,
     * using the concurrency of this instance.
     *
     * @param   array       $args
     * @param   callable    $callable
     * @return  \Ackintosh\Snidel\MapContainer
     */
    public function map(array $args, $callable)
    {
        $container = new MapContainer($args, $callable, $this->concurrency);

        return $container;
    }
290
291
    /**
     * Runs a map container and returns the results of its last map.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  array
     * @throws  \RuntimeException
     */
    public function run(MapContainer $mapContainer)
    {
        $token = new Token($this->ownerPid, $this->concurrency);

        try {
            $this->forkTheFirstProcessing($mapContainer, $token);
            $this->waitsAndConnectsProcess($mapContainer);
        } catch (\RuntimeException $failure) {
            // remembered so that __destruct() deletes all shared memory
            $this->exceptionHasOccured = true;
            throw $failure;
        }

        $results = $this->getResultsOf($mapContainer);

        return $results;
    }
311
312
    /**
     * Forks one child per entry of the container's first arguments, using the
     * callable of the first map, and registers each child pid on that map.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @param   \Ackintosh\Snidel\Token         $token          passed on to forkChild()
     * @return  void
     * @throws  \RuntimeException
     */
    private function forkTheFirstProcessing(MapContainer $mapContainer, $token)
    {
        foreach ($mapContainer->getFirstArgs() as $firstArgs) {
            // a \RuntimeException from forkChild() propagates to the caller as is
            $pid = $this->forkChild($token, $mapContainer->getFirstMap()->getCallable(), $firstArgs);

            $mapContainer->getFirstMap()->countTheForked();
            $mapContainer->getFirstMap()->addChildPid($pid);
        }
    }
331
332
    /**
     * Waits for children of the map container and connects each finished
     * child to the next map, until the container is no longer processing.
     *
     * Does nothing when the instance is already joined. The result of a
     * finished child is passed as the arguments of the next map's callable.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  void
     * @throws  \RuntimeException   when a child has not finished successfully
     */
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
    {
        if ($this->joined) {
            return;
        }

        while ($mapContainer->isProcessing()) {
            // a SharedMemoryControlException propagates to the caller as is
            $result = $this->forkContainer->waitForChild();

            $finishedPid = $result->getFork()->getPid();
            if ($result->getFork()->hasNotFinishedSuccessfully()) {
                $message = 'an error has occurred in child process. pid: ' . $finishedPid;
                $this->log->error($message);
                throw new \RuntimeException($message);
            }

            $nextMap = $mapContainer->nextMap($finishedPid);
            if ($nextMap) {
                $nextMapPid = $this->forkChild(
                    $nextMap->getToken(),
                    $nextMap->getCallable(),
                    $result
                );
                $this->log->info(
                    sprintf('processing is connected from [%d] to [%d]', $finishedPid, $nextMapPid)
                );
                $nextMap->countTheForked();
                $nextMap->addChildPid($nextMapPid);
            }

            $mapContainer->countTheCompleted($finishedPid);
        }

        $this->joined = true;
    }
379
380
    /**
     * Collects the return value of every pid of the container's last map.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  array   return values, in the order of getLastMapPids()
     */
    private function getResultsOf(MapContainer $mapContainer)
    {
        $returnValues = array();

        foreach ($mapContainer->getLastMapPids() as $lastPid) {
            $fork = $this->forkContainer->get($lastPid);
            $returnValues[] = $fork->getReturn();
        }

        return $returnValues;
    }
395
396
    /**
     * Terminates the current process with the given status.
     *
     * Called at the end of the child branch of forkChild().
     *
     * @param   int     $status
     * @return  void
     */
    private function _exit($status = 0)
    {
        exit($status);
    }
400
401
    /**
     * Shuts the master process down and cleans up what is left behind.
     *
     * - In the owner process, an existing master is killed.
     * - When run() has failed with an exception, all shared memory is deleted.
     * - Otherwise, when the owner process destroys an instance that was never
     *   joined and no signal was received, the children get SIGTERM, all
     *   shared memory is deleted and a \LogicException is thrown.
     *
     * @throws  \LogicException
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    public function __destruct()
    {
        $masterWasKilled = false;
        if ($this->forkContainer->existsMaster() && $this->ownerPid === getmypid()) {
            $this->log->info('shutdown master process.');
            $this->forkContainer->killMaster();
            // The container is released at the end of this method, not here:
            // sendSignalToChildren() below still has to ask it for the child pids.
            $masterWasKilled = true;
        }

        $notJoinedMessage = null;
        if ($this->exceptionHasOccured) {
            $this->log->info('destruct processes are started.(exception has occured)');
            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
            $notJoinedMessage = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
            $this->log->error($notJoinedMessage);
            $this->log->info('destruct processes are started.');

            $this->log->info('--> sending a signal to children.');
            $this->sendSignalToChildren(SIGTERM);

            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();

            $this->log->info('--> deleting token.');
            // $processToken is the token property this class declares.
            // NOTE(review): nothing in this class assigns it — confirm whether
            // a token still needs releasing here.
            unset($this->processToken);

            $this->log->info('--> destruct processes are finished successfully.');
        }

        if ($masterWasKilled) {
            unset($this->forkContainer);
        }

        if ($notJoinedMessage !== null) {
            throw new \LogicException($notJoinedMessage);
        }
    }
432
}
433