Completed
Push — master ( 6d69f5...dd2f15 )
by Akihito
02:19
created

Snidel   F

Complexity

Total Complexity 77

Size/Duplication

Total Lines 631
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 15

Importance

Changes 15
Bugs 3 Features 1
Metric Value
wmc 77
c 15
b 3
f 1
lcom 2
cbo 15
dl 0
loc 631
rs 2.0391

22 Methods

Rating   Name   Duplication   Size   Complexity  
B __construct() 0 39 2
A setLoggingDestination() 0 4 1
A fork() 0 18 4
C forkMaster() 0 26 7
B forkWorker() 0 56 7
B forkSimply() 0 61 8
B waitSimply() 0 32 5
A wait() 0 12 3
A hasError() 0 4 1
A getError() 0 4 1
A getSimply() 0 16 4
B get() 0 24 5
A sendSignalToChildren() 0 7 2
A setReceivedSignal() 0 4 1
A deleteAllData() 0 11 3
A map() 0 4 1
A run() 0 12 2
A forkTheFirstProcessing() 0 12 3
C waitsAndConnectsProcess() 0 42 7
A getResultsOf() 0 9 2
A _exit() 0 4 1
C __destruct() 0 32 7

How to fix   Complexity   

Complex Class

Complex classes like Snidel often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or suffix. You can also inspect the cohesion graph to spot unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Snidel, and based on these observations, apply Extract Interface, too.
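
As a rough illustration of Extract Class combined with Extract Interface, the signal-related members of the class listed below ($receivedSignal, setReceivedSignal(), sendSignalToChildren()) could move into their own component. The sketch assumes hypothetical names (SignalSender, ChildSignaller, and the injected $kill callable); none of them exist in Snidel, and the real split may well look different.

```php
<?php
// Hypothetical names throughout: SignalSender, ChildSignaller and the
// $kill callable are illustrations, not part of the Snidel code base.

interface SignalSender
{
    public function sendToChildren($sig);
}

class ChildSignaller implements SignalSender
{
    /** @var int[] */
    private $childPids = array();

    /** @var int|null */
    private $receivedSignal = null;

    /** @var callable */
    private $kill;

    public function __construct($kill)
    {
        // $kill takes ($pid, $sig); 'posix_kill' would fit this shape.
        $this->kill = $kill;
    }

    public function addChild($pid)
    {
        $this->childPids[] = $pid;
    }

    public function setReceivedSignal($sig)
    {
        $this->receivedSignal = $sig;
    }

    public function getReceivedSignal()
    {
        return $this->receivedSignal;
    }

    public function sendToChildren($sig)
    {
        foreach ($this->childPids as $pid) {
            call_user_func($this->kill, $pid, $sig);
        }
    }
}

// Usage with a recording stand-in instead of a real kill function.
$sent = array();
$signaller = new ChildSignaller(function ($pid, $sig) use (&$sent) {
    $sent[] = array($pid, $sig);
});
$signaller->addChild(101);
$signaller->addChild(102);
$signaller->sendToChildren(15); // 15 is the conventional SIGTERM number
```

Injecting the kill callable keeps the extracted class testable without sending real signals; other classes would then depend on SignalSender rather than on the whole Snidel class.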

<?php
declare(ticks = 1);

namespace Ackintosh;

use Ackintosh\Snidel\ForkContainer;
use Ackintosh\Snidel\ForkCollection;
use Ackintosh\Snidel\Result;
use Ackintosh\Snidel\Token;
use Ackintosh\Snidel\Log;
use Ackintosh\Snidel\Error;
use Ackintosh\Snidel\Pcntl;
use Ackintosh\Snidel\DataRepository;
use Ackintosh\Snidel\MapContainer;
use Ackintosh\Snidel\TaskQueue;
use Ackintosh\Snidel\ResultQueue;
use Ackintosh\Snidel\Exception\SharedMemoryControlException;

class Snidel
{
    /** @var string */
    const VERSION = '0.5.0';

    private $masterProcessId = null;

    /** @var array */
    private $childPids = array();

    /** @var \Ackintosh\Snidel\ForkContainer */
    private $forkContainer;

    /** @var array */
    private $forks = array();

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var int */
    private $concurrency;

    /** @var \Ackintosh\Snidel\Token */
    private $token;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var \Ackintosh\Snidel\DataRepository */
    private $dataRepository;

    /** @var bool */
    private $joined = false;

    /** @var array */
    private $results = array();

Unused Code:
The property $results is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

    /** @var int */
    private $ownerPid;

    /** @var array */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /** @var int */
    private $receivedSignal;

    /** @var \Ackintosh\Snidel\Token */
    private $processToken;

    /** @var bool */
    private $exceptionHasOccured = false;

    public function __construct($concurrency = 5)
    {
        $this->ownerPid         = getmypid();
        $this->childPids        = array();
        $this->concurrency      = $concurrency;
        $this->token            = new Token(getmypid(), $concurrency);
        $this->log              = new Log(getmypid());
        $this->error            = new Error();
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->forkContainer    = new ForkContainer();
        $this->taskQueue        = new TaskQueue(getmypid());

Bug:
The property taskQueue does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is good practice to explicitly declare properties to avoid accidental typos and to enable IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;

        $this->resultQueue      = new ResultQueue(getmypid());

Bug:
The property resultQueue does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is good practice to explicitly declare properties to avoid accidental typos and to enable IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;

        $log    = $this->log;
        $token  = $this->token;
        $self   = $this;
        foreach ($this->signals as $sig) {
            $this->pcntl->signal(
                $sig,
                function ($sig) use($log, $token, $self) {
                    $log->info('received signal. signo: ' . $sig);
                    $self->setReceivedSignal($sig);

                    $log->info('--> sending a signal to children.');
                    $self->sendSignalToChildren($sig);

                    $log->info('--> deleting token.');
                    unset($token);

                    $log->info('<-- signal handling has been completed successfully.');
                    exit;

Coding Style Compatibility:
The method __construct() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.
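
One way to follow this advice, sketched under the assumption that the termination step can be injected: the signal handler receives a callable that ends the process, so a test can substitute a recorder. makeSignalHandler(), $onSignal and $terminate are hypothetical names, not part of Snidel.

```php
<?php
// Hypothetical sketch: $terminate stands in for the bare `exit` so the
// handler can be exercised in a test. None of these names exist in Snidel.

function makeSignalHandler($onSignal, $terminate)
{
    return function ($sig) use ($onSignal, $terminate) {
        // Run the cleanup work first, then hand control to the terminator.
        call_user_func($onSignal, $sig);
        call_user_func($terminate, 0);
    };
}

// Production wiring would pass a callable that really exits, e.g.
// function ($status) { exit($status); }

// Test wiring records the calls instead of ending the process.
$events = array();
$handler = makeSignalHandler(
    function ($sig) use (&$events) {
        $events[] = 'cleanup:' . $sig;
    },
    function ($status) use (&$events) {
        $events[] = 'terminate:' . $status;
    }
);
$handler(15);
```

The exit call still exists in production, but it lives in one small callable instead of inside the constructor.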

                },
                false
            );
        }

        $this->log->info('parent pid: ' . $this->ownerPid);
    }

    /**
     * sets the resource for the log.
     *
     * @param   resource    $resource
     * @return  void
     * @codeCoverageIgnore
     */
    public function setLoggingDestination($resource)
    {
        $this->log->setDestination($resource);
    }

    /**
     * fork process
     *
     * @param   callable    $callable
     * @param   mixed       $args
     * @param   string      $tag
     * @return  void
     * @throws  \RuntimeException
     */
    public function fork($callable, $args = array(), $tag = null)
    {
        if (!is_array($args)) {
            $args = array($args);
        }

        if ($this->masterProcessId === null) {
            $this->forkMaster();
        }

        try {
            $this->taskQueue->enqueue($callable, $args, $tag);
        } catch (\RuntimeException $e) {
            throw $e;
        }

        $this->log->info('queued task #' . $this->taskQueue->queuedCount());
    }

    /**
     * fork master process
     *
     * @return  void
     */
    private function forkMaster()
    {
        $pid = $this->pcntl->fork();
        $this->masterProcessId = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterProcessId($this->masterProcessId);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());
        } elseif ($pid === -1) {

Unused Code:
This elseif statement is empty, and could be removed.

This check looks for the bodies of elseif statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.

These elseif bodies can be removed. If you have an empty elseif but statements in the else branch, consider inverting the condition.
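
In this particular method the empty branch also hides an ordering problem worth checking: pcntl_fork() returns -1 on failure, and -1 is truthy in PHP, so a preceding `if ($pid)` catches the failure case before `elseif ($pid === -1)` is ever evaluated. Below is a minimal sketch of testing the failure value first. It assumes Pcntl::fork() passes the return value of pcntl_fork() through unchanged (the listing does not show that wrapper), and classifyForkResult() is a hypothetical helper.

```php
<?php
// Hypothetical helper, not part of Snidel: classifies the return value of
// pcntl_fork(), which is -1 on failure, 0 in the child, and the child's
// pid in the parent.

function classifyForkResult($pid)
{
    if ($pid === -1) {
        // Must be tested before any truthiness check: -1 is truthy in PHP.
        throw new RuntimeException('could not fork a new process');
    }

    return ($pid === 0) ? 'child' : 'parent';
}

// Usage with plain integers standing in for real fork results.
$roles = array(classifyForkResult(0), classifyForkResult(4321));

try {
    classifyForkResult(-1);
    $failed = false;
} catch (RuntimeException $e) {
    $failed = true;
}
```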

            // error
        } else {
            // master
            $this->log->info('pid: ' . $this->masterProcessId);
            foreach ($this->signals as $sig) {
                $this->pcntl->signal($sig, SIG_DFL, true);
            }
            while ($task = $this->taskQueue->dequeue()) {
                $this->log->info('dequeued task #' . $this->taskQueue->dequeuedCount());
                if ($this->token->accept()) {
                    $this->forkWorker($task['callable'], $task['args'], $task['tag']);
                }
            }
            $this->_exit();
        }
    }

    /**
     * fork worker process
     *
     * @param   callable    $callable
     * @param   mixed       $args
     * @param   string      $tag
     * @return  void
     * @throws  \RuntimeException
     */
    private function forkWorker($callable, $args = array(), $tag = null)
    {
        if (!is_array($args)) {
            $args = array($args);
        }

        try {
            $fork = $this->forkContainer->fork($tag);
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        $fork->setCallable($callable);
        $fork->setArgs($args);
        $fork->setTag($tag);

        if (getmypid() === $this->masterProcessId) {
            // master
            $this->log->info('forked worker. pid: ' . $fork->getPid());
        } else {
            // worker
            $this->log->info('has forked. pid: ' . getmypid());
            // @codeCoverageIgnoreStart
            foreach ($this->signals as $sig) {
                $this->pcntl->signal($sig, SIG_DFL, true);
            }

            $resultHasQueued = false;
            // in php5.3, $this is not usable directly with closures.
            $resultQueue = $this->resultQueue;
            register_shutdown_function(function () use ($fork, $resultQueue, &$resultHasQueued) {
                if ($fork->hasNoResult() || $resultHasQueued === false) {
                    $result = new Result();
                    $result->setFailure();
                    $fork->setResult($result);
                    $resultQueue->enqueue($fork);
                }
            });
            $this->log->info('----> started the function.');
            ob_start();
            $result = new Result();
            $result->setReturn(call_user_func_array($callable, $args));
            $result->setOutput(ob_get_clean());
            $fork->setResult($result);
            $this->log->info('<---- completed the function.');

            $this->resultQueue->enqueue($fork);
            $resultHasQueued = true;

Unused Code:
$resultHasQueued is not used, you could remove the assignment.

This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead: the first because $myVar is never used, and the second because $higher is always overwritten on every possible execution path.
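
Before removing an assignment flagged this way, it is worth checking whether the variable is captured by reference elsewhere. In forkWorker() the shutdown closure is declared with `use ($fork, $resultQueue, &$resultHasQueued)`, so it may read a value assigned after the closure was created, which static analysis can miss. A minimal, self-contained illustration of that PHP behaviour follows (the variable names are illustrative only and do not come from Snidel):

```php
<?php
// Minimal illustration, not Snidel code: a closure that captures a
// variable by reference observes assignments made after it was created.

$queued = false;
$check = function () use (&$queued) {
    return $queued ? 'already queued' : 'needs fallback';
};

$before = $check();
$queued = true; // looks unused locally, but the closure reads it later
$after = $check();
```

If the capture were by value (`use ($queued)`), both calls would see the original `false`, and the later assignment really would be dead.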

            $this->log->info('queued the result.');
            $this->token->back();
            $this->log->info('return the token and exit.');
            $this->_exit();
            // @codeCoverageIgnoreEnd
        }
    }

    /**
     * fork process
     * this method doesn't use a master / worker model.
     *
     * @param   callable                    $callable
     * @param   mixed                       $args
     * @param   string                      $tag
     * @param   \Ackintosh\Snidel\Token     $token
     * @return  int
     * @throws  \RuntimeException
     */
    public function forkSimply($callable, $args = array(), $tag = null, Token $token = null)
    {
        $this->processToken = $token ? $token : $this->token;
        if (!is_array($args)) {
            $args = array($args);
        }

        try {
            $fork = $this->forkContainer->fork($tag);
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        $fork->setCallable($callable);
        $fork->setArgs($args);

        if (getmypid() === $this->ownerPid) {
            // parent
            $this->log->info('created child process. pid: ' . $fork->getPid());
            $this->childPids[] = $fork->getPid();
        } else {
            // @codeCoverageIgnoreStart
            // child
            foreach ($this->signals as $sig) {
                $this->pcntl->signal($sig, SIG_DFL, true);
            }

            $result = new Result();
            /**
             * in php5.3, we can not use $this in anonymous functions
             */
            $dataRepository     = $this->dataRepository;
            $log                = $this->log;
            $processToken       = $this->processToken;
            register_shutdown_function(function () use ($result, $dataRepository, $log, $processToken) {
                $data = $dataRepository->load(getmypid());
                try {
                    $data->write($result);
                } catch (SharedMemoryControlException $e) {
                    throw $e;
                }
                $log->info('<-- return token.');
                $processToken->back();
            });

            $log->info('--> waiting for the token come around.');
            if ($processToken->accept()) {
                $log->info('----> started the function.');
                ob_start();
                $result->setReturn(call_user_func_array($callable, $args));
                $result->setOutput(ob_get_clean());
                $log->info('<---- completed the function.');
            }

            $this->_exit();
            // @codeCoverageIgnoreEnd
        }

        return $fork->getPid();
    }

    /**
     * waits until all children that have been forked by Snidel::forkSimply() are completed
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    public function waitSimply()
    {
        if ($this->joined) {
            return;
        }

        $count = count($this->childPids);
        for ($i = 0; $i < $count; $i++) {
            try {
                $fork = $this->forkContainer->wait();
            } catch (SharedMemoryControlException $e) {
                $this->exceptionHasOccured = true;
                throw $e;
            }

            $childPid   = $fork->getPid();
            $result     = $fork->getResult();
            if ($fork->hasNotFinishedSuccessfully()) {
                $message = 'an error has occurred in child process. pid: ' . $childPid;
                $this->log->error($message);
                $this->error[$childPid] = array(
                    'status'    => $fork->getStatus(),
                    'message'   => $message,
                    'callable'  => $fork->getCallable(),
                    'args'      => $fork->getArgs(),
                    'return'    => $result->getReturn(),
                );
            }
            unset($this->childPids[array_search($childPid, $this->childPids)]);
        }
        $this->joined = true;
    }

    /**
     * waits until all tasks that were queued by Snidel::fork() are completed
     *
     * @return  void
     */
    public function wait()
    {
        for (; $this->taskQueue->queuedCount() > $this->resultQueue->dequeuedCount();) {
            $fork = $this->resultQueue->dequeue();
            if ($fork->getResult()->isFailure()) {
                $this->error[$fork->getPid()] = $fork;
            }
            $this->forks[] = $fork;
        }

        $this->joined = true;
    }

    /**
     * @return  bool
     */
    public function hasError()
    {
        return $this->error->exists();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * gets results
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     * @throws  \InvalidArgumentException
     */
    public function getSimply($tag = null)
    {
        if (!$this->joined) {
            $this->waitSimply();
        }

        if ($tag === null) {
            return $this->forkContainer->getCollection();
        }

        if (!$this->forkContainer->hasTag($tag)) {
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
        }

        return $this->forkContainer->getCollection($tag);
    }

    /**
     * gets results
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\ForkCollection
     * @throws  \InvalidArgumentException
     */
    public function get($tag = null)
    {
        if (!$this->joined) {
            $this->wait();
        }

        if ($this->taskQueue->queuedCount() === 0) {
            return;
        }

        if ($tag === null) {
            return new ForkCollection($this->forks);
        }

        $filtered = array_filter($this->forks, function ($fork) use ($tag) {
            return $fork->getTag() === $tag;
        });

        if (count($filtered) === 0) {
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
        }

        return new ForkCollection($filtered);
    }

    /**
     * sends signal to child
     *
     * @param   int     $sig
     * @return  void
     */
    public function sendSignalToChildren($sig)
    {
        foreach ($this->childPids as $pid) {
            $this->log->info('----> sending a signal to child. pid: ' . $pid);
            posix_kill($pid, $sig);
        }
    }

    public function setReceivedSignal($sig)
    {
        $this->receivedSignal = $sig;
    }

    /**
     * delete shared memory
     *
     * @return  void
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
     */
    private function deleteAllData()
    {
        foreach ($this->childPids as $pid) {
            $data = $this->dataRepository->load($pid);
            try {
                $data->deleteIfExists();
            } catch (SharedMemoryControlException $e) {
                throw $e;
            }
        }
    }

    /**
     * create map object
     *
     * @param   array       $args
     * @param   callable    $callable
     * @return  \Ackintosh\Snidel\MapContainer
     */
    public function map(Array $args, $callable)
    {
        return new MapContainer($args, $callable, $this->concurrency);
    }

    /**
     * run map object
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  array
     * @throws  \RuntimeException
     */
    public function run(MapContainer $mapContainer)
    {
        try {
            $this->forkTheFirstProcessing($mapContainer);
            $this->waitsAndConnectsProcess($mapContainer);
        } catch (\RuntimeException $e) {
            $this->exceptionHasOccured = true;
            throw $e;
        }

        return $this->getResultsOf($mapContainer);
    }

    /**
     * fork the first processing of the map container
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  void
     * @throws  \RuntimeException
     */
    private function forkTheFirstProcessing(MapContainer $mapContainer)
    {
        foreach ($mapContainer->getFirstArgs() as $args) {
            try {
                $childPid = $this->forkSimply($mapContainer->getFirstMap()->getCallable(), $args);
            } catch (\RuntimeException $e) {
                throw $e;
            }
            $mapContainer->getFirstMap()->countTheForked();
            $mapContainer->getFirstMap()->addChildPid($childPid);
        }
    }

    /**
     * waits and connects the process of map container
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  void
     * @throws  \RuntimeException
     */
    private function waitsAndConnectsProcess(MapContainer $mapContainer)
    {
        if ($this->joined) {
            return;
        }

        while ($mapContainer->isProcessing()) {
            try {
                $fork = $this->forkContainer->wait();
            } catch (SharedMemoryControlException $e) {
                throw $e;
            }

            $childPid = $fork->getPid();
            if ($fork->hasNotFinishedSuccessfully()) {
                $message = 'an error has occurred in child process. pid: ' . $childPid;
                $this->log->error($message);
                throw new \RuntimeException($message);
            }

            unset($this->childPids[array_search($childPid, $this->childPids)]);
            if ($nextMap = $mapContainer->nextMap($childPid)) {
                try {
                    $nextMapPid = $this->forkSimply(
                        $nextMap->getCallable(),
                        $fork,
                        null,
                        $nextMap->getToken()
                    );
                } catch (\RuntimeException $e) {
                    throw $e;
                }
                $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
                $this->log->info($message);
                $nextMap->countTheForked();
                $nextMap->addChildPid($nextMapPid);
            }
            $mapContainer->countTheCompleted($childPid);
        }

        $this->joined = true;
    }

    /**
     * gets results of map container
     *
     * @param   \Ackintosh\Snidel\MapContainer
     * @return  array
     */
    private function getResultsOf(MapContainer $mapContainer)
    {
        $results = array();
        foreach ($mapContainer->getLastMapPids() as $pid) {
            $results[] = $this->forkContainer->get($pid)->getResult()->getReturn();
        }

        return $results;
    }

    private function _exit($status = 0)
    {
        exit($status);

Coding Style Compatibility:
The method _exit() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

    }

    public function __destruct()
    {
        if ($this->masterProcessId !== null && $this->ownerPid === getmypid()) {
            $this->log->info('shutdown master process.');
            posix_kill($this->masterProcessId, SIGTERM);

            unset($this->taskQueue);
            unset($this->resultQueue);
        }

        if ($this->exceptionHasOccured) {
            $this->log->info('destruct processes are started.(exception has occured)');
            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();
        } elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) {
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
            $this->log->error($message);
            $this->log->info('destruct processes are started.');

            $this->log->info('--> sending a signal to children.');
            $this->sendSignalToChildren(SIGTERM);

            $this->log->info('--> deleting all shared memory.');
            $this->deleteAllData();

            $this->log->info('--> deleting token.');
            unset($this->token);

            $this->log->info('--> destruct processes are finished successfully.');
            throw new \LogicException($message);
        }
    }
}