Passed
Push — master ( c5a620...f982b7 )
by Joao
07:36
created

ThreadPool::removeWorker()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 3
c 0
b 0
f 0
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 10
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
namespace ByJG\PHPThread;
4
5
/**
6
 * Manage a pool of threads.
7
 *
8
 */
9
class ThreadPool
10
{
11
12
    /**
13
     * The list of threads
14
     * @var array
15
     */
16
    protected $threadList = array();
17
18
    /**
19
     * The list of threads instances
20
     * @var array
21
     */
22
    protected $threadInstance = array();
23
24
    protected $poolStarted = false;
25
26
    /**
27
     * Queue a new thread worker
28
     *
29
     * @param callable $callable
30
     * @param array $params The thread parameters
31
     * @param string $thid The Thread id to identify the ID
32
     * @return string
33
     * @throws \InvalidArgumentException
34
     */
35 3
    public function queueWorker(callable $callable, $params = [], $thid = null)
36
    {
37 3
        if (!is_array($params)) {
0 ignored issues
show
introduced by
The condition is_array($params) is always true.
Loading history...
38
            throw new \InvalidArgumentException('The params needs to be an array');
39
        }
40
41 3
        if (is_null($thid)) {
42 3
            $thid = uniqid("", true);
43
        }
44
45 3
        $data = new \stdClass;
46 3
        $data->callable = $callable;
47 3
        $data->params = $params;
48
49 3
        $this->threadList[$thid] = $data;
50
51 3
        if ($this->isPoolStarted()) {
52 1
            $this->startWorker($thid);
53
        }
54
55 3
        return $thid;
56
    }
57
58
    /**
59
     * Start all the workers in the queue
60
     */
61 2
    public function startPool()
62
    {
63 2
        foreach ($this->threadList as $key => $value) {
64 2
            $this->startWorker($key);
65
        }
66
67 2
        $this->poolStarted = true;
68
    }
69
70 2
    protected function startWorker($threadItemKey)
71
    {
72 2
        $thread = new Thread($this->threadList[$threadItemKey]->callable);
73
74 2
        call_user_func_array([$thread, 'execute'], $this->threadList[$threadItemKey]->params);
75 2
        $this->threadInstance[$threadItemKey] = $thread;
76
    }
77
78
    /**
79
     * Stop all the workers in the queue
80
     */
81 1
    public function stopPool()
82
    {
83 1
        foreach ($this->threadList as $key => $value) {
84 1
            $this->removeWorker($key);
85
        }
86
87 1
        $this->poolStarted = false;
88
    }
89
90
    /**
91
     * Wait until all workers are finished
92
     */
93 1
    public function waitWorkers()
94
    {
95 1
        foreach ($this->threadInstance as $value) {
96 1
            $value->waitFinish();
97
        }
98
    }
99
100
    /**
101
     * How many workers are active
102
     *
103
     * @return int
104
     */
105 3
    public function activeWorkers()
106
    {
107 3
        $count = 0;
108
109 3
        foreach ($this->threadInstance as $value) {
110 2
            $count += $value->isAlive() ? 1 : 0;
111
        }
112
113 3
        return $count;
114
    }
115
116
    /**
117
     * Return a list of threads
118
     *
119
     * @return array
120
     */
121
    public function getThreads()
122
    {
123
        return array_keys($this->threadInstance);
124
    }
125
126
    /**
127
     * Get the thread result from the Shared Memory
128
     *
129
     * @param string $threadId
130
     * @return mixed
131
     */
132 2
    public function getThreadResult($threadId)
133
    {
134 2
        if (!isset($this->threadInstance[$threadId])) {
135 1
            return null;
136
        }
137
138 1
        return $this->threadInstance[$threadId]->getResult();
139
    }
140
141
    /**
142
     * Check if the thread is running or not
143
     *
144
     * @param string $threadId
145
     * @return bool
146
     */
147
    public function isAlive($threadId)
148
    {
149
        $thread = $this->getThreadById($threadId);
150
151
        if (is_null($thread)) {
152
            return null;
153
        }
154
155
        return $thread->isAlive();
156
    }
157
158
    /**
159
     * Return a Thread object based on your id
160
     *
161
     * @param string $threadId
162
     * @return Thread
163
     */
164 1
    protected function getThreadById($threadId)
165
    {
166 1
        if (!isset($this->threadInstance[$threadId])) {
167
            return null;
168
        }
169
170 1
        return $this->threadInstance[$threadId];
171
    }
172
173
    /**
174
     * Stops a specific thread
175
     *
176
     * @param string $threadId
177
     * @param bool $remove
178
     * @return bool
179
     */
180 1
    public function stopWorker($threadId, $remove = true)
181
    {
182 1
        $thread = $this->getThreadById($threadId);
183
184 1
        if (is_null($thread)) {
185
            return null;
186 1
        } elseif ($thread->isAlive()) {
187 1
            $thread->stop();
188 1
            if ($remove) {
189
                $this->removeWorker($threadId);
190
            }
191 1
            return true;
192
        } else {
193
            return false;
194
        }
195
    }
196
197 1
    public function removeWorker($threadId)
198
    {
199 1
        $this->stopWorker($threadId, false);
200 1
        unset($this->threadInstance[$threadId]);
201 1
        unset($this->threadList[$threadId]);
202
    }
203
204 3
    public function isPoolStarted()
205
    {
206 3
        return $this->poolStarted;
207
    }
208
}
209