Completed
Push — master ( a9f956...af79a7 )
by Edward
06:07 queued 03:41
created

ProcessQueue::__invoke()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 4

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 16
ccs 11
cts 11
cp 1
rs 9.2
cc 4
eloc 9
nc 3
nop 0
crap 4
1
<?php
2
/**
3
 * File ProcessQueue.php
4
 *
5
 * @author Edward Pfremmer <[email protected]>
6
 */
7
namespace Epfremme\ProcessQueue\Process;
8
9
use Epfremme\Collection\Collection;
10
use Epfremme\ProcessQueue\System\ProcessorCounter;
11
use GuzzleHttp\Promise\PromiseInterface;
12
use Symfony\Component\Process\Process;
13
14
/**
15
 * Class ProcessQueue
16
 *
17
 * @package Epfremme\ProcessQueue\Process
18
 */
19
class ProcessQueue implements \Countable
20
{
21
    const SLEEP_MICRO_SECONDS = 1000;
22
    const PROMISE_KEY = 'promise';
23
24
    /**
25
     * @var int
26
     */
27
    private $limit;
28
29
    /**
30
     * @var Collection
31
     */
32
    private $queue;
33
34
    /**
35
     * ProcessQueue constructor
36
     *
37
     * @param int $limit
38
     */
39 18
    public function __construct($limit = null)
40
    {
41 18
        if (!is_null($limit) && !is_int($limit)) {
42 1
            throw new \InvalidArgumentException(sprintf('Limit must be of type int %s given', gettype($limit)));
43
        }
44
45 17
        $this->limit = $limit ?: (new ProcessorCounter())->getCpuCount();
46 17
        $this->queue = new Collection();
47 17
    }
48
49
    /**
50
     * Add new process to the queue
51
     *
52
     * @param Process $process
53
     * @return $this
54
     */
55 13
    public function add(Process $process)
56
    {
57 13
        $this->queue->add($process);
58
59 13
        return $this;
60
    }
61
62
    /**
63
     * Return pending processes
64
     *
65
     * @return Collection
66
     */
67 6
    public function getPending()
68
    {
69
        return $this->queue->filter(function(Process $process) {
70 6
            return !$process->isStarted();
71 6
        });
72
    }
73
74
    /**
75
     * Return running processes
76
     *
77
     * @return Collection
78
     */
79 6
    public function getRunning()
80
    {
81
        return $this->queue->filter(function(Process $process) {
82 6
            return $process->isRunning();
83 6
        });
84
    }
85
86
    /**
87
     * Return completed processes
88
     *
89
     * @return Collection
90
     */
91 6
    public function getCompleted()
92
    {
93
        return $this->queue->filter(function(Process $process) {
94 6
            return $process->isTerminated();
95 6
        });
96
    }
97
98
    /**
99
     * Clear and resolve completed processes from the queue
100
     *
101
     * @return void
102
     */
103
    private function clearCompleted()
104
    {
105 5
        $this->getCompleted()->each(function(Process $process) {
106 5
            $process->wait();
107 5
            $this->resolve($process);
108 5
        });
109 5
    }
110
111
    /**
112
     * Halt execution and wait for target process to finish
113
     *
114
     * @param Process $process
115
     */
116 7
    public function resolve(Process $process)
117
    {
118 7
        $options = $process->getOptions();
119 7
        $promise = array_key_exists(self::PROMISE_KEY, $options) ? $options[self::PROMISE_KEY] : null;
120
121 7
        if ($promise instanceof PromiseInterface) {
122 5
            $promise->wait(false);
123 5
        }
124
125 7
        $this->queue->remove($process);
126 7
    }
127
128
    /**
129
     * Run the queue
130
     *
131
     * @return \Generator
132
     */
133 5
    public function __invoke()
134
    {
135 5
        while (!$this->queue->isEmpty()) {
136 5
            usleep(self::SLEEP_MICRO_SECONDS);
137
138 5
            $pending = $this->getPending();
139
140 5
            if ($pending->count() && $this->getRunning()->count() < $this->limit) {
141 5
                yield $pending->shift();
142 5
            } else {
143 5
                yield new NullProcess();
144
            }
145
146 5
            $this->clearCompleted();
147 5
        }
148 5
    }
149
150
    /**
151
     * Return queue count
152
     *
153
     * @return int
154
     */
155 6
    public function count()
156
    {
157 6
        return $this->queue->count();
158
    }
159
}
160