Completed
Push — master ( 215ee6...5002cd )
by Akihito
02:09
created

Snidel::hasError()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Error;
11
use Ackintosh\Snidel\Pcntl;
12
use Ackintosh\Snidel\DataRepository;
13
use Ackintosh\Snidel\MapContainer;
14
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
15
16
class Snidel
17
{
18
    /** @var string library version */
    const VERSION = '0.4.0';

    /** @var array PIDs of forked children that have not been reaped yet */
    private $childPids = array();

    /** @var \Ackintosh\Snidel\ForkContainer */
    private $forkContainer;

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var int value given to the constructor; handed to Token and MapContainer */
    private $concurrency;

    /** @var \Ackintosh\Snidel\Token */
    private $token;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var \Ackintosh\Snidel\DataRepository */
    private $dataRepository;

    /** @var bool set to true once the children have been waited for */
    private $joined = false;

    /** @var int PID of the process that constructed this instance */
    private $ownerPid;

    /** @var array signals for which the constructor installs a handler */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /** @var int signal number recorded by the signal handler (null until one arrives) */
    private $receivedSignal;

    /** @var \Ackintosh\Snidel\Token token used by the most recent fork() call */
    private $processToken;

    /** @var bool set when wait() or run() rethrows an exception; checked in __destruct() */
    private $exceptionHasOccured = false;
68
69
    /**
     * creates the collaborators and installs a handler for each signal in $this->signals.
     *
     * The handler records the signal, forwards it to the children, drops the
     * token and then exits the process.
     *
     * @param   int     $concurrency    handed to Token here and to MapContainer in map()
     */
    public function __construct($concurrency = 5)
    {
        $this->ownerPid         = getmypid();
        $this->childPids        = array();
        $this->concurrency      = $concurrency;
        $this->token            = new Token(getmypid(), $concurrency);
        $this->log              = new Log(getmypid());
        $this->error            = new Error();
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->forkContainer    = new ForkContainer();

        $handler = function ($signo) {
            $this->log->info('received signal. signo: ' . $signo);
            $this->receivedSignal = $signo;

            $this->log->info('--> sending a signal to children.');
            $this->sendSignalToChildren($signo);

            $this->log->info('--> deleting token.');
            unset($this->token);

            $this->log->info('<-- signal handling has been completed successfully.');
            $this->_exit();
        };

        foreach ($this->signals as $handledSignal) {
            $this->pcntl->signal($handledSignal, $handler, false);
        }

        $this->log->info('parent pid: ' . $this->ownerPid);
    }
103
104
    /**
     * hands the given resource to the Log instance as its destination.
     *
     * @param   resource    $resource
     * @return  void
     * @codeCoverageIgnore
     */
    public function setLoggingDestination($resource)
    {
        $this->log->setDestination($resource);
    }
115
116
    /**
     * forks a child process that runs the callable.
     *
     * In the parent the child's PID is recorded in $this->childPids and returned.
     * In the child the handled signals are reset to SIG_DFL, the callable is run
     * once the token has been accepted, and the process exits; a shutdown
     * function writes the Result and gives the token back.
     *
     * @param   callable                    $callable
     * @param   mixed                       $args       wrapped in an array when it is not one
     * @param   string                      $tag
     * @param   \Ackintosh\Snidel\Token     $token      used instead of the instance token when given
     * @return  int                         $pid        PID of forked child process
     * @throws  \RuntimeException
     */
    public function fork($callable, $args = array(), $tag = null, Token $token = null)
    {
        $this->processToken = $token ?: $this->token;
        $args = is_array($args) ? $args : array($args);

        try {
            $fork = $this->forkContainer->fork($tag);
        } catch (\RuntimeException $exception) {
            $this->log->error($exception->getMessage());
            throw $exception;
        }

        $fork->setCallable($callable);
        $fork->setArgs($args);

        $pid = $fork->getPid();
        if ($pid) {
            // parent
            $this->log->info('created child process. pid: ' . $pid);
            $this->childPids[] = $pid;

            return $pid;
        }

        // @codeCoverageIgnoreStart
        // child
        foreach ($this->signals as $handledSignal) {
            $this->pcntl->signal($handledSignal, SIG_DFL, true);
        }

        $result = new Result();
        register_shutdown_function(function () use ($result) {
            // runs at process shutdown, whether or not the callable completed
            $this->dataRepository->load(getmypid())->write($result);
            $this->log->info('<-- return token.');
            $this->processToken->back();
        });

        $this->log->info('--> waiting for the token come around.');
        if ($this->processToken->accept()) {
            $this->log->info('----> started the function.');
            $result->setReturn(call_user_func_array($callable, $args));
            $this->log->info('<---- completed the function.');
        }

        $this->_exit();
        // @codeCoverageIgnoreEnd

        return $pid;
    }
178
179
    /**
     * waits until all children are completed
     *
     * Reaps as many children as are currently listed in $this->childPids.
     * A child that did not finish successfully is logged and recorded in
     * $this->error under its PID. Does nothing when already joined.
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    public function wait()
    {
        if ($this->joined) {
            return;
        }

        $count = count($this->childPids);
        for ($i = 0; $i < $count; $i++) {
            try {
                $fork = $this->forkContainer->wait();
            } catch (SharedMemoryControlException $e) {
                // remembered so that __destruct() cleans up the remaining data
                $this->exceptionHasOccured = true;
                throw $e;
            }

            $childPid   = $fork->getPid();
            $result     = $fork->getResult();
            if (!$fork->isSuccessful()) {
                $message = 'an error has occurred in child process. pid: ' . $childPid;
                $this->log->error($message);
                $this->error[$childPid] = array(
                    'status'    => $fork->getStatus(),
                    'message'   => $message,
                    'callable'  => $fork->getCallable(),
                    'args'      => $fork->getArgs(),
                    'return'    => $result->getReturn(),
                );
            }

            // array_search() returns false for an unknown pid; as an array key
            // false is cast to 0, which would drop an unrelated entry.
            $index = array_search($childPid, $this->childPids);
            if ($index !== false) {
                unset($this->childPids[$index]);
            }
        }
        $this->joined = true;
    }
217
218
    /**
     * reports whether the Error object holds any entry.
     *
     * @return  bool
     */
    public function hasError()
    {
        return $this->error->exists();
    }
225
226
    /**
     * returns the Error object that wait() fills in for failed children.
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }
233
234
    /**
     * gets results
     *
     * Waits for the children first when that has not happened yet. Without a
     * tag the whole collection is returned; with a tag only that tag's
     * collection.
     *
     * @param   string  $tag
     * @return  array   $ret
     * @throws  \InvalidArgumentException   when the tag is not known to the fork container
     */
    public function get($tag = null)
    {
        if (!$this->joined) {
            $this->wait();
        }

        if ($tag !== null) {
            if (!$this->forkContainer->hasTag($tag)) {
                throw new \InvalidArgumentException('unknown tag: ' . $tag);
            }

            return $this->forkContainer->getCollection($tag);
        }

        return $this->forkContainer->getCollection();
    }
257
258
    /**
     * sends the signal to every PID listed in $this->childPids
     *
     * @param   int     $sig
     * @return  void
     */
    private function sendSignalToChildren($sig)
    {
        foreach ($this->childPids as $childPid) {
            $this->log->info('----> sending a signal to child. pid: ' . $childPid);
            posix_kill($childPid, $sig);
        }
    }
271
272
    /**
     * delete shared memory
     *
     * Loads the data object of every PID still listed in $this->childPids and
     * deletes it if it exists.
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    private function deleteAllData()
    {
        foreach ($this->childPids as $childPid) {
            $this->dataRepository->load($childPid)->deleteIfExists();
        }
    }
289
290
    /**
     * create map object
     *
     * The container is built with this instance's concurrency value.
     *
     * @param   array       $args
     * @param   callable    $callable
     * @return  \Ackintosh\Snidel\MapContainer
     */
    public function map(array $args, $callable)
    {
        $container = new MapContainer($args, $callable, $this->concurrency);

        return $container;
    }
301
302
    /**
     * run map object
     *
     * Forks the first stage, waits for and chains the following stages, then
     * collects the return values of the last stage.
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  array
     * @throws  \RuntimeException
     */
    public function run(MapContainer $mapContainer)
    {
        try {
            $this->forkTheFirstProcessing($mapContainer);
            $this->waitsAndConnectsProcess($mapContainer);
        } catch (\RuntimeException $exception) {
            // remembered so that __destruct() cleans up the remaining data
            $this->exceptionHasOccured = true;
            throw $exception;
        }

        $results = $this->getResultsOf($mapContainer);

        return $results;
    }
321
322
    /**
     * fork the first processing of the map container
     *
     * One child is forked per entry of getFirstArgs(); each forked PID is
     * counted and registered on the first map.
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  void
     * @throws  \RuntimeException
     */
    private function forkTheFirstProcessing(MapContainer $mapContainer)
    {
        foreach ($mapContainer->getFirstArgs() as $firstArgs) {
            $firstMap = $mapContainer->getFirstMap();
            $forkedPid = $this->fork($firstMap->getCallable(), $firstArgs);

            $firstMap->countTheForked();
            $firstMap->addChildPid($forkedPid);
        }
    }
341
342
    /**
     * waits and connects the process of map container
     *
     * While the container reports that it is processing, reaps one child at a
     * time. A failed child aborts with a RuntimeException. For a successful
     * child, the next map (if nextMap() returns one) is forked with the
     * finished fork as its argument and the next map's token.
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  void
     * @throws  \RuntimeException
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
    {
        if ($this->joined) {
            return;
        }

        while ($mapContainer->isProcessing()) {
            $fork = $this->forkContainer->wait();

            $childPid = $fork->getPid();
            if (!$fork->isSuccessful()) {
                $message = 'an error has occurred in child process. pid: ' . $childPid;
                $this->log->error($message);
                throw new \RuntimeException($message);
            }

            // array_search() returns false for an unknown pid; as an array key
            // false is cast to 0, which would drop an unrelated entry.
            $index = array_search($childPid, $this->childPids);
            if ($index !== false) {
                unset($this->childPids[$index]);
            }

            if ($nextMap = $mapContainer->nextMap($childPid)) {
                $nextMapPid = $this->fork(
                    $nextMap->getCallable(),
                    $fork,
                    null,
                    $nextMap->getToken()
                );
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
                $this->log->info($message);
                $nextMap->countTheForked();
                $nextMap->addChildPid($nextMapPid);
            }
            $mapContainer->countTheCompleted($childPid);
        }

        $this->joined = true;
    }
391
392
    /**
     * gets results of map container
     *
     * Collects, in the order of getLastMapPids(), the return value stored in
     * the fork container for each PID.
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  array
     */
    private function getResultsOf(MapContainer $mapContainer)
    {
        $returns = array();
        foreach ($mapContainer->getLastMapPids() as $lastPid) {
            $fork       = $this->forkContainer[$lastPid];
            $returns[]  = $fork->getResult()->getReturn();
        }

        return $returns;
    }
407
408
    /**
     * terminates the current process via exit().
     *
     * @param   int     $status     value handed to exit()
     * @return  void
     */
    private function _exit($status = 0)
    {
        exit($status);
    }
412
413
    /**
     * cleans up when the instance goes away.
     *
     * After an exception only the shared memory is deleted. Otherwise, when
     * the owner process destructs without having joined and without having
     * received a signal, the children are sent SIGTERM, shared memory and the
     * token are removed, and a LogicException is thrown.
     *
     * @throws  \LogicException
     */
    public function __destruct()
    {
        if ($this->exceptionHasOccured) {
            $this->log->info('destruct processes are started.(exception has occured)');
            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();

            return;
        }

        // nothing to do outside the owner, after a join, or after a signal
        if ($this->ownerPid !== getmypid() || $this->joined || $this->receivedSignal !== null) {
            return;
        }

        $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
        $this->log->error($message);
        $this->log->info('destruct processes are started.');

        $this->log->info('--> sending a signal to children.');
        $this->sendSignalToChildren(SIGTERM);

        $this->log->info('--> deleting all shared memory.');
        $this->deleteAllData();

        $this->log->info('--> deleting token.');
        unset($this->token);

        $this->log->info('--> destruct processes are finished successfully.');
        throw new \LogicException($message);
    }
437
}
438