Completed
Push — master ( 94cacb...6d69f5 )
by Akihito
03:32
created

Snidel::forkWorker()   B

Complexity

Conditions 7
Paths 8

Size

Total Lines 54
Code Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 54
rs 7.8331
cc 7
eloc 37
nc 8
nop 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
declare(ticks = 1);
3
4
namespace Ackintosh;
5
6
use Ackintosh\Snidel\ForkContainer;
7
use Ackintosh\Snidel\ForkCollection;
8
use Ackintosh\Snidel\Result;
9
use Ackintosh\Snidel\Token;
10
use Ackintosh\Snidel\Log;
11
use Ackintosh\Snidel\Error;
12
use Ackintosh\Snidel\Pcntl;
13
use Ackintosh\Snidel\DataRepository;
14
use Ackintosh\Snidel\MapContainer;
15
use Ackintosh\Snidel\TaskQueue;
16
use Ackintosh\Snidel\ResultQueue;
17
use Ackintosh\Snidel\Exception\SharedMemoryControlException;
18
19
class Snidel
20
{
21
    /** @var string */
22
    const VERSION = '0.5.0';
23
24
    private $masterProcessId = null;
25
26
    /** @var array */
27
    private $childPids = array();
28
29
    /** @var \Ackintosh\Snidel\ForkContainer */
30
    private $forkContainer;
31
32
    /**  @var array */
33
    private $forks = array();
34
35
    /** @var \Ackintosh\Snidel\Error */
36
    private $error;
37
38
    /** @var \Ackintosh\Snidel\Pcntl */
39
    private $pcntl;
40
41
    /** @var int */
42
    private $concurrency;
43
44
    /** @var \Ackintosh\Snidel\Token */
45
    private $token;
46
47
    /** @var \Ackintosh\Snidel\Log */
48
    private $log;
49
50
    /** @var \Ackintosh\Snidel\DataRepository */
51
    private $dataRepository;
52
53
    /** @var bool */
54
    private $joined = false;
55
56
    /** @var array */
57
    private $results = array();
0 ignored issues
show
Unused Code introduced by
The property $results is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
58
59
    /** @var int */
60
    private $ownerPid;
61
62
    /** @var array */
63
    private $signals = array(
64
        SIGTERM,
65
        SIGINT,
66
    );
67
68
    /** @var int */
69
    private $receivedSignal;
70
71
    /** @var \Ackintosh\Snidel\Token */
72
    private $processToken;
73
74
    /** @var bool */
75
    private $exceptionHasOccured = false;
76
77
    public function __construct($concurrency = 5)
78
    {
79
        $this->ownerPid         = getmypid();
80
        $this->childPids        = array();
81
        $this->concurrency      = $concurrency;
82
        $this->token            = new Token(getmypid(), $concurrency);
83
        $this->log              = new Log(getmypid());
84
        $this->error            = new Error();
85
        $this->pcntl            = new Pcntl();
86
        $this->dataRepository   = new DataRepository();
87
        $this->forkContainer    = new ForkContainer();
88
        $this->taskQueue        = new TaskQueue(getmypid());
0 ignored issues
show
Bug introduced by
The property taskQueue does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
89
        $this->resultQueue      = new ResultQueue(getmypid());
0 ignored issues
show
Bug introduced by
The property resultQueue does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
90
91
        foreach ($this->signals as $sig) {
92
            $this->pcntl->signal(
93
                $sig,
94
                function ($sig) {
95
                    $this->log->info('received signal. signo: ' . $sig);
96
                    $this->receivedSignal = $sig;
97
98
                    $this->log->info('--> sending a signal to children.');
99
                    $this->sendSignalToChildren($sig);
100
101
                    $this->log->info('--> deleting token.');
102
                    unset($this->token);
103
104
                    $this->log->info('<-- signal handling has been completed successfully.');
105
                    $this->_exit();
106
                },
107
                false
108
            );
109
        }
110
111
        $this->log->info('parent pid: ' . $this->ownerPid);
112
    }
113
114
    /**
115
     * sets the resource for the log.
116
     *
117
     * @param   resource    $resource
118
     * @return  void
119
     * @codeCoverageIgnore
120
     */
121
    public function setLoggingDestination($resource)
122
    {
123
        $this->log->setDestination($resource);
124
    }
125
126
    /**
127
     * fork process
128
     *
129
     * @param   callable    $callable
130
     * @param   mixed       $args
131
     * @param   string      $tag
132
     * @return  void
133
     * @throws  \RuntimeException
134
     */
135
    public function fork($callable, $args = array(), $tag = null)
136
    {
137
        if (!is_array($args)) {
138
            $args = array($args);
139
        }
140
141
        if ($this->masterProcessId === null) {
142
            $this->forkMaster();
143
        }
144
145
        try {
146
            $this->taskQueue->enqueue($callable, $args, $tag);
147
        } catch (\RuntimeException $e) {
148
            throw $e;
149
        }
150
151
        $this->log->info('queued task #' . $this->taskQueue->queuedCount());
152
    }
153
154
    /**
155
     * fork master process
156
     *
157
     * @return  void
158
     */
159
    private function forkMaster()
160
    {
161
        $pid = $this->pcntl->fork();
162
        $this->masterProcessId = ($pid === 0) ? getmypid() : $pid;
163
        $this->log->setMasterProcessId($this->masterProcessId);
164
165
        if ($pid) {
166
            // owner
167
            $this->log->info('pid: ' . getmypid());
168
        } elseif ($pid === -1) {
0 ignored issues
show
Unused Code introduced by
This elseif statement is empty, and could be removed.

This check looks for the bodies of elseif statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

These elseif bodies can be removed. If you have an empty elseif but statements in the else branch, consider inverting the condition.

Loading history...
169
            // error
170
        } else {
171
            // master
172
            $this->log->info('pid: ' . $this->masterProcessId);
173
            foreach ($this->signals as $sig) {
174
                $this->pcntl->signal($sig, SIG_DFL, true);
175
            }
176
            while ($task = $this->taskQueue->dequeue()) {
177
                $this->log->info('dequeued task #' . $this->taskQueue->dequeuedCount());
178
                if ($this->token->accept()) {
179
                    $this->forkWorker($task['callable'], $task['args'], $task['tag']);
180
                }
181
            }
182
            $this->_exit();
183
        }
184
    }
185
186
    /**
187
     * fork worker process
188
     *
189
     * @param   callable    $callable
190
     * @param   mixed       $args
191
     * @param   string      $tag
192
     * @return  void
193
     * @throws  \RuntimeException
194
     */
195
    private function forkWorker($callable, $args = array(), $tag = null)
196
    {
197
        if (!is_array($args)) {
198
            $args = array($args);
199
        }
200
201
        try {
202
            $fork = $this->forkContainer->fork($tag);
203
        } catch (\RuntimeException $e) {
204
            $this->log->error($e->getMessage());
205
            throw $e;
206
        }
207
208
        $fork->setCallable($callable);
209
        $fork->setArgs($args);
210
        $fork->setTag($tag);
211
212
        if (getmypid() === $this->masterProcessId) {
213
            // master
214
            $this->log->info('forked worker. pid: ' . $fork->getPid());
215
        } else {
216
            // worker
217
            $this->log->info('has forked. pid: ' . getmypid());
218
            // @codeCoverageIgnoreStart
219
            foreach ($this->signals as $sig) {
220
                $this->pcntl->signal($sig, SIG_DFL, true);
221
            }
222
223
            $resultHasQueued = false;
224
            register_shutdown_function(function () use ($fork, &$resultHasQueued) {
225
                if ($fork->hasNoResult() || $resultHasQueued === false) {
226
                    $result = new Result();
227
                    $result->setFailure();
228
                    $fork->setResult($result);
229
                    $this->resultQueue->enqueue($fork);
230
                }
231
            });
232
            $this->log->info('----> started the function.');
233
            ob_start();
234
            $result = new Result();
235
            $result->setReturn(call_user_func_array($callable, $args));
236
            $result->setOutput(ob_get_clean());
237
            $fork->setResult($result);
238
            $this->log->info('<---- completed the function.');
239
240
            $this->resultQueue->enqueue($fork);
241
            $resultHasQueued = true;
0 ignored issues
show
Unused Code introduced by
$resultHasQueued is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
242
            $this->log->info('queued the result.');
243
            $this->token->back();
244
            $this->log->info('return the token and exit.');
245
            $this->_exit();
246
            // @codeCoverageIgnoreEnd
247
        }
248
    }
249
250
    /**
251
     * fork process
252
     * this method does't use a master / worker model.
253
     *
254
     * @param   callable                    $callable
255
     * @param   mixed                       $args
256
     * @param   string                      $tag
257
     * @param   \Ackintosh\Snidel\Token     $token
258
     * @return  void
259
     * @throws  \RuntimeException
260
     */
261
    public function forkSimply($callable, $args = array(), $tag = null, Token $token = null)
262
    {
263
        $this->processToken = $token ? $token : $this->token;
264
        if (!is_array($args)) {
265
            $args = array($args);
266
        }
267
268
        try {
269
            $fork = $this->forkContainer->fork($tag);
270
        } catch (\RuntimeException $e) {
271
            $this->log->error($e->getMessage());
272
            throw $e;
273
        }
274
275
        $fork->setCallable($callable);
276
        $fork->setArgs($args);
277
278
        if (getmypid() === $this->ownerPid) {
279
            // parent
280
            $this->log->info('created child process. pid: ' . $fork->getPid());
281
            $this->childPids[] = $fork->getPid();
282
        } else {
283
            // @codeCoverageIgnoreStart
284
            // child
285
            foreach ($this->signals as $sig) {
286
                $this->pcntl->signal($sig, SIG_DFL, true);
287
            }
288
289
            $result = new Result();
290
            /**
291
             * in php5.3, we can not use $this in anonymous functions
292
             */
293
            $dataRepository     = $this->dataRepository;
294
            $log                = $this->log;
295
            $processToken       = $this->processToken;
296
            register_shutdown_function(function () use ($result, $dataRepository, $log, $processToken) {
297
                $data = $dataRepository->load(getmypid());
298
                try {
299
                    $data->write($result);
300
                } catch (SharedMemoryControlException $e) {
301
                    throw $e;
302
                }
303
                $log->info('<-- return token.');
304
                $processToken->back();
305
            });
306
307
            $log->info('--> waiting for the token come around.');
308
            if ($processToken->accept()) {
309
                $log->info('----> started the function.');
310
                ob_start();
311
                $result->setReturn(call_user_func_array($callable, $args));
312
                $result->setOutput(ob_get_clean());
313
                $log->info('<---- completed the function.');
314
            }
315
316
            $this->_exit();
317
            // @codeCoverageIgnoreEnd
318
        }
319
320
        return $fork->getPid();
321
    }
322
323
    /**
324
     * waits until all children that has forked by Snidel::forkSimply() are completed
325
     *
326
     * @return  void
327
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
328
     */
329
    public function waitSimply()
330
    {
331
        if ($this->joined) {
332
            return;
333
        }
334
335
        $count = count($this->childPids);
336
        for ($i = 0; $i < $count; $i++) {
337
            try {
338
                $fork = $this->forkContainer->wait();
339
            } catch (SharedMemoryControlException $e) {
340
                $this->exceptionHasOccured = true;
341
                throw $e;
342
            }
343
344
            $childPid   = $fork->getPid();
345
            $result     = $fork->getResult();
346
            if ($fork->hasNotFinishedSuccessfully()) {
347
                $message = 'an error has occurred in child process. pid: ' . $childPid;
348
                $this->log->error($message);
349
                $this->error[$childPid] = array(
350
                    'status'    => $fork->getStatus(),
351
                    'message'   => $message,
352
                    'callable'  => $fork->getCallable(),
353
                    'args'      => $fork->getArgs(),
354
                    'return'    => $result->getReturn(),
355
                );
356
            }
357
            unset($this->childPids[array_search($childPid, $this->childPids)]);
358
        }
359
        $this->joined = true;
360
    }
361
362
    /**
363
     * waits until all tasks that queued by Snidel::fork() are completed
364
     *
365
     * @return  void
366
     */
367
    public function wait()
368
    {
369
        for (; $this->taskQueue->queuedCount() > $this->resultQueue->dequeuedCount();) {
370
            $fork = $this->resultQueue->dequeue();
371
            if ($fork->getResult()->isFailure()) {
372
                $this->error[$fork->getPid()] = $fork;
373
            }
374
            $this->forks[] = $fork;
375
        }
376
377
        $this->joined = true;
378
    }
379
380
    /**
381
     * @return  bool
382
     */
383
    public function hasError()
384
    {
385
        return $this->error->exists();
386
    }
387
388
    /**
389
     * @return  \Ackintosh\Snidel\Error
390
     */
391
    public function getError()
392
    {
393
        return $this->error;
394
    }
395
396
    /**
397
     * gets results
398
     *
399
     * @param   string  $tag
400
     * @return  \Ackintosh\Snidel\ForkCollection
401
     * @throws  \InvalidArgumentException
402
     */
403
    public function getSimply($tag = null)
404
    {
405
        if (!$this->joined) {
406
            $this->waitSimply();
407
        }
408
409
        if ($tag === null) {
410
            return $this->forkContainer->getCollection();
411
        }
412
413
        if (!$this->forkContainer->hasTag($tag)) {
414
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
415
        }
416
417
        return $this->forkContainer->getCollection($tag);
418
    }
419
420
    /**
421
     * gets results
422
     *
423
     * @param   string  $tag
424
     * @return  \Ackintosh\Snidel\ForkCollection
425
     * @throws  \InvalidArgumentException
426
     */
427
    public function get($tag = null)
428
    {
429
        if (!$this->joined) {
430
            $this->wait();
431
        }
432
433
        if ($this->taskQueue->queuedCount() === 0) {
434
            return;
435
        }
436
437
        if ($tag === null) {
438
            return new ForkCollection($this->forks);
439
        }
440
441
        $filtered = array_filter($this->forks, function ($fork) use ($tag) {
442
            return $fork->getTag() === $tag;
443
        });
444
445
        if (count($filtered) === 0) {
446
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
447
        }
448
449
        return new ForkCollection($filtered);
450
    }
451
452
    /**
453
     * sends signal to child
454
     *
455
     * @param   int     $sig
456
     * @return  void
457
     */
458
    private function sendSignalToChildren($sig)
459
    {
460
        foreach ($this->childPids as $pid) {
461
            $this->log->info('----> sending a signal to child. pid: ' . $pid);
462
            posix_kill($pid, $sig);
463
        }
464
    }
465
466
    /**
467
     * delete shared memory
468
     *
469
     * @return  void
470
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
471
     */
472
    private function deleteAllData()
473
    {
474
        foreach ($this->childPids as $pid) {
475
            $data = $this->dataRepository->load($pid);
476
            try {
477
                $data->deleteIfExists();
478
            } catch (SharedMemoryControlException $e) {
479
                throw $e;
480
            }
481
        }
482
    }
483
484
    /**
485
     * create map object
486
     *
487
     * @param   array       $args
488
     * @param   callable    $callable
489
     * @return  \Ackintosh\Snidel\MapContainer
490
     */
491
    public function map(Array $args, $callable)
492
    {
493
        return new MapContainer($args, $callable, $this->concurrency);
494
    }
495
496
    /**
497
     * run map object
498
     *
499
     * @param   \Ackintosh\Snidel\MapContainer
500
     * @return  array
501
     * @throws  \RuntimeException
502
     */
503
    public function run(MapContainer $mapContainer)
504
    {
505
        try {
506
            $this->forkTheFirstProcessing($mapContainer);
507
            $this->waitsAndConnectsProcess($mapContainer);
508
        } catch (\RuntimeException $e) {
509
            $this->exceptionHasOccured = true;
510
            throw $e;
511
        }
512
513
        return $this->getResultsOf($mapContainer);
514
    }
515
516
    /**
517
     * fork the first processing of the map container
518
     *
519
     * @param   \Ackintosh\Snidel\MapContainer
520
     * @return  void
521
     * @throws  \RuntimeException
522
     */
523
    private function forkTheFirstProcessing(MapContainer $mapContainer)
524
    {
525
        foreach ($mapContainer->getFirstArgs() as $args) {
526
            try {
527
                $childPid = $this->forkSimply($mapContainer->getFirstMap()->getCallable(), $args);
528
            } catch (\RuntimeException $e) {
529
                throw $e;
530
            }
531
            $mapContainer->getFirstMap()->countTheForked();
532
            $mapContainer->getFirstMap()->addChildPid($childPid);
533
        }
534
    }
535
536
    /**
537
     * waits and connects the process of map container
538
     *
539
     * @param   \Ackintosh\Snidel\MapContainer
540
     * @return  void
541
     * @throws  \RuntimeException
542
     */
543
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
544
    {
545
        if ($this->joined) {
546
            return;
547
        }
548
549
        while ($mapContainer->isProcessing()) {
550
            try {
551
                $fork = $this->forkContainer->wait();
552
            } catch (SharedMemoryControlException $e) {
553
                throw $e;
554
            }
555
556
            $childPid = $fork->getPid();
557
            if ($fork->hasNotFinishedSuccessfully()) {
558
                $message = 'an error has occurred in child process. pid: ' . $childPid;
559
                $this->log->error($message);
560
                throw new \RuntimeException($message);
561
            }
562
563
            unset($this->childPids[array_search($childPid, $this->childPids)]);
564
            if ($nextMap = $mapContainer->nextMap($childPid)) {
565
                try {
566
                    $nextMapPid = $this->forkSimply(
567
                        $nextMap->getCallable(),
568
                        $fork,
569
                        null,
570
                        $nextMap->getToken()
571
                    );
572
                } catch (\RuntimeException $e) {
573
                    throw $e;
574
                }
575
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
576
                $this->log->info($message);
577
                $nextMap->countTheForked();
578
                $nextMap->addChildPid($nextMapPid);
579
            }
580
            $mapContainer->countTheCompleted($childPid);
581
        }
582
583
        $this->joined = true;
584
    }
585
586
    /**
587
     * gets results of map container
588
     *
589
     * @param   \Ackintosh\Snidel\MapContainer
590
     * @return  array
591
     */
592
    private function getResultsOf(MapContainer $mapContainer)
593
    {
594
        $results = array();
595
        foreach ($mapContainer->getLastMapPids() as $pid) {
596
            $results[] = $this->forkContainer->get($pid)->getResult()->getReturn();
597
        }
598
599
        return $results;
600
    }
601
602
    private function _exit($status = 0)
603
    {
604
        exit($status);
0 ignored issues
show
Coding Style Compatibility introduced by
The method _exit() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
605
    }
606
607
    public function __destruct()
608
    {
609
        if ($this->masterProcessId !== null && $this->ownerPid === getmypid()) {
610
            $this->log->info('shutdown master process.');
611
            posix_kill($this->masterProcessId, SIGTERM);
612
613
            unset($this->taskQueue);
614
            unset($this->resultQueue);
615
        }
616
617
        if ($this->exceptionHasOccured) {
618
            $this->log->info('destruct processes are started.(exception has occured)');
619
            $this->log->info('--> deleting all shared memory.');
620
            $this->deleteAllData();
621
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
622
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
623
            $this->log->error($message);
624
            $this->log->info('destruct processes are started.');
625
626
            $this->log->info('--> sending a signal to children.');
627
            $this->sendSignalToChildren(SIGTERM);
628
629
            $this->log->info('--> deleting all shared memory.');
630
            $this->deleteAllData();
631
632
            $this->log->info('--> deleting token.');
633
            unset($this->token);
634
635
            $this->log->info('--> destruct processes are finished successfully.');
636
            throw new \LogicException($message);
637
        }
638
    }
639
}
640