ParallelProcessHandler   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 111
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 13
eloc 32
c 0
b 0
f 0
dl 0
loc 111
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A process() 0 15 4
A sendEventIfComplete() 0 8 2
A checkRunningProcesses() 0 9 3
A getResult() 0 6 1
A addToPool() 0 7 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Churn\Process\Handler;
6
7
use Churn\Event\Broker;
8
use Churn\Event\Event\AfterFileAnalysisEvent;
9
use Churn\File\File;
10
use Churn\Process\ProcessFactory;
11
use Churn\Process\ProcessInterface;
12
use Churn\Result\Result;
13
use Generator;
14
15
/**
 * Process handler that runs the processes of several files concurrently.
 *
 * Processes are started as files are read from the finder and are polled
 * until they report success. Each successful process is merged into a
 * per-file Result; once that Result is complete, an AfterFileAnalysisEvent
 * is sent through the event broker.
 *
 * @internal
 */
final class ParallelProcessHandler extends BaseProcessHandler
{
    /**
     * Results still being assembled, indexed by the file's display path.
     *
     * An entry is created when the first process of a file succeeds and is
     * removed as soon as the result is complete.
     *
     * @var array<Result>
     */
    private $completedProcesses;

    /**
     * Pool size at which no new file is started (always at least 1).
     *
     * @var int
     */
    private $numberOfParallelJobs;

    /**
     * Broker notified each time the analysis of a file is complete.
     *
     * @var Broker
     */
    private $broker;

    /**
     * @param int $numberOfParallelJobs Number of parallel jobs to run. Values below 1 are treated as 1.
     * @param Broker $broker The event broker.
     */
    public function __construct(int $numberOfParallelJobs, Broker $broker)
    {
        // process() waits while count($pool) >= limit. With a limit of 0 or
        // less that condition holds even for an empty pool, which would make
        // the handler spin forever on the first file. Clamp to one job.
        $this->numberOfParallelJobs = \max(1, $numberOfParallelJobs);
        $this->completedProcesses = [];
        $this->broker = $broker;
    }

    /**
     * Run the processes to gather information.
     *
     * Before the processes of a file are started, the pool is polled until it
     * holds fewer processes than the configured limit. After the last file,
     * the pool is polled until it is empty.
     *
     * @param Generator $filesFinder Collection of files.
     * @param ProcessFactory $processFactory Process Factory.
     * @psalm-param Generator<\Churn\File\File> $filesFinder
     */
    #[\Override]
    public function process(Generator $filesFinder, ProcessFactory $processFactory): void
    {
        $pool = [];

        foreach ($filesFinder as $file) {
            // Throttle: do not start another file while the pool is full.
            while (\count($pool) >= $this->numberOfParallelJobs) {
                $this->checkRunningProcesses($pool);
            }

            $this->addToPool($pool, $file, $processFactory);
        }

        // Drain the processes that are still in the pool.
        while (\count($pool) > 0) {
            $this->checkRunningProcesses($pool);
        }
    }

    /**
     * Makes one pass over the pool and collects every process reporting success.
     *
     * A collected process is removed from the pool and its output is merged
     * into the result of its file. Processes that do not report success are
     * left in the pool for a later pass.
     *
     * @param array<ProcessInterface> $pool Pool of processes, modified in place.
     */
    private function checkRunningProcesses(array &$pool): void
    {
        // foreach iterates by value, so removing entries during the loop is safe.
        foreach ($pool as $key => $process) {
            if (!$process->isSuccessful()) {
                continue;
            }

            unset($pool[$key]);
            $this->sendEventIfComplete($this->getResult($process));
        }
    }

    /**
     * Starts every process created for the given file and adds it to the pool.
     *
     * All processes of a file are started together, so the pool may hold more
     * processes than the configured limit when a file yields several of them.
     *
     * @param array<ProcessInterface> $pool Pool of processes, modified in place.
     * @param File $file The file to process.
     * @param ProcessFactory $processFactory Process Factory.
     */
    private function addToPool(array &$pool, File $file, ProcessFactory $processFactory): void
    {
        $i = 0;
        foreach ($processFactory->createProcesses($file) as $process) {
            $process->start();
            // The index prefix keeps the keys of a file's processes distinct.
            $pool["$i:" . $file->getDisplayPath()] = $process;
            $i++;
        }
    }

    /**
     * Returns the result of processes for a file.
     *
     * The Result of the file is created the first time one of its processes
     * is collected, then reused for the following ones.
     *
     * @param ProcessInterface $process A successful process.
     */
    private function getResult(ProcessInterface $process): Result
    {
        $key = $process->getFile()->getDisplayPath();
        if (!isset($this->completedProcesses[$key])) {
            $this->completedProcesses[$key] = new Result($process->getFile());
        }

        // saveResult() is inherited from BaseProcessHandler; presumably it
        // stores the output of the process in the given Result.
        return $this->saveResult($process, $this->completedProcesses[$key]);
    }

    /**
     * Notifies the broker when the result of a file is complete.
     *
     * The result is dropped from the pending results before the event is sent.
     * Nothing happens while the result is still incomplete.
     *
     * @param Result $result The result of the processes for a file.
     */
    private function sendEventIfComplete(Result $result): void
    {
        if (!$result->isComplete()) {
            return;
        }

        unset($this->completedProcesses[$result->getFile()->getDisplayPath()]);
        $this->broker->notify(new AfterFileAnalysisEvent($result));
    }
}
131