Completed
Push — master ( 27f7d3...3aad4e )
by Akihito
02:33
created

Snidel::fork()   B

Complexity

Conditions 9
Paths 28

Size

Total Lines 56
Code Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
c 5
b 0
f 0
dl 0
loc 56
rs 7.1584
cc 9
eloc 35
nc 28
nop 4

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
0 ignored issues
show
Coding Style Compatibility introduced by
For compatibility and reusability of your code, PSR1 recommends that a file should introduce either new symbols (like classes, functions, etc.) or have side-effects (like outputting something, or including other files), but not both at the same time. The first symbol is defined on line 16 and the first side effect is on line 2.

The PSR-1: Basic Coding Standard recommends that a file should either introduce new symbols, that is classes, functions, constants or similar, or have side effects. Side effects are anything that executes logic, like for example printing output, changing ini settings or writing to a file.

The idea behind this recommendation is that merely auto-loading a class should not change the state of an application. It also promotes a cleaner style of programming and makes your code less prone to errors, because the logic is not spread out all over the place.

To learn more about the PSR-1, please see the PHP-FIG site on the PSR-1.

Loading history...
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Error;
11
use Ackintosh\Snidel\Pcntl;
12
use Ackintosh\Snidel\DataRepository;
13
use Ackintosh\Snidel\MapContainer;
14
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
15
16
class Snidel
17
{
18
    /** @var string */
19
    const VERSION = '0.4.0';
20
21
    /** @var array */
22
    private $childPids = array();
23
24
    /** @var \Ackintosh\Snidel\ForkContainer */
25
    private $forkContainer;
26
27
    /** @var \Ackintosh\Snidel\Error */
28
    private $error;
29
30
    /** @var \Ackintosh\Snidel\Pcntl */
31
    private $pcntl;
32
33
    /** @var int */
34
    private $concurrency;
35
36
    /** @var \Ackintosh\Snidel\Token */
37
    private $token;
38
39
    /** @var \Ackintosh\Snidel\Log */
40
    private $log;
41
42
    /** @var \Ackintosh\Snidel\DataRepository */
43
    private $dataRepository;
44
45
    /** @var bool */
46
    private $joined = false;
47
48
    /** @var array */
49
    private $results = array();
50
51
    /** @var int */
52
    private $ownerPid;
53
54
    /** @var array */
55
    private $tagsToPids = array();
56
57
    /** @var array */
58
    private $signals = array(
59
        SIGTERM,
60
        SIGINT,
61
    );
62
63
    /** @var int */
64
    private $receivedSignal;
65
66
    /** @var \Ackintosh\Snidel\Token */
67
    private $processToken;
68
69
    /** @var array */
70
    private $processInformation = array();
0 ignored issues
show
Unused Code introduced by
The property $processInformation is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
71
72
    /** @var bool */
73
    private $exceptionHasOccured = false;
74
75
    public function __construct($concurrency = 5)
76
    {
77
        $this->ownerPid         = getmypid();
78
        $this->childPids        = array();
79
        $this->concurrency      = $concurrency;
80
        $this->token            = new Token(getmypid(), $concurrency);
81
        $this->log              = new Log(getmypid());
82
        $this->error            = new Error();
83
        $this->pcntl            = new Pcntl();
84
        $this->dataRepository   = new DataRepository();
85
        $this->forkContainer    = new ForkContainer();
86
87
        foreach ($this->signals as $sig) {
88
            $this->pcntl->signal($sig, array($this, 'signalHandler'), false);
89
        }
90
91
        $this->log->info('parent pid: ' . $this->ownerPid);
92
    }
93
94
    /**
95
     * sets the resource for the log.
96
     *
97
     * @param   resource    $resource
98
     * @return  void
99
     * @codeCoverageIgnore
100
     */
101
    public function setLoggingDestination($resource)
102
    {
103
        $this->log->setDestination($resource);
104
    }
105
106
    /**
107
     * fork process
108
     *
109
     * @param   callable    $callable
110
     * @param   array       $args
111
     * @param   string      $tag
112
     * @return  int         $pid        forked PID of forked child process
113
     * @throws  \RuntimeException
114
     */
115
    public function fork($callable, $args = array(), $tag = null, Token $token = null)
116
    {
117
        $this->processToken = $token ? $token : $this->token;
118
        if (!is_array($args)) {
119
            $args = array($args);
120
        }
121
122
        try {
123
            $fork = $this->forkContainer->fork();
124
        } catch (\RuntimeException $e) {
125
            $this->log->error($e->getMessage());
126
            throw $e;
127
        }
128
129
        $fork->setCallable($callable);
130
        $fork->setArgs($args);
131
132
        if ($pid = $fork->getPid()) {
133
            // parent
134
            $this->log->info('created child process. pid: ' . $pid);
135
            $this->childPids[] = $pid;
136
            if ($tag !== null) {
137
                $this->tagsToPids[$tag][] = $pid;
138
            }
139
        } else {
140
            // @codeCoverageIgnoreStart
141
            // child
142
            foreach ($this->signals as $sig) {
143
                $this->pcntl->signal($sig, SIG_DFL, true);
144
            }
145
146
            $result = new Result();
147
            register_shutdown_function(function () use ($result) {
148
                $data = $this->dataRepository->load(getmypid());
149
                try {
150
                    $data->write($result);
151
                } catch (SharedMemoryControlException $e) {
152
                    throw $e;
153
                }
154
                $this->log->info('<-- return token.');
155
                $this->processToken->back();
156
            });
157
158
            $this->log->info('--> waiting for the token come around.');
159
            if ($this->processToken->accept()) {
160
                $this->log->info('----> started the function.');
161
                $result->setReturn(call_user_func_array($callable, $args));
162
                $this->log->info('<---- completed the function.');
163
            }
164
165
            $this->_exit();
166
            // @codeCoverageIgnoreEnd
167
        }
168
169
        return $pid;
170
    }
171
172
    /**
173
     * waits until all children are completed
174
     *
175
     * @return  void
176
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
177
     */
178
    public function wait()
179
    {
180
        if ($this->joined) {
181
            return;
182
        }
183
184
        $count = count($this->childPids);
185
        for ($i = 0; $i < $count; $i++) {
186
            try {
187
                $fork = $this->forkContainer->wait();
188
            } catch (SharedMemoryControlException $e) {
189
                $this->exceptionHasOccured = true;
190
                throw $e;
191
            }
192
193
            $childPid   = $fork->getPid();
194
            $result     = $fork->getResult();
195
            if (!$fork->isSuccessful()) {
196
                $message = 'an error has occurred in child process. pid: ' . $childPid;
197
                $this->log->error($message);
198
                $this->error[$childPid] = array(
199
                    'status'    => $fork->getStatus(),
200
                    'message'   => $message,
201
                    'callable'  => $fork->getCallable(),
202
                    'args'      => $fork->getArgs(),
203
                    'return'    => $result->getReturn(),
204
                );
205
            } else {
206
                $this->results[$childPid] = $result->getReturn();
207
            }
208
            unset($this->childPids[array_search($childPid, $this->childPids)]);
209
        }
210
        $this->joined = true;
211
    }
212
213
    /**
214
     * @return  bool
215
     */
216
    public function hasError()
217
    {
218
        return $this->error->exists();
219
    }
220
221
    /**
222
     * @return  \Ackintosh\Snidel\Error
223
     */
224
    public function getError()
225
    {
226
        return $this->error;
227
    }
228
229
    /**
230
     * gets results
231
     *
232
     * @param   string  $tag
233
     * @return  array   $ret
234
     * @throws  \InvalidArgumentException
235
     */
236
    public function get($tag = null)
237
    {
238
        if (!$this->joined) {
239
            $this->wait();
240
        }
241
242
        if ($tag === null) {
243
            return array_values($this->results);
244
        } else {
245
            try {
246
                return $this->getWithTag($tag);
247
            } catch (\InvalidArgumentException $e) {
248
                throw $e;
249
            }
250
        }
251
    }
252
253
    /**
254
     * gets results with tag
255
     *
256
     * @param   string  $tag
257
     * @return  array   $results
258
     * @throws  \InvalidArgumentException
259
     */
260
    private function getWithTag($tag)
261
    {
262
        if (!isset($this->tagsToPids[$tag])) {
263
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
264
        }
265
266
        $results = array();
267
        foreach ($this->tagsToPids[$tag] as $pid) {
268
            $results[] = $this->results[$pid];
269
        }
270
271
        return $results;
272
    }
273
274
    /**
275
     * @param   int     $sig
276
     * @return  void
277
     */
278
    public function signalHandler($sig)
279
    {
280
        $this->log->info('received signal. signo: ' . $sig);
281
        $this->receivedSignal = $sig;
282
283
        $this->log->info('--> sending a signal to children.');
284
        $this->sendSignalToChildren($sig);
285
286
        $this->log->info('--> deleting token.');
287
        unset($this->token);
288
289
        $this->log->info('<-- signal handling has been completed successfully.');
290
        $this->_exit();
291
    }
292
293
    /**
294
     * sends signal to child
295
     *
296
     * @param   int     $sig
297
     * @return  void
298
     */
299
    private function sendSignalToChildren($sig)
300
    {
301
        foreach ($this->childPids as $pid) {
302
            $this->log->info('----> sending a signal to child. pid: ' . $pid);
303
            posix_kill($pid, $sig);
304
        }
305
    }
306
307
    /**
308
     * delete shared memory
309
     *
310
     * @return  void
311
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
312
     */
313
    private function deleteAllData()
314
    {
315
        foreach ($this->childPids as $pid) {
316
            $data = $this->dataRepository->load($pid);
317
            try {
318
                $data->deleteIfExists();
319
            } catch (SharedMemoryControlException $e) {
320
                throw $e;
321
            }
322
        }
323
    }
324
325
    /**
326
     * create map object
327
     *
328
     * @param   array       $args
329
     * @param   callable    $callable
330
     * @return  \Ackintosh\Snidel\MapContainer
331
     */
332
    public function map(Array $args, $callable)
333
    {
334
        return new MapContainer($args, $callable, $this->concurrency);
335
    }
336
337
    /**
338
     * run map object
339
     *
340
     * @param   \Ackintosh\Snidel\MapContainer
341
     * @return  array
342
     * @throws  \RuntimeException
343
     */
344
    public function run(MapContainer $mapContainer)
345
    {
346
        try {
347
            $this->forkTheFirstProcessing($mapContainer);
348
            $this->waitsAndConnectsProcess($mapContainer);
349
        } catch (\RuntimeException $e) {
350
            $this->exceptionHasOccured = true;
351
            throw $e;
352
        }
353
354
        return $this->getResultsOf($mapContainer);
355
    }
356
357
    /**
358
     * fork the first processing of the map container
359
     *
360
     * @param   \Ackintosh\Snidel\MapContainer
361
     * @return  void
362
     * @throws  \RuntimeException
363
     */
364
    private function forkTheFirstProcessing(MapContainer $mapContainer)
365
    {
366
        foreach ($mapContainer->getFirstArgs() as $args) {
367
            try {
368
                $childPid = $this->fork($mapContainer->getFirstMap()->getCallable(), $args);
369
            } catch (\RuntimeException $e) {
370
                throw $e;
371
            }
372
            $mapContainer->getFirstMap()->countTheForked();
373
            $mapContainer->getFirstMap()->addChildPid($childPid);
374
        }
375
    }
376
377
    /**
378
     * waits and connects the process of map container
379
     *
380
     * @param   \Ackintosh\Snidel\MapContainer
381
     * @return  void
382
     * @throws  \RuntimeException
383
     */
384
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
385
    {
386
        if ($this->joined) {
387
            return;
388
        }
389
390
        while ($mapContainer->isProcessing()) {
391
            try {
392
                $fork = $this->forkContainer->wait();
393
            } catch (SharedMemoryControlException $e) {
394
                throw $e;
395
            }
396
397
            $childPid = $fork->getPid();
398
            if (!$fork->isSuccessful()) {
399
                $message = 'an error has occurred in child process. pid: ' . $childPid;
400
                $this->log->error($message);
401
                throw new \RuntimeException($message);
402
            }
403
404
            $result = $fork->getResult();
405
            $this->results[$childPid] = $result->getReturn();
406
            unset($this->childPids[array_search($childPid, $this->childPids)]);
407
            if ($nextMap = $mapContainer->nextMap($childPid)) {
408
                try {
409
                    $nextMapPid = $this->fork(
410
                        $nextMap->getCallable(),
411
                        array($this->results[$childPid]),
412
                        null,
413
                        $nextMap->getToken()
414
                    );
415
                } catch (\RuntimeException $e) {
416
                    throw $e;
417
                }
418
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
419
                $this->log->info($message);
420
                $nextMap->countTheForked();
421
                $nextMap->addChildPid($nextMapPid);
422
            }
423
            $mapContainer->countTheCompleted($childPid);
424
        }
425
426
        $this->joined = true;
427
    }
428
429
    /**
430
     * gets results of map container
431
     *
432
     * @param   \Ackintosh\Snidel\MapContainer
433
     * @return  array
434
     */
435
    private function getResultsOf(MapContainer $mapContainer)
436
    {
437
        $results = array();
438
        foreach ($mapContainer->getLastMapPids() as $pid) {
439
            $results[] = $this->results[$pid];
440
        }
441
442
        return $results;
443
    }
444
445
    private function _exit($status = 0)
446
    {
447
        exit($status);
0 ignored issues
show
Coding Style Compatibility introduced by
The method _exit() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
448
    }
449
450
    public function __destruct()
451
    {
452
        if ($this->exceptionHasOccured) {
453
            $this->log->info('destruct processes are started.(exception has occured)');
454
            $this->log->info('--> deleting all shared memory.');
455
            $this->deleteAllData();
456
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
457
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
458
            $this->log->error($message);
459
            $this->log->info('destruct processes are started.');
460
461
            $this->log->info('--> sending a signal to children.');
462
            $this->sendSignalToChildren(SIGTERM);
463
464
            $this->log->info('--> deleting all shared memory.');
465
            $this->deleteAllData();
466
467
            $this->log->info('--> deleting token.');
468
            unset($this->token);
469
470
            $this->log->info('--> destruct processes are finished successfully.');
471
            throw new \LogicException($message);
472
        }
473
    }
474
}
475