Completed
Push — master ( 68dfed...e5b054 )
by Akihito
02:03
created

Snidel::prefork()   B

Complexity

Conditions 7
Paths 8

Size

Total Lines 55
Code Lines 33

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 55
rs 7.8235
cc 7
eloc 33
nc 8
nop 4

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\Result;
8
use Ackintosh\Snidel\Token;
9
use Ackintosh\Snidel\Log;
10
use Ackintosh\Snidel\Pcntl;
11
use Ackintosh\Snidel\DataRepository;
12
use Ackintosh\Snidel\MapContainer;
13
use Ackintosh\Snidel\Task;
14
use Ackintosh\Snidel\TaskQueue;
15
use Ackintosh\Snidel\ResultQueue;
16
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
17
18
class Snidel
19
{
20
    /** @var string */
21
    const VERSION = '0.6.0';
22
23
    private $masterProcessId = null;
24
25
    /** @var array */
26
    private $childPids = array();
27
28
    /** @var \Ackintosh\Snidel\ForkContainer */
29
    private $forkContainer;
30
31
    /** @var \Ackintosh\Snidel\Pcntl */
32
    private $pcntl;
33
34
    /** @var int */
35
    private $concurrency;
36
37
    /** @var \Ackintosh\Snidel\Token */
38
    private $token;
39
40
    /** @var \Ackintosh\Snidel\Log */
41
    private $log;
42
43
    /** @var \Ackintosh\Snidel\DataRepository */
44
    private $dataRepository;
45
46
    /** @var bool */
47
    private $joined = false;
48
49
    /** @var int */
50
    private $ownerPid;
51
52
    /** @var array */
53
    private $signals = array(
54
        SIGTERM,
55
        SIGINT,
56
    );
57
58
    /** @var int */
59
    private $receivedSignal;
60
61
    /** @var \Ackintosh\Snidel\Token */
62
    private $processToken;
63
64
    /** @var bool */
65
    private $exceptionHasOccured = false;
66
67
    public function __construct($concurrency = 5)
68
    {
69
        $this->ownerPid         = getmypid();
70
        $this->childPids        = array();
71
        $this->concurrency      = $concurrency;
72
        $this->token            = new Token(getmypid(), $concurrency);
73
        $this->log              = new Log(getmypid());
74
        $this->pcntl            = new Pcntl();
75
        $this->dataRepository   = new DataRepository();
76
        $this->forkContainer    = new ForkContainer($this->ownerPid);
77
78
        $log    = $this->log;
79
        $token  = $this->token;
80
        $self   = $this;
81
        foreach ($this->signals as $sig) {
82
            $this->pcntl->signal(
83
                $sig,
84
                function ($sig) use($log, $token, $self) {
85
                    $log->info('received signal. signo: ' . $sig);
86
                    $self->setReceivedSignal($sig);
87
88
                    $log->info('--> sending a signal to children.');
89
                    $self->sendSignalToChildren($sig);
90
91
                    $log->info('--> deleting token.');
92
                    unset($token);
93
94
                    $log->info('<-- signal handling has been completed successfully.');
95
                    exit;
0 ignored issues
show
Coding Style Compatibility introduced by
The method __construct() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
96
                },
97
                false
98
            );
99
        }
100
101
        $this->log->info('parent pid: ' . $this->ownerPid);
102
    }
103
104
    /**
105
     * sets the resource for the log.
106
     *
107
     * @param   resource    $resource
108
     * @return  void
109
     * @codeCoverageIgnore
110
     */
111
    public function setLoggingDestination($resource)
112
    {
113
        $this->log->setDestination($resource);
114
    }
115
116
    /**
117
     * this method uses master / worker model.
118
     *
119
     * @param   callable    $callable
120
     * @param   mixed       $args
121
     * @param   string      $tag
122
     * @return  void
123
     * @throws  \RuntimeException
124
     */
125
    public function fork($callable, $args = array(), $tag = null)
126
    {
127
        if ($this->masterProcessId === null) {
128
            $this->forkMaster();
129
        }
130
131
        try {
132
            $this->forkContainer->enqueue(new Task($callable, $args, $tag));
133
        } catch (\RuntimeException $e) {
134
            throw $e;
135
        }
136
137
        $this->log->info('queued task #' . $this->forkContainer->queuedCount());
138
    }
139
140
    /**
141
     * fork master process
142
     *
143
     * @return  void
144
     */
145
    private function forkMaster()
146
    {
147
        $pid = $this->pcntl->fork();
148
        $this->masterProcessId = ($pid === 0) ? getmypid() : $pid;
149
        $this->log->setMasterProcessId($this->masterProcessId);
150
151
        if ($pid) {
152
            // owner
153
            $this->log->info('pid: ' . getmypid());
154
        } elseif ($pid === -1) {
0 ignored issues
show
Unused Code introduced by
This elseif statement is empty, and could be removed.

This check looks for the bodies of elseif statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

These elseif bodies can be removed. If you have an empty elseif but statements in the else branch, consider inverting the condition.

Loading history...
155
            // error
156
        } else {
157
            // master
158
            $taskQueue = new TaskQueue($this->ownerPid);
159
            $this->log->info('pid: ' . $this->masterProcessId);
160
161
            foreach ($this->signals as $sig) {
162
                $this->pcntl->signal($sig, SIG_DFL, true);
163
            }
164
165
            while ($task = $taskQueue->dequeue()) {
166
                $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
167
                if ($this->token->accept()) {
168
                    $this->forkWorker($task);
169
                }
170
            }
171
            $this->_exit();
172
        }
173
    }
174
175
    /**
176
     * fork worker process
177
     *
178
     * @param   \Ackintosh\Snidel\Task
179
     * @return  void
180
     * @throws  \RuntimeException
181
     */
182
    private function forkWorker($task)
183
    {
184
        try {
185
            $fork = $this->forkContainer->fork($task);
186
        } catch (\RuntimeException $e) {
187
            $this->log->error($e->getMessage());
188
            throw $e;
189
        }
190
191
        $fork->setTask($task);
192
193
        if (getmypid() === $this->masterProcessId) {
194
            // master
195
            $this->log->info('forked worker. pid: ' . $fork->getPid());
196
        } else {
197
            // worker
198
            $this->log->info('has forked. pid: ' . getmypid());
199
            // @codeCoverageIgnoreStart
200
201
            foreach ($this->signals as $sig) {
202
                $this->pcntl->signal($sig, SIG_DFL, true);
203
            }
204
205
            $resultQueue = new ResultQueue($this->ownerPid);
206
            register_shutdown_function(function () use ($fork, $resultQueue) {
207
                if ($fork->hasNoResult() || !$fork->isQueued()) {
208
                    $result = new Result();
209
                    $result->setFailure();
210
                    $fork->setResult($result);
211
                    $resultQueue->enqueue($fork);
212
                }
213
            });
214
215
            $this->log->info('----> started the function.');
216
            $fork->executeTask();
217
            $this->log->info('<---- completed the function.');
218
219
            $resultQueue->enqueue($fork);
220
            $fork->setQueued();
221
            $this->log->info('queued the result.');
222
223
            $this->token->back();
224
            $this->log->info('return the token and exit.');
225
            $this->_exit();
226
            // @codeCoverageIgnoreEnd
227
        }
228
    }
229
230
    /**
231
     * fork process
232
     * the processes which forked are wait for token.
233
     *
234
     * @param   callable                    $callable
235
     * @param   mixed                       $args
236
     * @param   string                      $tag
237
     * @param   \Ackintosh\Snidel\Token     $token
238
     * @return  void
239
     * @throws  \RuntimeException
240
     */
241
    private function prefork($callable, $args = array(), $tag = null, Token $token = null)
242
    {
243
        $this->processToken = $token ? $token : $this->token;
244
        $task = new Task($callable, $args, $tag);
245
246
        try {
247
            $fork = $this->forkContainer->fork($task);
248
        } catch (\RuntimeException $e) {
249
            $this->log->error($e->getMessage());
250
            throw $e;
251
        }
252
253
        $fork->setTask($task);
254
255
        if (getmypid() === $this->ownerPid) {
256
            // parent
257
            $this->log->info('created child process. pid: ' . $fork->getPid());
258
            $this->childPids[] = $fork->getPid();
259
        } else {
260
            // @codeCoverageIgnoreStart
261
            // child
262
            foreach ($this->signals as $sig) {
263
                $this->pcntl->signal($sig, SIG_DFL, true);
264
            }
265
266
            /**
267
             * in php5.3, we can not use $this in anonymous functions
268
             */
269
            $dataRepository     = $this->dataRepository;
270
            $log                = $this->log;
271
            $processToken       = $this->processToken;
272
            register_shutdown_function(function () use ($fork, $dataRepository, $log, $processToken) {
273
                $data = $dataRepository->load(getmypid());
274
                try {
275
                    $data->write($fork);
276
                } catch (SharedMemoryControlException $e) {
277
                    throw $e;
278
                }
279
                $log->info('<-- return token.');
280
                $processToken->back();
281
            });
282
283
            $log->info('--> waiting for the token come around.');
284
            if ($processToken->accept()) {
285
                $log->info('----> started the function.');
286
                $fork->executeTask();
287
                $log->info('<---- completed the function.');
288
            }
289
290
            $this->_exit();
291
            // @codeCoverageIgnoreEnd
292
        }
293
294
        return $fork->getPid();
295
    }
296
297
    /**
298
     * waits until all children that has forked by Snidel::prefork() are completed
299
     *
300
     * @return  void
301
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
302
     */
303
    public function waitSimply()
304
    {
305
        if ($this->joined) {
306
            return;
307
        }
308
309
        $count = count($this->childPids);
310
        for ($i = 0; $i < $count; $i++) {
311
            try {
312
                $fork = $this->forkContainer->waitSimply();
313
            } catch (SharedMemoryControlException $e) {
314
                $this->exceptionHasOccured = true;
315
                throw $e;
316
            }
317
318
            if ($fork->hasNotFinishedSuccessfully()) {
319
                $message = 'an error has occurred in child process. pid: ' . $fork->getPid();
320
                $this->log->error($message);
321
            }
322
            unset($this->childPids[array_search($fork->getPid(), $this->childPids)]);
323
        }
324
        $this->joined = true;
325
    }
326
327
    /**
328
     * waits until all tasks that queued by Snidel::fork() are completed
329
     *
330
     * @return  void
331
     */
332
    public function wait()
333
    {
334
        $this->forkContainer->wait();
335
        $this->joined = true;
336
    }
337
338
    /**
339
     * @return  bool
340
     */
341
    public function hasError()
342
    {
343
        return $this->forkContainer->hasError();
344
    }
345
346
    /**
347
     * @return  \Ackintosh\Snidel\Error
348
     */
349
    public function getError()
350
    {
351
        return $this->forkContainer->getError();
352
    }
353
354
    /**
355
     * gets results
356
     *
357
     * @param   string  $tag
358
     * @return  \Ackintosh\Snidel\ForkCollection
359
     * @throws  \InvalidArgumentException
360
     */
361
    public function get($tag = null)
362
    {
363
        if (!$this->joined) {
364
            $this->wait();
365
        }
366
367
        if (!$this->forkContainer->hasTag($tag)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->forkContainer->hasTag($tag) of type boolean|null is loosely compared to false; this is ambiguous if the boolean can be false. You might want to explicitly use !== null instead.

If an expression can have both false, and null as possible values. It is generally a good practice to always use strict comparison to clearly distinguish between those two values.

$a = canBeFalseAndNull();

// Instead of
if ( ! $a) { }

// Better use one of the explicit versions:
if ($a !== null) { }
if ($a !== false) { }
if ($a !== null && $a !== false) { }
Loading history...
368
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
369
        }
370
371
        return $this->forkContainer->getCollection($tag);
372
    }
373
374
    /**
375
     * sends signal to child
376
     *
377
     * @param   int     $sig
378
     * @return  void
379
     */
380
    public function sendSignalToChildren($sig)
381
    {
382
        foreach ($this->childPids as $pid) {
383
            $this->log->info('----> sending a signal to child. pid: ' . $pid);
384
            posix_kill($pid, $sig);
385
        }
386
    }
387
388
    public function setReceivedSignal($sig)
389
    {
390
        $this->receivedSignal = $sig;
391
    }
392
393
    /**
394
     * delete shared memory
395
     *
396
     * @return  void
397
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
398
     */
399
    private function deleteAllData()
400
    {
401
        foreach ($this->childPids as $pid) {
402
            $data = $this->dataRepository->load($pid);
403
            try {
404
                $data->deleteIfExists();
405
            } catch (SharedMemoryControlException $e) {
406
                throw $e;
407
            }
408
        }
409
    }
410
411
    /**
412
     * create map object
413
     *
414
     * @param   array       $args
415
     * @param   callable    $callable
416
     * @return  \Ackintosh\Snidel\MapContainer
417
     */
418
    public function map(Array $args, $callable)
419
    {
420
        return new MapContainer($args, $callable, $this->concurrency);
421
    }
422
423
    /**
424
     * run map object
425
     *
426
     * @param   \Ackintosh\Snidel\MapContainer
427
     * @return  array
428
     * @throws  \RuntimeException
429
     */
430
    public function run(MapContainer $mapContainer)
431
    {
432
        try {
433
            $this->forkTheFirstProcessing($mapContainer);
434
            $this->waitsAndConnectsProcess($mapContainer);
435
        } catch (\RuntimeException $e) {
436
            $this->exceptionHasOccured = true;
437
            throw $e;
438
        }
439
440
        return $this->getResultsOf($mapContainer);
441
    }
442
443
    /**
444
     * fork the first processing of the map container
445
     *
446
     * @param   \Ackintosh\Snidel\MapContainer
447
     * @return  void
448
     * @throws  \RuntimeException
449
     */
450
    private function forkTheFirstProcessing(MapContainer $mapContainer)
451
    {
452
        foreach ($mapContainer->getFirstArgs() as $args) {
453
            try {
454
                $childPid = $this->prefork($mapContainer->getFirstMap()->getCallable(), $args);
455
            } catch (\RuntimeException $e) {
456
                throw $e;
457
            }
458
            $mapContainer->getFirstMap()->countTheForked();
459
            $mapContainer->getFirstMap()->addChildPid($childPid);
460
        }
461
    }
462
463
    /**
464
     * waits and connects the process of map container
465
     *
466
     * @param   \Ackintosh\Snidel\MapContainer
467
     * @return  void
468
     * @throws  \RuntimeException
469
     */
470
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
471
    {
472
        if ($this->joined) {
473
            return;
474
        }
475
476
        while ($mapContainer->isProcessing()) {
477
            try {
478
                $fork = $this->forkContainer->waitSimply();
479
            } catch (SharedMemoryControlException $e) {
480
                throw $e;
481
            }
482
483
            $childPid = $fork->getPid();
484
            if ($fork->hasNotFinishedSuccessfully()) {
485
                $message = 'an error has occurred in child process. pid: ' . $childPid;
486
                $this->log->error($message);
487
                throw new \RuntimeException($message);
488
            }
489
490
            unset($this->childPids[array_search($childPid, $this->childPids)]);
491
            if ($nextMap = $mapContainer->nextMap($childPid)) {
492
                try {
493
                    $nextMapPid = $this->prefork(
494
                        $nextMap->getCallable(),
495
                        $fork,
496
                        null,
497
                        $nextMap->getToken()
498
                    );
499
                } catch (\RuntimeException $e) {
500
                    throw $e;
501
                }
502
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
503
                $this->log->info($message);
504
                $nextMap->countTheForked();
505
                $nextMap->addChildPid($nextMapPid);
506
            }
507
            $mapContainer->countTheCompleted($childPid);
508
        }
509
510
        $this->joined = true;
511
    }
512
513
    /**
514
     * gets results of map container
515
     *
516
     * @param   \Ackintosh\Snidel\MapContainer
517
     * @return  array
518
     */
519
    private function getResultsOf(MapContainer $mapContainer)
520
    {
521
        $results = array();
522
        foreach ($mapContainer->getLastMapPids() as $pid) {
523
            $results[] = $this->forkContainer->get($pid)->getResult()->getReturn();
524
        }
525
526
        return $results;
527
    }
528
529
    private function _exit($status = 0)
530
    {
531
        exit($status);
0 ignored issues
show
Coding Style Compatibility introduced by
The method _exit() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
532
    }
533
534
    public function __destruct()
535
    {
536
        if ($this->masterProcessId !== null && $this->ownerPid === getmypid()) {
537
            $this->log->info('shutdown master process.');
538
            posix_kill($this->masterProcessId, SIGTERM);
539
540
            unset($this->forkContainer);
541
        }
542
543
        if ($this->exceptionHasOccured) {
544
            $this->log->info('destruct processes are started.(exception has occured)');
545
            $this->log->info('--> deleting all shared memory.');
546
            $this->deleteAllData();
547
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
548
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
549
            $this->log->error($message);
550
            $this->log->info('destruct processes are started.');
551
552
            $this->log->info('--> sending a signal to children.');
553
            $this->sendSignalToChildren(SIGTERM);
554
555
            $this->log->info('--> deleting all shared memory.');
556
            $this->deleteAllData();
557
558
            $this->log->info('--> deleting token.');
559
            unset($this->token);
560
561
            $this->log->info('--> destruct processes are finished successfully.');
562
            throw new \LogicException($message);
563
        }
564
    }
565
}
566