Completed
Push — master ( 526d6e...2376dc )
by Akihito
03:04
created

Snidel::waitsAndConnectsProcess()   C

Complexity

Conditions 8
Paths 7

Size

Total Lines 45
Code Lines 33

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 45
rs 5.3846
cc 8
eloc 33
nc 7
nop 1
1
<?php
0 ignored issues
show
Coding Style Compatibility introduced by
For compatibility and reusability of your code, PSR1 recommends that a file should introduce either new symbols (like classes, functions, etc.) or have side-effects (like outputting something, or including other files), but not both at the same time. The first symbol is defined on line 14 and the first side effect is on line 2.

The PSR-1: Basic Coding Standard recommends that a file should either introduce new symbols, that is classes, functions, constants or similar, or have side effects. Side effects are anything that executes logic, like for example printing output, changing ini settings or writing to a file.

The idea behind this recommendation is that merely auto-loading a class should not change the state of an application. It also promotes a cleaner style of programming and makes your code less prone to errors, because the logic is not spread out all over the place.

To learn more about the PSR-1, please see the PHP-FIG site on the PSR-1.

Loading history...
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\Token;
7
use Ackintosh\Snidel\Log;
8
use Ackintosh\Snidel\Error;
9
use Ackintosh\Snidel\Pcntl;
10
use Ackintosh\Snidel\DataRepository;
11
use Ackintosh\Snidel\MapContainer;
12
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
13
14
class Snidel
15
{
16
    /** @var string */
17
    const VERSION = '0.3.0';
18
19
    /** @var array */
20
    private $childPids = array();
21
22
    /** @var Snidel\Error */
23
    private $error;
24
25
    /** @var Snidel\Pcntl */
26
    private $pcntl;
27
28
    /** @var int */
29
    private $concurrency;
30
31
    /** @var Snidel\Token */
32
    private $token;
33
34
    /** @var Snidel\Log */
35
    private $log;
36
37
    /** @var Snidel\DataRepository */
38
    private $dataRepository;
39
40
    /** @var bool */
41
    private $joined = false;
42
43
    /** @var array */
44
    private $results = array();
45
46
    /** @var int */
47
    private $ownerPid;
48
49
    /** @var array */
50
    private $tagsToPids = array();
51
52
    /** @var array */
53
    private $signals = array(
54
        SIGTERM,
55
        SIGINT,
56
    );
57
58
    /** @var int */
59
    private $receivedSignal;
60
61
    /** @var Snidel\Token */
62
    private $processToken;
63
64
    /** @var array */
65
    private $processInformation = array();
66
67
    /** @var bool */
68
    private $exceptionHasOccured = false;
69
70
    public function __construct($concurrency = 5)
71
    {
72
        $this->ownerPid         = getmypid();
73
        $this->childPids        = array();
74
        $this->concurrency      = $concurrency;
75
        $this->token            = new Token(getmypid(), $concurrency);
76
        $this->log              = new Log(getmypid());
77
        $this->error            = new Error();
78
        $this->pcntl            = new Pcntl();
79
        $this->dataRepository   = new DataRepository();
80
81
        foreach ($this->signals as $sig) {
82
            $this->pcntl->signal($sig, array($this, 'signalHandler'), false);
83
        }
84
85
        $this->log->info('parent pid: ' . $this->ownerPid);
86
    }
87
88
    /**
89
     * sets the resource for the log.
90
     *
91
     * @param   resource    $resource
92
     * @return  void
93
     * @codeCoverageIgnore
94
     */
95
    public function setLoggingDestination($resource)
96
    {
97
        $this->log->setDestination($resource);
98
    }
99
100
    /**
101
     * fork process
102
     *
103
     * @param   callable    $callable
104
     * @param   array       $args
105
     * @param   string      $tag
106
     * @return  int         $pid        forked PID of forked child process
107
     * @throws  \RuntimeException
108
     */
109
    public function fork($callable, $args = array(), $tag = null, Token $token = null)
110
    {
111
        $this->processToken = $token ? $token : $this->token;
112
        if (!is_array($args)) {
113
            $args = array($args);
114
        }
115
116
        $pid = $this->pcntl->fork();
117
        if (-1 === $pid) {
118
            $message = 'could not fork a new process';
119
            $this->log->error($message);
120
            throw new \RuntimeException($message);
121
        } elseif ($pid) {
122
            // parent
123
            $this->log->info('created child process. pid: ' . $pid);
124
            $this->childPids[] = $pid;
125
            if ($tag !== null) {
126
                $this->tagsToPids[$tag][] = $pid;
127
            }
128
        } else {
129
            // @codeCoverageIgnoreStart
130
            // child
131
            register_shutdown_function(array($this, 'childShutdownFunction'));
132
            $this->processInformation['callable']    = $callable instanceof \Closure ? '*Closure*' : $callable;
133
            $this->processInformation['args']        = $args;
134
135
            foreach ($this->signals as $sig) {
136
                $this->pcntl->signal($sig, SIG_DFL, true);
137
            }
138
            $this->log->info('--> waiting for the token come around.');
139
            if ($this->processToken->accept()) {
140
                $this->log->info('----> started the function.');
141
                $this->processInformation['return'] = call_user_func_array($callable, $args);
142
                $this->log->info('<---- completed the function.');
143
            }
144
            $this->_exit();
145
            // @codeCoverageIgnoreEnd
146
        }
147
148
        return $pid;
149
    }
150
151
    /**
152
     * waits until all children are completed
153
     *
154
     * @return  void
155
     * @throws  Ackintosh\Snidel\Exception\SharedMemoryControlException
156
     */
157
    public function wait()
158
    {
159
        if ($this->joined) {
160
            return;
161
        }
162
163
        $count = count($this->childPids);
164
        for ($i = 0; $i < $count; $i++) {
165
            $status = null;
166
            $childPid = $this->pcntl->waitpid(-1, $status);
167
            $data = $this->dataRepository->load($childPid);
168
            try {
169
                $result = $data->readAndDelete();
170
            } catch (SharedMemoryControlException $e) {
171
                $this->exceptionHasOccured = true;
172
                throw $e;
173
            }
174
175
            if (!$this->pcntl->wifexited($status) || $this->pcntl->wexitstatus($status) !== 0) {
176
                $message = 'an error has occurred in child process. pid: ' . $childPid;
177
                $this->log->error($message);
178
                $this->error[$childPid] = array(
179
                    'status'    => $status,
180
                    'message'   => $message,
181
                    'callable'  => $result['callable'],
182
                    'args'      => $result['args'],
183
                    'return'    => isset($result['return']) ? $result['return'] : null,
184
                );
185
            } else {
186
                $this->results[$childPid] = $result['return'];
187
            }
188
            unset($this->childPids[array_search($childPid, $this->childPids)]);
189
        }
190
        $this->joined = true;
191
    }
192
193
    /**
194
     * @return  bool
195
     */
196
    public function hasError()
197
    {
198
        return $this->error->exists();
199
    }
200
201
    /**
202
     * @return  Snidel\Error
203
     */
204
    public function getError()
205
    {
206
        return $this->error;
207
    }
208
209
    /**
210
     * gets results
211
     *
212
     * @param   string  $tag
213
     * @return  array   $ret
214
     * @throws  \InvalidArgumentException
215
     */
216
    public function get($tag = null)
217
    {
218
        if (!$this->joined) {
219
            $this->wait();
220
        }
221
222
        if ($tag === null) {
223
            return array_values($this->results);
224
        } else {
225
            try {
226
                return $this->getWithTag($tag);
227
            } catch (\InvalidArgumentException $e) {
228
                throw $e;
229
            }
230
        }
231
    }
232
233
    /**
234
     * gets results with tag
235
     *
236
     * @param   string  $tag
237
     * @return  array   $results
238
     * @throws  \InvalidArgumentException
239
     */
240
    private function getWithTag($tag)
241
    {
242
        if (!isset($this->tagsToPids[$tag])) {
243
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
244
        }
245
246
        $results = array();
247
        foreach ($this->tagsToPids[$tag] as $pid) {
248
            $results[] = $this->results[$pid];
249
        }
250
251
        return $results;
252
    }
253
254
    /**
255
     * @param   int     $sig
256
     * @return  void
257
     */
258
    public function signalHandler($sig)
259
    {
260
        $this->log->info('received signal. signo: ' . $sig);
261
        $this->receivedSignal = $sig;
262
263
        $this->log->info('--> sending a signal to children.');
264
        $this->sendSignalToChildren($sig);
265
266
        $this->log->info('--> deleting token.');
267
        unset($this->token);
268
269
        $this->log->info('<-- signal handling has been completed successfully.');
270
        $this->_exit();
271
    }
272
273
    /**
274
     * sends signal to child
275
     *
276
     * @param   int     $sig
277
     * @return  void
278
     */
279
    private function sendSignalToChildren($sig)
280
    {
281
        foreach ($this->childPids as $pid) {
282
            $this->log->info('----> sending a signal to child. pid: ' . $pid);
283
            posix_kill($pid, $sig);
284
        }
285
    }
286
287
    /**
288
     * delete shared memory
289
     *
290
     * @return  void
291
     * @throws  Ackintosh\Snidel\Exception\SharedMemoryControlException
292
     */
293
    private function deleteAllData()
294
    {
295
        foreach ($this->childPids as $pid) {
296
            $data = $this->dataRepository->load($pid);
297
            try {
298
                $data->deleteIfExists();
299
            } catch (SharedMemoryControlException $e) {
300
                throw $e;
301
            }
302
        }
303
    }
304
305
    /**
306
     * create map object
307
     *
308
     * @param   array       $args
309
     * @param   callable    $callable
310
     * @return  Ackintosh\Snidel\MapContainer
311
     */
312
    public function map(Array $args, $callable)
313
    {
314
        return new MapContainer($args, $callable, $this->concurrency);
315
    }
316
317
    /**
318
     * run map object
319
     *
320
     * @param   Snidel\MapContainer
321
     * @return  array
322
     * @throws  \RuntimeException
323
     */
324
    public function run(MapContainer $mapContainer)
325
    {
326
        try {
327
            $this->forkTheFirstProcessing($mapContainer);
328
            $this->waitsAndConnectsProcess($mapContainer);
329
        } catch (\RuntimeException $e) {
330
            $this->exceptionHasOccured = true;
331
            throw $e;
332
        }
333
334
        return $this->getResultsOf($mapContainer);
335
    }
336
337
    /**
338
     * fork the first processing of the map container
339
     *
340
     * @param   Snidel\MapContainer
341
     * @return  void
342
     * @throws  \RuntimeException
343
     */
344
    private function forkTheFirstProcessing(MapContainer $mapContainer)
345
    {
346
        foreach ($mapContainer->getFirstArgs() as $args) {
347
            try {
348
                $childPid = $this->fork($mapContainer->getFirstMap()->getCallable(), $args);
349
            } catch (\RuntimeException $e) {
350
                throw $e;
351
            }
352
            $mapContainer->getFirstMap()->countTheForked();
353
            $mapContainer->getFirstMap()->addChildPid($childPid);
354
        }
355
    }
356
357
    /**
358
     * waits and connects the process of map container
359
     *
360
     * @param   Snidel\MapContainer
361
     * @return  void
362
     * @throws  \RuntimeException
363
     */
364
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
365
    {
366
        if ($this->joined) {
367
            return;
368
        }
369
370
        while ($mapContainer->isProcessing()) {
371
            $status = null;
372
            $childPid = $this->pcntl->waitpid(-1, $status);
373
            $data = $this->dataRepository->load($childPid);
374
            try {
375
                $result = $data->readAndDelete();
376
            } catch (SharedMemoryControlException $e) {
377
                throw $e;
378
            }
379
380
            if (!$this->pcntl->wifexited($status) || $this->pcntl->wexitstatus($status) !== 0) {
381
                $message = 'an error has occurred in child process. pid: ' . $childPid;
382
                $this->log->error($message);
383
                throw new \RuntimeException($message);
384
            } else {
385
                $this->results[$childPid] = $result['return'];
386
            }
387
            unset($this->childPids[array_search($childPid, $this->childPids)]);
388
            if ($nextMap = $mapContainer->nextMap($childPid)) {
389
                try {
390
                    $nextMapPid = $this->fork(
391
                        $nextMap->getCallable(),
392
                        array($this->results[$childPid]),
393
                        null,
394
                        $nextMap->getToken()
395
                    );
396
                } catch (\RuntimeException $e) {
397
                    throw $e;
398
                }
399
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
400
                $this->log->info($message);
401
                $nextMap->countTheForked();
402
                $nextMap->addChildPid($nextMapPid);
403
            }
404
            $mapContainer->countTheCompleted($childPid);
405
        }
406
407
        $this->joined = true;
408
    }
409
410
    /**
411
     * gets results of map container
412
     *
413
     * @param   Ackintosh\Snidel\MapContainer
414
     * @return  array
415
     */
416
    private function getResultsOf(MapContainer $mapContainer)
417
    {
418
        $results = array();
419
        foreach ($mapContainer->getLastMapPids() as $pid) {
420
            $results[] = $this->results[$pid];
421
        }
422
423
        return $results;
424
    }
425
426
    private function _exit($status = 0)
427
    {
428
        exit($status);
0 ignored issues
show
Coding Style Compatibility introduced by
The method _exit() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
429
    }
430
431
    /**
432
     * @return void
433
     * @throws Ackintosh\Snidel\Exception\SharedMemoryControlException
434
     */
435
    public function childShutdownFunction()
436
    {
437
        $data = $this->dataRepository->load(getmypid());
438
        try {
439
            $data->write($this->processInformation);
440
        } catch (SharedMemoryControlException $e) {
441
            throw $e;
442
        }
443
        $this->log->info('<-- return token.');
444
        $this->processToken->back();
445
    }
446
447
    public function __destruct()
448
    {
449
        if ($this->exceptionHasOccured) {
450
            $this->log->info('destruct processes are started.(exception has occured)');
451
            $this->log->info('--> deleting all shared memory.');
452
            $this->deleteAllData();
453
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
454
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
455
            $this->log->error($message);
456
            $this->log->info('destruct processes are started.');
457
458
            $this->log->info('--> sending a signal to children.');
459
            $this->sendSignalToChildren(SIGTERM);
460
461
            $this->log->info('--> deleting all shared memory.');
462
            $this->deleteAllData();
463
464
            $this->log->info('--> deleting token.');
465
            unset($this->token);
466
467
            $this->log->info('--> destruct processes are finished successfully.');
468
            throw new \LogicException($message);
469
        }
470
    }
471
}
472