Completed
Push — master ( a15f7e...94610b )
by Akihito
02:20
created

Snidel::getError()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
c 4
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Pcntl;
11
use Ackintosh\Snidel\DataRepository;
12
use Ackintosh\Snidel\MapContainer;
13
use Ackintosh\Snidel\Task;
14
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
15
16
class Snidel
17
{
18
    /** @var string */
19
    const VERSION = '0.6.0';
20
21
    /** @var \Ackintosh\Snidel\ForkContainer */
22
    private $forkContainer;
23
24
    /** @var \Ackintosh\Snidel\Pcntl */
25
    private $pcntl;
26
27
    /** @var int */
28
    private $concurrency;
29
30
    /** @var \Ackintosh\Snidel\Log */
31
    private $log;
32
33
    /** @var bool */
34
    private $joined = false;
35
36
    /** @var int */
37
    private $ownerPid;
38
39
    /** @var array */
40
    private $signals = array(
41
        SIGTERM,
42
        SIGINT,
43
    );
44
45
    /** @var int */
46
    private $receivedSignal;
47
48
    /** @var \Ackintosh\Snidel\Token */
49
    private $processToken;
0 ignored issues
show
Unused Code introduced by
The property $processToken is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
50
51
    /** @var bool */
52
    private $exceptionHasOccured = false;
53
54
    /**
     * Records the owner pid, creates the logger, the pcntl wrapper and the fork
     * container, and registers one handler for every signal listed in $signals.
     *
     * @param   int     $concurrency    stored on the instance and passed to the ForkContainer
     */
    public function __construct($concurrency = 5)
    {
        $this->ownerPid         = getmypid();
        $this->concurrency      = $concurrency;
        $this->log              = new Log($this->ownerPid);
        $this->pcntl            = new Pcntl();
        $this->forkContainer    = new ForkContainer($this->ownerPid, $this->log, $this->concurrency);

        // the handler works on explicitly captured objects rather than on $this
        $logger = $this->log;
        $snidel = $this;
        $onSignal = function ($signo) use ($logger, $snidel) {
            $logger->info('received signal. signo: ' . $signo);
            $snidel->setReceivedSignal($signo);

            $logger->info('--> sending a signal to children.');
            $snidel->sendSignalToChildren($signo);
            $logger->info('<-- signal handling has been completed successfully.');

            // NOTE(review): static analysis flags this exit as hard to test; it ends
            // the process once the signal has been forwarded to the children.
            exit;
        };

        foreach ($this->signals as $signo) {
            $this->pcntl->signal($signo, $onSignal, false);
        }

        $this->log->info('parent pid: ' . $this->ownerPid);
    }
82
83
    /**
     * Hands the given resource to the logger as its destination.
     *
     * @param   resource    $resource
     * @return  void
     * @codeCoverageIgnore
     */
    public function setLoggingDestination($resource)
    {
        $logger = $this->log;
        $logger->setDestination($resource);
    }
94
95
    /**
     * Queues a task (master / worker model).
     *
     * The master process is forked first when the fork container reports that
     * none exists yet; the task is then enqueued and the queued count is logged.
     *
     * @param   callable    $callable
     * @param   mixed       $args
     * @param   string      $tag
     * @return  void
     * @throws  \RuntimeException   not handled here; it propagates to the caller unchanged
     */
    public function fork($callable, $args = array(), $tag = null)
    {
        if (!$this->forkContainer->existsMaster()) {
            $this->forkContainer->forkMaster();
        }

        // no try/catch: catching only to rethrow the same exception adds nothing
        $this->forkContainer->enqueue(new Task($callable, $args, $tag));

        $this->log->info('queued task #' . $this->forkContainer->queuedCount());
    }
118
119
    /**
     * Forks a process for the given task; the forked process waits for the token
     * before it runs the task.
     *
     * Parent (current pid equals the owner pid): logs the new child's pid and
     * returns it.
     * Child: resets the handlers of the registered signals to SIG_DFL, registers a
     * shutdown function that writes the fork to the data repository and gives the
     * token back, executes the task once the token has been accepted, then exits.
     *
     * @param   \Ackintosh\Snidel\Token     $token
     * @param   callable                    $callable
     * @param   mixed                       $args
     * @param   string                      $tag
     * @return  int     the pid reported by the fork object (only reached in the parent)
     * @throws  \RuntimeException           when forking fails; logged before it is rethrown
     */
    private function forkChild(Token $token, $callable, $args = array(), $tag = null)
    {
        $task = new Task($callable, $args, $tag);

        try {
            $fork = $this->forkContainer->fork($task);
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        if (getmypid() === $this->ownerPid) {
            // parent
            $this->log->info('created child process. pid: ' . $fork->getPid());
        } else {
            // @codeCoverageIgnoreStart
            // child
            foreach ($this->signals as $sig) {
                $this->pcntl->signal($sig, SIG_DFL, true);
            }

            /**
             * in php5.3, we can not use $this in anonymous functions
             */
            $log = $this->log;
            register_shutdown_function(function () use ($fork, $log, $token) {
                $dataRepository = new DataRepository();
                $data = $dataRepository->load(getmypid());
                // a SharedMemoryControlException from write() is left to propagate
                $data->write($fork);
                $log->info('<-- return token.');
                $token->back();
            });

            $log->info('--> waiting for the token come around.');
            if ($token->accept()) {
                $log->info('----> started the function.');
                $fork->executeTask();
                $log->info('<---- completed the function.');
            }

            // the child ends here, so the return below is never reached by it
            $this->_exit();
            // @codeCoverageIgnoreEnd
        }

        return $fork->getPid();
    }
180
181
    /**
     * Waits, via the fork container, for the tasks queued by Snidel::fork() and
     * then marks this instance as joined.
     *
     * @return  void
     */
    public function wait()
    {
        $container = $this->forkContainer;
        $container->wait();

        $this->joined = true;
    }
191
192
    /**
     * Delegates to ForkContainer::hasError().
     *
     * @return  bool
     */
    public function hasError()
    {
        $container = $this->forkContainer;

        return $container->hasError();
    }
199
200
    /**
     * Delegates to ForkContainer::getError().
     *
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        $container = $this->forkContainer;

        return $container->getError();
    }
207
208
    /**
     * Returns the collection for the given tag, waiting first when this
     * instance has not been joined yet.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     * @throws  \InvalidArgumentException   when the fork container does not know the tag
     */
    public function get($tag = null)
    {
        if (!$this->joined) {
            $this->wait();
        }

        $container = $this->forkContainer;
        if ($container->hasTag($tag)) {
            return $container->getCollection($tag);
        }

        throw new \InvalidArgumentException('unknown tag: ' . $tag);
    }
226
227
    /**
     * Sends the signal to every child pid reported by the fork container,
     * logging each pid before the signal is sent.
     *
     * @param   int     $sig
     * @return  void
     */
    public function sendSignalToChildren($sig)
    {
        $childPids = $this->forkContainer->getChildPids();

        foreach ($childPids as $childPid) {
            $this->log->info('----> sending a signal to child. pid: ' . $childPid);
            posix_kill($childPid, $sig);
        }
    }
240
241
    /**
     * Remembers the number of the signal that was received.
     *
     * Called by the signal handler registered in the constructor; the destructor
     * checks the stored value against null.
     *
     * @param   int     $sig
     * @return  void
     */
    public function setReceivedSignal($sig)
    {
        $this->receivedSignal = $sig;
    }
245
246
    /**
     * Deletes all data through a fresh DataRepository.
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException    not handled here; propagates unchanged
     */
    private function deleteAllData()
    {
        // no try/catch: catching only to rethrow the same exception adds nothing
        $dataRepository = new DataRepository();
        $dataRepository->deleteAll();
    }
261
262
    /**
     * Creates a map container from the arguments, the callable and this
     * instance's concurrency.
     *
     * @param   array       $args
     * @param   callable    $callable
     * @return  \Ackintosh\Snidel\MapContainer
     */
    public function map(array $args, $callable)
    {
        return new MapContainer($args, $callable, $this->concurrency);
    }
273
274
    /**
     * Runs a map container: forks its first processing, waits for and connects
     * the following processes, and returns the collected results.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  array
     * @throws  \RuntimeException   rethrown after the failure has been recorded on the instance
     */
    public function run(MapContainer $mapContainer)
    {
        $firstToken = new Token($this->ownerPid, $this->concurrency);

        try {
            $this->forkTheFirstProcessing($mapContainer, $firstToken);
            $this->waitsAndConnectsProcess($mapContainer);
        } catch (\RuntimeException $failure) {
            // the flag is read by __destruct()
            $this->exceptionHasOccured = true;
            throw $failure;
        }

        return $this->getResultsOf($mapContainer);
    }
294
295
    /**
     * Forks one child per entry of the map container's first arguments, using
     * the first map's callable, and registers each child pid on the first map.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @param   \Ackintosh\Snidel\Token         $token          passed on to forkChild()
     * @return  void
     * @throws  \RuntimeException   from forkChild(); propagates unchanged
     */
    private function forkTheFirstProcessing(MapContainer $mapContainer, $token)
    {
        foreach ($mapContainer->getFirstArgs() as $args) {
            // no try/catch: catching only to rethrow the same exception adds nothing
            $childPid = $this->forkChild($token, $mapContainer->getFirstMap()->getCallable(), $args);

            $mapContainer->getFirstMap()->countTheForked();
            $mapContainer->getFirstMap()->addChildPid($childPid);
        }
    }
314
315
    /**
     * Waits for the children of the map container and connects each finished
     * child to the next map, until the container is no longer processing.
     *
     * Does nothing when this instance is already joined; marks it as joined
     * once the loop has finished.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  void
     * @throws  \RuntimeException   when a child has not finished successfully, or from forkChild()
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException    from waitForChild(); propagates unchanged
     */
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
    {
        if ($this->joined) {
            return;
        }

        while ($mapContainer->isProcessing()) {
            // no try/catch: catching only to rethrow the same exception adds nothing
            $fork = $this->forkContainer->waitForChild();

            $childPid = $fork->getPid();
            if ($fork->hasNotFinishedSuccessfully()) {
                $message = 'an error has occurred in child process. pid: ' . $childPid;
                $this->log->error($message);
                throw new \RuntimeException($message);
            }

            if ($nextMap = $mapContainer->nextMap($childPid)) {
                // NOTE(review): the finished fork object itself is passed as the
                // next task's $args — confirm that Task expects this.
                $nextMapPid = $this->forkChild(
                    $nextMap->getToken(),
                    $nextMap->getCallable(),
                    $fork
                );

                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
                $this->log->info($message);
                $nextMap->countTheForked();
                $nextMap->addChildPid($nextMapPid);
            }
            $mapContainer->countTheCompleted($childPid);
        }

        $this->joined = true;
    }
362
363
    /**
     * Collects the return value of every pid of the map container's last map,
     * in the order the pids are reported.
     *
     * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
     * @return  array
     */
    private function getResultsOf(MapContainer $mapContainer)
    {
        $returns = array();

        foreach ($mapContainer->getLastMapPids() as $lastPid) {
            $finishedFork = $this->forkContainer->get($lastPid);
            $returns[] = $finishedFork->getResult()->getReturn();
        }

        return $returns;
    }
378
379
    /**
     * Ends the current process with the given status.
     *
     * NOTE(review): static analysis flags the exit expression as hard to test;
     * forkChild() calls this method to end the child process.
     *
     * @param   int     $status
     * @return  void
     */
    private function _exit($status = 0)
    {
        exit($status);
    }
383
384
    /**
     * Cleans up when the instance is destroyed.
     *
     * - In the owner process, kills the master process when one exists.
     * - When run() recorded an exception, deletes all shared data.
     * - Otherwise, when the owner process was neither joined nor signalled,
     *   sends SIGTERM to the children, deletes all shared data and throws.
     *
     * @throws  \LogicException     when the owner is destroyed without having waited for its children
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException    from deleteAllData()
     */
    public function __destruct()
    {
        $isOwner = $this->ownerPid === getmypid();

        // true exactly when the "not joined" branch below is going to run
        $notJoined = !$this->exceptionHasOccured
            && $isOwner
            && !$this->joined
            && $this->receivedSignal === null;

        if ($this->forkContainer->existsMaster() && $isOwner) {
            $this->log->info('shutdown master process.');
            $this->forkContainer->killMaster();

            // sendSignalToChildren() reads the fork container, so the container
            // must survive until the "not joined" branch has used it; releasing
            // it here unconditionally made that branch fail on a missing property.
            if (!$notJoined) {
                unset($this->forkContainer);
            }
        }

        if ($this->exceptionHasOccured) {
            $this->log->info('destruct processes are started.(exception has occured)');
            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();
        } elseif ($notJoined) {
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
            $this->log->error($message);
            $this->log->info('destruct processes are started.');

            $this->log->info('--> sending a signal to children.');
            $this->sendSignalToChildren(SIGTERM);

            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();

            $this->log->info('--> deleting token.');
            // the declared property is $processToken; $token does not exist on this class
            unset($this->processToken);

            $this->log->info('--> destruct processes are finished successfully.');
            throw new \LogicException($message);
        }
    }
415
}
416