Completed
Branch develop (ff0d26)
by Edward
03:17
created

ProcessQueue::resolve()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 11
rs 9.4286
cc 3
eloc 6
nc 4
nop 1
1
<?php
2
/**
3
 * File ProcessQueue.php
4
 *
5
 * @author Edward Pfremmer <[email protected]>
6
 */
7
namespace Epfremme\ProcessQueue\Process;
8
9
use Epfremme\Collection\Collection;
10
use Epfremme\ProcessQueue\System\ProcessorCounter;
11
use GuzzleHttp\Promise\PromiseInterface;
12
use Symfony\Component\Process\Process;
13
14
/**
15
 * Class ProcessQueue
16
 *
17
 * @package Epfremme\ProcessQueue\Process
18
 */
19
class ProcessQueue implements \Countable
{
    /**
     * Pause applied at the start of every scheduling pass in __invoke(), in microseconds
     */
    const SLEEP_MICRO_SECONDS = 1000;

    /**
     * Key looked up in Process::getOptions() to find a promise attached to a process
     */
    const PROMISE_KEY = 'promise';

    /**
     * Maximum number of processes allowed to be running at the same time
     *
     * @var int
     */
    private $limit;

    /**
     * Every process that was added and has not been resolved yet
     *
     * @var Collection
     */
    private $queue;

    /**
     * ProcessQueue constructor
     *
     * When no limit is given (null) or the limit is 0, the limit falls back
     * to the CPU count reported by ProcessorCounter.
     *
     * @param int|null $limit Maximum number of concurrently running processes
     * @throws \InvalidArgumentException When the limit is not an integer or is negative
     */
    public function __construct($limit = null)
    {
        if ($limit !== null && !is_int($limit)) {
            throw new \InvalidArgumentException(sprintf('Limit must be of type int %s given', gettype($limit)));
        }

        // A negative limit can never be satisfied by the "running < limit" check in
        // __invoke(): nothing would ever be started and the queue would spin forever.
        if ($limit !== null && $limit < 0) {
            throw new \InvalidArgumentException(sprintf('Limit must not be negative, %d given', $limit));
        }

        // ?: (not a null check) on purpose: an explicit 0 also falls back to the CPU count
        $this->limit = $limit ?: (new ProcessorCounter())->getCpuCount();
        $this->queue = new Collection();
    }

    /**
     * Add new process to the queue
     *
     * @param Process $process
     * @return $this
     */
    public function add(Process $process)
    {
        $this->queue->add($process);

        return $this;
    }

    /**
     * Return pending processes, i.e. queued processes that have not been started
     *
     * @return Collection
     */
    public function getPending()
    {
        return $this->queue->filter(function (Process $process) {
            return !$process->isStarted();
        });
    }

    /**
     * Return running processes
     *
     * @return Collection
     */
    public function getRunning()
    {
        return $this->queue->filter(function (Process $process) {
            return $process->isRunning();
        });
    }

    /**
     * Return completed processes, i.e. queued processes that have terminated
     *
     * @return Collection
     */
    public function getCompleted()
    {
        return $this->queue->filter(function (Process $process) {
            return $process->isTerminated();
        });
    }

    /**
     * Clear and resolve completed processes from the queue
     *
     * Each terminated process is waited on and then handed to resolve(),
     * which removes it from the queue.
     *
     * @return void
     */
    private function clearCompleted()
    {
        $this->getCompleted()->each(function (Process $process) {
            $process->wait();
            $this->resolve($process);
        });
    }

    /**
     * Settle the promise attached to a process and remove the process from the queue
     *
     * This method does not wait for the process itself to finish; when used by
     * the queue runner that is done beforehand in clearCompleted(). If the
     * process options hold a PromiseInterface under PROMISE_KEY, execution is
     * halted until that promise has settled.
     *
     * @param Process $process
     * @return void
     */
    public function resolve(Process $process)
    {
        $options = $process->getOptions();
        $promise = array_key_exists(self::PROMISE_KEY, $options) ? $options[self::PROMISE_KEY] : null;

        if ($promise instanceof PromiseInterface) {
            // false = do not unwrap: wait for settlement without returning the
            // value or throwing the rejection reason from here
            $promise->wait(false);
        }

        $this->queue->remove($process);
    }

    /**
     * Run the queue
     *
     * Loops until the queue is empty. On every pass it sleeps for
     * SLEEP_MICRO_SECONDS, yields one pending process when the number of
     * running processes is below the limit (otherwise a NullProcess), and
     * then clears completed processes.
     *
     * NOTE(review): a yielded process is not started here; presumably the
     * consumer of the generator starts it - confirm against the caller.
     *
     * @return \Generator
     */
    public function __invoke()
    {
        while (!$this->queue->isEmpty()) {
            usleep(self::SLEEP_MICRO_SECONDS);

            $pending = $this->getPending();

            if ($pending->count() && $this->getRunning()->count() < $this->limit) {
                yield $pending->shift();
            } else {
                // Nothing can be started on this pass: either no process is
                // pending or the concurrency limit has been reached
                yield new NullProcess();
            }

            $this->clearCompleted();
        }
    }

    /**
     * Return queue count
     *
     * @return int
     */
    public function count()
    {
        return $this->queue->count();
    }
}
160