ForkManager::isMaximumNumberOfThreadsReached()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
/**
3
 * @author stev leibelt <[email protected]>
4
 * @since 2014-07-20 
5
 */
6
7
namespace Net\Bazzline\Component\ProcessForkManager;
8
9
use Net\Bazzline\Component\MemoryLimitManager\MemoryLimitManager;
10
use Net\Bazzline\Component\MemoryLimitManager\MemoryLimitManagerDependentInterface;
11
use Net\Bazzline\Component\TimeLimitManager\TimeLimitManager;
12
use Net\Bazzline\Component\TimeLimitManager\TimeLimitManagerDependentInterface;
13
use Symfony\Component\EventDispatcher\EventDispatcher;
14
15
/**
16
 * Class ForkManager
17
 * @package Net\Bazzline\Component\ProcessForkManager
18
 */
19
class ForkManager implements ExecutableInterface, MemoryLimitManagerDependentInterface, TimeLimitManagerDependentInterface
20
{
21
    /**
22
     * @var ForkManagerEvent
23
     */
24
    private $event;
25
26
    /**
27
     * @var EventDispatcher
28
     */
29
    private $eventDispatcher;
30
31
    /**
32
     * @var int
33
     */
34
    private $maximumNumberOfThreads;
35
36
    /**
37
     * @var MemoryLimitManager
38
     */
39
    private $memoryLimitManager;
40
41
    /**
42
     * @var int
43
     */
44
    private $numberOfMicrosecondsToCheckThreadStatus;
45
46
    /**
47
     * @var int
48
     */
49
    private $processId;
50
51
    /**
52
     * @var bool
53
     */
54
    private $noShutdownSignalReceived;
55
56
    /**
57
     * @var TaskManager
58
     */
59
    private $taskManager;
60
61
    /**
62
     * @var TimeLimitManager
63
     */
64
    private $timeLimitManager;
65
66
    /**
67
     * @var array
68
     */
69
    private $threads;
70
71
    /**
72
     * @param bool $validateEnvironment
73
     * @throws RuntimeException
74
     */
75
    public function __construct($validateEnvironment = true)
76
    {
77
        if ($validateEnvironment) {
78
            //@todo add all needed
79
            $mandatoryPHPFunctions = array(
80
                'getmypid',
81
                'memory_get_usage',
82
                'pcntl_fork',
83
                'pcntl_signal',
84
                'pcntl_signal_dispatch',
85
                'posix_getpid',
86
                'spl_object_hash'
87
            );
88
89
            foreach ($mandatoryPHPFunctions as $mandatoryPHPFunction) {
90
                if (!function_exists($mandatoryPHPFunction)) {
91
                    throw new RuntimeException(
92
                        'mandatory php function "' . $mandatoryPHPFunction . '" is not available'
93
                    );
94
                }
95
            }
96
        }
97
98
        declare(ticks = 10);
99
100
        $this->processId = posix_getpid();
101
        $this->noShutdownSignalReceived = true;
102
        $this->threads = array();
103
    }
104
105
    /**
106
     * @param AbstractTask $task
107
     * @return $this
108
     */
109
    public function addTask(AbstractTask $task)
110
    {
111
        $task->setParentProcessId($this->processId);
112
        $this->taskManager->addOpenTask($task);
113
114
        return $this;
115
    }
116
117
    /**
118
     * @return EventDispatcher
119
     */
120
    public function getEventDispatcher()
121
    {
122
        return $this->eventDispatcher;
123
    }
124
125
    /**
126
     * @return MemoryLimitManager
127
     */
128
    public function getMemoryLimitManager()
129
    {
130
        return $this->memoryLimitManager;
131
    }
132
133
    /**
134
     * @return TaskManager
135
     */
136
    public function getTaskManager()
137
    {
138
        return $this->taskManager;
139
    }
140
141
    /**
142
     * @return TimeLimitManager
143
     */
144
    public function getTimeLimitManager()
145
    {
146
        return $this->timeLimitManager;
147
    }
148
149
    /**
150
     * @param ForkManagerEvent $event
151
     */
152
    public function injectEvent(ForkManagerEvent $event)
153
    {
154
        $this->event = $event;
155
    }
156
157
    /**
158
     * @param EventDispatcher $dispatcher
159
     */
160
    public function injectEventDispatcher(EventDispatcher $dispatcher)
161
    {
162
        $this->eventDispatcher = $dispatcher;
163
    }
164
165
    /**
166
     * @param MemoryLimitManager $manager
167
     */
168
    public function injectMemoryLimitManager(MemoryLimitManager $manager)
169
    {
170
        $this->memoryLimitManager = $manager;
171
    }
172
173
    /**
174
     * @param TimeLimitManager $manager
175
     */
176
    public function injectTimeLimitManager(TimeLimitManager $manager)
177
    {
178
        $this->timeLimitManager = $manager;
179
    }
180
181
    /**
182
     * @param TaskManager $manager
183
     */
184
    public function injectTaskManager(TaskManager $manager)
185
    {
186
        $this->taskManager = $manager;
187
    }
188
189
    /**
190
     * @param int $maximumNumberOfThreads
191
     */
192
    public function setMaximumNumberOfThreads($maximumNumberOfThreads)
193
    {
194
        $this->maximumNumberOfThreads = (int) $maximumNumberOfThreads;
195
    }
196
197
    /**
198
     * @param int $numberOfMicrosecondsToCheckThreadStatus
199
     */
200
    public function setNumberOfMicrosecondsToCheckThreadStatus($numberOfMicrosecondsToCheckThreadStatus)
201
    {
202
        $this->numberOfMicrosecondsToCheckThreadStatus = (int) $numberOfMicrosecondsToCheckThreadStatus;
203
    }
204
205
    /**
206
     * @throws RuntimeException
207
     */
208
    public function execute()
209
    {
210
        $this->assertMandatoryPropertiesAreSet();
211
        $this->setUpSignalHandling('signalHandler');
212
213
        $this->eventDispatcher->dispatch(
214
            ForkManagerEvent::STARTING_EXECUTION,
215
            $this->createNewEvent(__METHOD__)
216
        );
217
218
        while ($this->taskManager->areThereOpenTasksLeft()
219
            && $this->noShutdownSignalReceived) {
220
            if ($this->timeLimitManager->isLimitReached()) {
221
                $this->eventDispatcher->dispatch(
222
                    ForkManagerEvent::REACHING_TIME_LIMIT,
223
                    $this->createNewEvent(__METHOD__, $this)
224
                );
225
                $this->stopAllThreads();
226
            } else if ($this->isMaximumMemoryLimitOfWholeThreadsReached()) {
227
                $this->eventDispatcher->dispatch(
228
                    ForkManagerEvent::REACHING_TIME_LIMIT,
229
                    $this->createNewEvent(__METHOD__, $this)
230
                );
231
                $this->stopNewestThread();
232
                $this->sleep();
233
            } else {
234
                if ($this->isMaximumNumberOfThreadsReached()) {
235
                    $this->updateNumberOfRunningThreads();
236
                    $this->sleep();
237
                } else {
238
                    $task = $this->taskManager->getOpenTask();
239
                    if ($task instanceof AbstractTask) {
240
                        $this->startThread($task);
241
                    }
242
                }
243
            }
244
        }
245
246
        $this->eventDispatcher->dispatch(
247
            ForkManagerEvent::FINISHED_EXECUTION_OF_OPEN_TASK,
248
            $this->createNewEvent(__METHOD__)
249
        );
250
        $this->eventDispatcher->dispatch(
251
            ForkManagerEvent::STARTING_WAITING_FOR_RUNNING_TASKS,
252
            $this->createNewEvent(__METHOD__)
253
        );
254
255
        while ($this->notAllThreadsAreFinished()
256
            && $this->noShutdownSignalReceived) {
257
            if ($this->timeLimitManager->isLimitReached()) {
258
                $this->eventDispatcher->dispatch(
259
                    ForkManagerEvent::REACHING_TIME_LIMIT,
260
                    $this->createNewEvent(__METHOD__, $this)
261
                );
262
                $this->stopAllThreads();
263
            } else if ($this->isMaximumMemoryLimitOfWholeThreadsReached()) {
264
                $this->eventDispatcher->dispatch(
265
                    ForkManagerEvent::REACHING_TIME_LIMIT,
266
                    $this->createNewEvent(__METHOD__, $this)
267
                );
268
                $this->stopNewestThread();
269
                $this->sleep();
270
            } else {
271
                $this->updateNumberOfRunningThreads();
272
                $this->sleep();
273
            }
274
        }
275
276
        $this->eventDispatcher->dispatch(
277
            ForkManagerEvent::FINISHED_WAITING_FOR_RUNNING_TASKS,
278
            $this->createNewEvent(__METHOD__)
279
        );
280
        $this->eventDispatcher->dispatch(
281
            ForkManagerEvent::FINISHED_EXECUTION,
282
            $this->createNewEvent(__METHOD__)
283
        );
284
    }
285
286
    /**
287
     * @return bool
288
     */
289
    private function notAllThreadsAreFinished()
290
    {
291
        return ($this->countNumberOfThreads() !== 0);
292
    }
293
294
    private function updateNumberOfRunningThreads()
295
    {
296
        foreach ($this->threads as $processId => $data) {
297
            if ($this->hasThreadFinished($processId)) {
298
                $this->taskManager->markRunningTaskAsFinished($data['task']);
299
                unset($this->threads[$processId]);
300
                $this->eventDispatcher->dispatch(
301
                    ForkManagerEvent::FINISHED_TASK,
302
                    $this->createNewEvent(__METHOD__, null, $data['task'])
303
                );
304
            }
305
        }
306
    }
307
308
    /**
309
     * @param AbstractTask $task
310
     * @throws RuntimeException
311
     */
312
    private function startThread(AbstractTask $task)
313
    {
314
        $time = time();
315
        $processId = pcntl_fork();
316
317
        if ($processId < 0) {
318
            throw new RuntimeException(
319
                'can not fork process'
320
            );
321
        } else if ($processId === 0) {
322
            //child
323
            $task->execute();
324
            exit(0);
325
        } else {
326
            //parent
327
            //$processId > 0
328
            $this->eventDispatcher->dispatch(
329
                ForkManagerEvent::STARTING_TASK,
330
                $this->createNewEvent(__METHOD__, null, $task)
331
            );
332
            $this->threads[$processId] = array(
333
                'startTime' => $time,
334
                'task' => $task
335
            );
336
            $this->taskManager->markOpenTaskAsRunning($task);
337
        }
338
    }
339
340
    /**
341
     * @param $processId
342
     * @throws RuntimeException
343
     */
344
    private function stopThread($processId)
345
    {
346
        if ($processId > 0) {
347
            if (isset($this->threads[$processId])) {
348
                $isStopped = posix_kill($processId, SIGTERM);
349
                if ($isStopped) {
350
                    $task = $this->threads[$processId]['task'];
351
                    unset($this->threads[$processId]);
352
                    $this->taskManager->markRunningTaskAsAborted($task);
353
                    $this->eventDispatcher->dispatch(
354
                        ForkManagerEvent::STOPPING_TASK,
355
                        $this->createNewEvent(__METHOD__, null, $task)
356
                    );
357
                } else {
358
                    $this->sleep(10);
359
                    if (!$this->hasThreadFinished($processId)) {
360
                        throw new RuntimeException(
361
                            'thread with process id "' . $processId . '" could not be stopped'
362
                        );
363
                    }
364
                }
365
            }
366
        }
367
    }
368
369
    /**
370
     * @throws RuntimeException
371
     * @todo think about the idea to put this in a "HandleMaximumMemoryLimitReachedStrategy"
372
     */
373
    private function stopNewestThread()
374
    {
375
        $newestProcessId = null;
376
        $newestStartTime = 0;
377
378
        foreach ($this->threads as $processId => $data) {
379
            if ($data['startTime'] > $newestStartTime) {
380
                $newestProcessId = $processId;
381
            }
382
        }
383
384
        if (!is_null($newestProcessId)) {
385
            $this->stopThread($newestProcessId);
386
        }
387
    }
388
389
    /**
390
     * @throws RuntimeException
391
     */
392
    private function stopAllThreads()
393
    {
394
        foreach ($this->threads as $processId => $data) {
395
            $this->stopThread($processId);
396
        }
397
    }
398
399
    /**
400
     * @param int $processId
401
     * @return int
402
     * @throws RuntimeException
403
     */
404
    private function hasThreadFinished($processId)
405
    {
406
        if ($processId > 0) {
407
            $statusCode = 0;
408
            $result = pcntl_waitpid($processId, $statusCode, WUNTRACED OR WNOHANG);
409
410
            if ($statusCode > 0) {
411
                throw new RuntimeException(
412
                    'thread with process id "' . $processId .
413
                    '" returned statusCode code "' . $statusCode . '"'
414
                );
415
            }
416
417
            $threadHasFinished = ($result === $processId);
418
        } else {
419
            $threadHasFinished = true;
420
        }
421
422
        return $threadHasFinished;
423
    }
424
425
    /**
426
     * @return int
427
     */
428
    private function countNumberOfThreads()
429
    {
430
        return count($this->threads);
431
    }
432
433
    /**
434
     * @return bool
435
     */
436
    private function isMaximumNumberOfThreadsReached()
437
    {
438
        return ($this->countNumberOfThreads() >= $this->maximumNumberOfThreads);
439
    }
440
441
    /**
442
     * @return bool
443
     */
444
    private function isMaximumMemoryLimitOfWholeThreadsReached()
445
    {
446
        $processIds = array_keys($this->threads);
447
448
        $isReached = $this->memoryLimitManager->isLimitReached($processIds);
449
450
        return $isReached;
451
    }
452
453
    /**
454
     * @param int $steps
455
     */
456
    private function sleep($steps = 1)
457
    {
458
        $this->dispatchSignal();
459
460
        for ($iterator = 0; $iterator < $steps; ++$iterator) {
461
            usleep($this->numberOfMicrosecondsToCheckThreadStatus);
462
        }
463
    }
464
465
    //begin of posix signal handling
466
    /**
467
     * @param int $signal
468
     * @codeCoverageIgnore
469
     */
470
    private function signalHandler($signal)
0 ignored issues
show
Unused Code introduced by
This method is not used, and could be removed.
Loading history...
471
    {
472
        //dispatch event caught signal
473
        switch ($signal) {
474
            case SIGCHLD:
475
                $this->updateNumberOfRunningThreads();
476
                break;
477
            case SIGABRT:
478
            case SIGALRM:
479
            case SIGHUP:
480
            case SIGINT:
481
            default:
482
                echo $signal . PHP_EOL;
483
                $this->shutdown();
484
        }
485
    }
486
487
    private function shutdown()
488
    {
489
        $this->eventDispatcher->dispatch(
490
            ForkManagerEvent::STOPPING_EXECUTION,
491
            $this->createNewEvent(__METHOD__, $this)
492
        );
493
        $this->stopAllThreads();
494
        $this->noShutdownSignalReceived = false;
495
    }
496
497
    private function dispatchSignal()
498
    {
499
        pcntl_signal_dispatch();
500
    }
501
502
    /**
503
     * @param $nameOfSignalHandlerMethod
504
     * @throws InvalidArgumentException
505
     * @codeCoverageIgnore
506
     */
507
    private function setUpSignalHandling($nameOfSignalHandlerMethod)
508
    {
509 View Code Duplication
        if (!is_callable(array($this, $nameOfSignalHandlerMethod))) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
510
            throw new InvalidArgumentException(
511
                'provided method name "' . $nameOfSignalHandlerMethod . '" is not available'
512
            );
513
        }
514
515
        pcntl_signal(SIGHUP,    array($this, $nameOfSignalHandlerMethod));    //controlling terminal is closed
516
        pcntl_signal(SIGINT,    array($this, $nameOfSignalHandlerMethod));  //interrupt this processing | ctrl+c
517
        //pcntl_signal(SIGUSR1,   array($this, $nameOfSignalHandlerMethod));    //user defined conditions
518
        //pcntl_signal(SIGUSR2,   array($this, $nameOfSignalHandlerMethod));    //user defined conditions
519
        //pcntl_signal(SIGQUIT,   array($this, $nameOfSignalHandlerMethod));    //quit your processing
520
        //pcntl_signal(SIGILL,    array($this, $nameOfSignalHandlerMethod));    //illegal instruction performed
521
        pcntl_signal(SIGABRT,   array($this, $nameOfSignalHandlerMethod));    //abort process
522
        //pcntl_signal(SIGFPE,    array($this, $nameOfSignalHandlerMethod));    //error on arithmetic
523
        //pcntl_signal(SIGSEGV,   array($this, $nameOfSignalHandlerMethod));    //invalid virtual memory reference
524
        //pcntl_signal(SIGPIPE,   array($this, $nameOfSignalHandlerMethod));    //write to a pipe without other process is connected to it
525
        pcntl_signal(SIGALRM,   array($this, $nameOfSignalHandlerMethod));    //some kind of limit is reached
526
        //pcntl_signal(SIGTERM,   array($this, $nameOfSignalHandlerMethod));  //termination signal | kill <pid>
527
        pcntl_signal(SIGCHLD,   array($this, $nameOfSignalHandlerMethod));    //child is terminated
528
        //pcntl_signal(SIGCONT,   array($this, $nameOfSignalHandlerMethod));    //continue your work
529
        //pcntl_signal(SIGTSTP,   array($this, $nameOfSignalHandlerMethod));    //terminal stop signal
530
        //pcntl_signal(SIGTTIN,   array($this, $nameOfSignalHandlerMethod));    //background process attempting read
531
        //pcntl_signal(SIGTTOU,   array($this, $nameOfSignalHandlerMethod));    //background process attempting write
532
    }
533
    //end of posix signal handling
534
535
    /**
536
     * @throws RuntimeException
537
     */
538
    private function assertMandatoryPropertiesAreSet()
539
    {
540
        $properties = array(
541
            'event',
542
            'eventDispatcher',
543
            'memoryLimitManager',
544
            'timeLimitManager',
545
            'taskManager'
546
        );
547
548
        foreach ($properties as $property) {
549
            if (is_null($this->$property)) {
550
                throw new RuntimeException(
551
                    'mandatory property "' . $property . '" not set'
552
                );
553
            }
554
        }
555
    }
556
557
    /**
558
     * @param string $source
559
     * @param ForkManager $forkManager
560
     * @param TaskInterface $task
561
     * @return ForkManagerEvent
562
     */
563
    private function createNewEvent($source = null, ForkManager $forkManager = null, TaskInterface $task = null)
564
    {
565
        $event = clone $this->event;
566
567
        if ($forkManager instanceof ForkManager) {
568
            $event->setForkManager($this);
569
        }
570
571
        if ($task instanceof TaskInterface) {
572
            $event->setTask($task);
573
        }
574
575
        if (!is_null($source)) {
576
            $event->setSource($source);
577
        }
578
579
        return $event;
580
    }
581
}