Completed
Push — master ( dbb88a...a26c6d )
by Akihito
02:15
created

Snidel::__construct()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 34
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 34
rs 8.8571
cc 2
eloc 24
nc 2
nop 1
1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Error;
11
use Ackintosh\Snidel\Pcntl;
12
use Ackintosh\Snidel\DataRepository;
13
use Ackintosh\Snidel\MapContainer;
14
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
15
16
class Snidel
17
{
18
    /** @var string */
19
    const VERSION = '0.4.0';
20
21
    /** @var array */
22
    private $childPids = array();
23
24
    /** @var \Ackintosh\Snidel\ForkContainer */
25
    private $forkContainer;
26
27
    /** @var \Ackintosh\Snidel\Error */
28
    private $error;
29
30
    /** @var \Ackintosh\Snidel\Pcntl */
31
    private $pcntl;
32
33
    /** @var int */
34
    private $concurrency;
35
36
    /** @var \Ackintosh\Snidel\Token */
37
    private $token;
38
39
    /** @var \Ackintosh\Snidel\Log */
40
    private $log;
41
42
    /** @var \Ackintosh\Snidel\DataRepository */
43
    private $dataRepository;
44
45
    /** @var bool */
46
    private $joined = false;
47
48
    /** @var array */
49
    private $results = array();
0 ignored issues
show
Unused Code introduced by
The property $results is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
50
51
    /** @var int */
52
    private $ownerPid;
53
54
    /** @var array */
55
    private $signals = array(
56
        SIGTERM,
57
        SIGINT,
58
    );
59
60
    /** @var int */
61
    private $receivedSignal;
62
63
    /** @var \Ackintosh\Snidel\Token */
64
    private $processToken;
65
66
    /** @var bool */
67
    private $exceptionHasOccured = false;
68
69
    public function __construct($concurrency = 5)
70
    {
71
        $this->ownerPid         = getmypid();
72
        $this->childPids        = array();
73
        $this->concurrency      = $concurrency;
74
        $this->token            = new Token(getmypid(), $concurrency);
75
        $this->log              = new Log(getmypid());
76
        $this->error            = new Error();
77
        $this->pcntl            = new Pcntl();
78
        $this->dataRepository   = new DataRepository();
79
        $this->forkContainer    = new ForkContainer();
80
81
        foreach ($this->signals as $sig) {
82
            $this->pcntl->signal(
83
                $sig,
84
                function ($sig) {
85
                    $this->log->info('received signal. signo: ' . $sig);
86
                    $this->receivedSignal = $sig;
87
88
                    $this->log->info('--> sending a signal to children.');
89
                    $this->sendSignalToChildren($sig);
90
91
                    $this->log->info('--> deleting token.');
92
                    unset($this->token);
93
94
                    $this->log->info('<-- signal handling has been completed successfully.');
95
                    $this->_exit();
96
                },
97
                false
98
            );
99
        }
100
101
        $this->log->info('parent pid: ' . $this->ownerPid);
102
    }
103
104
    /**
105
     * sets the resource for the log.
106
     *
107
     * @param   resource    $resource
108
     * @return  void
109
     * @codeCoverageIgnore
110
     */
111
    public function setLoggingDestination($resource)
112
    {
113
        $this->log->setDestination($resource);
114
    }
115
116
    /**
117
     * fork process
118
     *
119
     * @param   callable    $callable
120
     * @param   mixed       $args
121
     * @param   string      $tag
122
     * @return  int         $pid        forked PID of forked child process
123
     * @throws  \RuntimeException
124
     */
125
    public function fork($callable, $args = array(), $tag = null, Token $token = null)
126
    {
127
        $this->processToken = $token ? $token : $this->token;
128
        if (!is_array($args)) {
129
            $args = array($args);
130
        }
131
132
        try {
133
            $fork = $this->forkContainer->fork($tag);
134
        } catch (\RuntimeException $e) {
135
            $this->log->error($e->getMessage());
136
            throw $e;
137
        }
138
139
        $fork->setCallable($callable);
140
        $fork->setArgs($args);
141
142
        if ($pid = $fork->getPid()) {
143
            // parent
144
            $this->log->info('created child process. pid: ' . $pid);
145
            $this->childPids[] = $pid;
146
        } else {
147
            // @codeCoverageIgnoreStart
148
            // child
149
            foreach ($this->signals as $sig) {
150
                $this->pcntl->signal($sig, SIG_DFL, true);
151
            }
152
153
            $result = new Result();
154
            register_shutdown_function(function () use ($result) {
155
                $data = $this->dataRepository->load(getmypid());
156
                try {
157
                    $data->write($result);
158
                } catch (SharedMemoryControlException $e) {
159
                    throw $e;
160
                }
161
                $this->log->info('<-- return token.');
162
                $this->processToken->back();
163
            });
164
165
            $this->log->info('--> waiting for the token come around.');
166
            if ($this->processToken->accept()) {
167
                $this->log->info('----> started the function.');
168
                $result->setReturn(call_user_func_array($callable, $args));
169
                $this->log->info('<---- completed the function.');
170
            }
171
172
            $this->_exit();
173
            // @codeCoverageIgnoreEnd
174
        }
175
176
        return $pid;
177
    }
178
179
    /**
180
     * waits until all children are completed
181
     *
182
     * @return  void
183
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
184
     */
185
    public function wait()
186
    {
187
        if ($this->joined) {
188
            return;
189
        }
190
191
        $count = count($this->childPids);
192
        for ($i = 0; $i < $count; $i++) {
193
            try {
194
                $fork = $this->forkContainer->wait();
195
            } catch (SharedMemoryControlException $e) {
196
                $this->exceptionHasOccured = true;
197
                throw $e;
198
            }
199
200
            $childPid   = $fork->getPid();
201
            $result     = $fork->getResult();
202
            if (!$fork->isSuccessful()) {
203
                $message = 'an error has occurred in child process. pid: ' . $childPid;
204
                $this->log->error($message);
205
                $this->error[$childPid] = array(
206
                    'status'    => $fork->getStatus(),
207
                    'message'   => $message,
208
                    'callable'  => $fork->getCallable(),
209
                    'args'      => $fork->getArgs(),
210
                    'return'    => $result->getReturn(),
211
                );
212
            }
213
            unset($this->childPids[array_search($childPid, $this->childPids)]);
214
        }
215
        $this->joined = true;
216
    }
217
218
    /**
219
     * @return  bool
220
     */
221
    public function hasError()
222
    {
223
        return $this->error->exists();
224
    }
225
226
    /**
227
     * @return  \Ackintosh\Snidel\Error
228
     */
229
    public function getError()
230
    {
231
        return $this->error;
232
    }
233
234
    /**
235
     * gets results
236
     *
237
     * @param   string  $tag
238
     * @return  array   $ret
239
     * @throws  \InvalidArgumentException
240
     */
241
    public function get($tag = null)
242
    {
243
        if (!$this->joined) {
244
            $this->wait();
245
        }
246
247
        if ($tag === null) {
248
            return array_map(
249
                function ($fork) {
250
                    return $fork->getResult()->getReturn();
251
                },
252
                $this->forkContainer->get()
253
            );
254
        } else {
255
            try {
256
                return $this->getWithTag($tag);
257
            } catch (\InvalidArgumentException $e) {
258
                throw $e;
259
            }
260
        }
261
    }
262
263
    /**
264
     * gets results with tag
265
     *
266
     * @param   string  $tag
267
     * @return  array   $results
268
     * @throws  \InvalidArgumentException
269
     */
270
    private function getWithTag($tag)
271
    {
272
        if (!$this->forkContainer->hasTag($tag)) {
273
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
274
        }
275
276
        return array_map(
277
            function ($fork) {
278
                return $fork->getResult()->getReturn();
279
            },
280
            $this->forkContainer->get($tag)
281
        );
282
    }
283
284
    /**
285
     * sends signal to child
286
     *
287
     * @param   int     $sig
288
     * @return  void
289
     */
290
    private function sendSignalToChildren($sig)
291
    {
292
        foreach ($this->childPids as $pid) {
293
            $this->log->info('----> sending a signal to child. pid: ' . $pid);
294
            posix_kill($pid, $sig);
295
        }
296
    }
297
298
    /**
299
     * delete shared memory
300
     *
301
     * @return  void
302
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
303
     */
304
    private function deleteAllData()
305
    {
306
        foreach ($this->childPids as $pid) {
307
            $data = $this->dataRepository->load($pid);
308
            try {
309
                $data->deleteIfExists();
310
            } catch (SharedMemoryControlException $e) {
311
                throw $e;
312
            }
313
        }
314
    }
315
316
    /**
317
     * create map object
318
     *
319
     * @param   array       $args
320
     * @param   callable    $callable
321
     * @return  \Ackintosh\Snidel\MapContainer
322
     */
323
    public function map(Array $args, $callable)
324
    {
325
        return new MapContainer($args, $callable, $this->concurrency);
326
    }
327
328
    /**
329
     * run map object
330
     *
331
     * @param   \Ackintosh\Snidel\MapContainer
332
     * @return  array
333
     * @throws  \RuntimeException
334
     */
335
    public function run(MapContainer $mapContainer)
336
    {
337
        try {
338
            $this->forkTheFirstProcessing($mapContainer);
339
            $this->waitsAndConnectsProcess($mapContainer);
340
        } catch (\RuntimeException $e) {
341
            $this->exceptionHasOccured = true;
342
            throw $e;
343
        }
344
345
        return $this->getResultsOf($mapContainer);
346
    }
347
348
    /**
349
     * fork the first processing of the map container
350
     *
351
     * @param   \Ackintosh\Snidel\MapContainer
352
     * @return  void
353
     * @throws  \RuntimeException
354
     */
355
    private function forkTheFirstProcessing(MapContainer $mapContainer)
356
    {
357
        foreach ($mapContainer->getFirstArgs() as $args) {
358
            try {
359
                $childPid = $this->fork($mapContainer->getFirstMap()->getCallable(), $args);
360
            } catch (\RuntimeException $e) {
361
                throw $e;
362
            }
363
            $mapContainer->getFirstMap()->countTheForked();
364
            $mapContainer->getFirstMap()->addChildPid($childPid);
365
        }
366
    }
367
368
    /**
369
     * waits and connects the process of map container
370
     *
371
     * @param   \Ackintosh\Snidel\MapContainer
372
     * @return  void
373
     * @throws  \RuntimeException
374
     */
375
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
376
    {
377
        if ($this->joined) {
378
            return;
379
        }
380
381
        while ($mapContainer->isProcessing()) {
382
            try {
383
                $fork = $this->forkContainer->wait();
384
            } catch (SharedMemoryControlException $e) {
385
                throw $e;
386
            }
387
388
            $childPid = $fork->getPid();
389
            if (!$fork->isSuccessful()) {
390
                $message = 'an error has occurred in child process. pid: ' . $childPid;
391
                $this->log->error($message);
392
                throw new \RuntimeException($message);
393
            }
394
395
            unset($this->childPids[array_search($childPid, $this->childPids)]);
396
            if ($nextMap = $mapContainer->nextMap($childPid)) {
397
                try {
398
                    $nextMapPid = $this->fork(
399
                        $nextMap->getCallable(),
400
                        $fork,
401
                        null,
402
                        $nextMap->getToken()
403
                    );
404
                } catch (\RuntimeException $e) {
405
                    throw $e;
406
                }
407
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
408
                $this->log->info($message);
409
                $nextMap->countTheForked();
410
                $nextMap->addChildPid($nextMapPid);
411
            }
412
            $mapContainer->countTheCompleted($childPid);
413
        }
414
415
        $this->joined = true;
416
    }
417
418
    /**
419
     * gets results of map container
420
     *
421
     * @param   \Ackintosh\Snidel\MapContainer
422
     * @return  array
423
     */
424
    private function getResultsOf(MapContainer $mapContainer)
425
    {
426
        $results = array();
427
        foreach ($mapContainer->getLastMapPids() as $pid) {
428
            $results[] = $this->forkContainer[$pid]->getResult()->getReturn();
429
        }
430
431
        return $results;
432
    }
433
434
    private function _exit($status = 0)
435
    {
436
        exit($status);
0 ignored issues
show
Coding Style Compatibility introduced by
The method _exit() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
437
    }
438
439
    public function __destruct()
440
    {
441
        if ($this->exceptionHasOccured) {
442
            $this->log->info('destruct processes are started.(exception has occured)');
443
            $this->log->info('--> deleting all shared memory.');
444
            $this->deleteAllData();
445
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
446
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
447
            $this->log->error($message);
448
            $this->log->info('destruct processes are started.');
449
450
            $this->log->info('--> sending a signal to children.');
451
            $this->sendSignalToChildren(SIGTERM);
452
453
            $this->log->info('--> deleting all shared memory.');
454
            $this->deleteAllData();
455
456
            $this->log->info('--> deleting token.');
457
            unset($this->token);
458
459
            $this->log->info('--> destruct processes are finished successfully.');
460
            throw new \LogicException($message);
461
        }
462
    }
463
}
464