ProcessQueue::resolve()   A
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 11
ccs 8
cts 8
cp 1
rs 9.4286
cc 3
eloc 6
nc 4
nop 1
crap 3
1
<?php
2
/**
3
 * File ProcessQueue.php
4
 *
5
 * @author Edward Pfremmer <[email protected]>
6
 */
7
namespace Epfremme\ProcessQueue\Process;
8
9
use Epfremme\Collection\Collection;
10
use Epfremme\ProcessQueue\System\ProcessorCounter;
11
use GuzzleHttp\Promise\PromiseInterface;
12
use Symfony\Component\Process\Process;
13
14
/**
15
 * Class ProcessQueue
16
 *
17
 * @package Epfremme\ProcessQueue\Process
18
 */
19
class ProcessQueue implements \Countable
20
{
21
    const SLEEP_MICRO_SECONDS = 1000;
22
    const PROMISE_KEY = 'promise';
23
24
    /**
25
     * @var int
26
     */
27
    private $limit;
28
29
    /**
30
     * @var Collection
31
     */
32
    private $queue;
33
34
    /**
35
     * ProcessQueue constructor
36
     *
37
     * @param int $limit
38
     */
39 20
    public function __construct($limit = null)
40
    {
41 20
        $limit = !is_null($limit) ? $limit : (new ProcessorCounter())->getCpuCount();
42
43 20
        if (!is_numeric($limit)) {
44 1
            throw new \InvalidArgumentException(sprintf('Limit must be numeric, type "%s" given', gettype($limit)));
45
        }
46
47 19
        if ($limit < 1) {
48 1
            throw new \InvalidArgumentException('Process limit must be greater than 0');
49
        }
50
51 18
        $this->limit = (int) $limit;
52 18
        $this->queue = new Collection();
53 18
    }
54
55
    /**
56
     * Add new process to the queue
57
     *
58
     * @param Process $process
59
     * @return $this
60
     */
61 13
    public function add(Process $process)
62
    {
63 13
        $this->queue->add($process);
64
65 13
        return $this;
66
    }
67
68
    /**
69
     * Return pending processes
70
     *
71
     * @return Collection
72
     */
73 6
    public function getPending()
74
    {
75
        return $this->queue->filter(function(Process $process) {
76 6
            return !$process->isStarted();
77 6
        });
78
    }
79
80
    /**
81
     * Return running processes
82
     *
83
     * @return Collection
84
     */
85 6
    public function getRunning()
86
    {
87
        return $this->queue->filter(function(Process $process) {
88 6
            return $process->isRunning();
89 6
        });
90
    }
91
92
    /**
93
     * Return completed processes
94
     *
95
     * @return Collection
96
     */
97 6
    public function getCompleted()
98
    {
99
        return $this->queue->filter(function(Process $process) {
100 6
            return $process->isTerminated();
101 6
        });
102
    }
103
104
    /**
105
     * Clear and resolve completed processes from the queue
106
     *
107
     * @return void
108
     */
109
    private function clearCompleted()
110
    {
111 5
        $this->getCompleted()->each(function(Process $process) {
112 5
            $this->resolve($process);
113 5
        });
114 5
    }
115
116
    /**
117
     * Halt execution and wait for target process to finish
118
     *
119
     * @param Process $process
120
     */
121 7
    public function resolve(Process $process)
122
    {
123 7
        $options = $process->getOptions();
124 7
        $promise = array_key_exists(self::PROMISE_KEY, $options) ? $options[self::PROMISE_KEY] : null;
125
126 7
        if ($promise instanceof PromiseInterface) {
127 5
            $promise->wait(false);
128 5
        }
129
130 7
        $this->queue->remove($process);
131 7
    }
132
133
    /**
134
     * Run the queue
135
     *
136
     * @return \Generator
137
     */
138 5
    public function __invoke()
139
    {
140 5
        while (!$this->queue->isEmpty()) {
141 5
            usleep(self::SLEEP_MICRO_SECONDS);
142
143 5
            $pending = $this->getPending();
144
145 5
            if ($pending->count() && $this->getRunning()->count() < $this->limit) {
146 5
                yield $pending->shift();
147 5
            } else {
148 5
                yield new NullProcess();
149
            }
150
151 5
            $this->clearCompleted();
152 5
        }
153 5
    }
154
155
    /**
156
     * Return queue count
157
     *
158
     * @return int
159
     */
160 6
    public function count()
161
    {
162 6
        return $this->queue->count();
163
    }
164
}
165