Issues (4)

src/ThreadPool.php (1 issue)

Severity
1
<?php
2
3
namespace ByJG\PHPThread;
4
5
use Closure;
6
7
/**
 * Manage a pool of threads.
 *
 * Workers are queued as closure/parameter pairs identified by a thread id.
 * Starting the pool creates one Thread per queued worker; workers queued while
 * the pool is started are launched immediately.
 */
class ThreadPool
{
    /**
     * Queued workers, keyed by thread id.
     *
     * Each entry is a stdClass with two properties: `closure` (the code to
     * run) and `params` (the array of arguments handed to it).
     *
     * @var array
     */
    protected $threadList = [];

    /**
     * Thread instances of the workers that have been started, keyed by thread id.
     *
     * @var array
     */
    protected $threadInstance = [];

    /**
     * True between a call to startPool() and the next call to stopPool().
     *
     * @var bool
     */
    protected $poolStarted = false;

    /**
     * Queue a new thread worker.
     *
     * If the pool is already started the worker is launched right away;
     * otherwise it waits for startPool(). Queuing with an id that is already
     * in the queue replaces the previous entry.
     *
     * @param Closure $closure The code the worker will run
     * @param array $params The thread parameters, passed positionally to the worker
     * @param string|null $thid The thread id; a unique id is generated when null
     * @return string The id under which the worker was queued
     * @throws \InvalidArgumentException When $params is not an array
     */
    public function queueWorker(Closure $closure, $params = [], $thid = null)
    {
        // $params carries no native type hint, so this runtime guard is still
        // reachable even though static analysers infer "array" from the docblock.
        if (!is_array($params)) {
            throw new \InvalidArgumentException('The params needs to be an array');
        }

        if (is_null($thid)) {
            $thid = uniqid("", true);
        }

        $worker = new \stdClass;
        $worker->closure = $closure;
        $worker->params = $params;

        $this->threadList[$thid] = $worker;

        if ($this->isPoolStarted()) {
            $this->startWorker($thid);
        }

        return $thid;
    }

    /**
     * Start all the workers in the queue and mark the pool as started.
     *
     * A worker whose Thread instance still reports isAlive() is left alone:
     * launching it again would overwrite the stored instance and leave the
     * running one with no handle to wait on or stop. Workers that were never
     * started, or whose instance is no longer alive, are (re)launched.
     */
    public function startPool()
    {
        foreach (array_keys($this->threadList) as $threadId) {
            $running = $this->getThreadById($threadId);
            if (!is_null($running) && $running->isAlive()) {
                continue;
            }

            $this->startWorker($threadId);
        }

        $this->poolStarted = true;
    }

    /**
     * Create the Thread for a queued worker, run it and remember the instance.
     *
     * @param string $threadItemKey Id of an entry in the queue
     */
    protected function startWorker($threadItemKey)
    {
        $worker = $this->threadList[$threadItemKey];
        $thread = new Thread($worker->closure);

        // Since PHP 8.0 call_user_func_array() treats string keys as named
        // arguments. array_values() keeps the call positional, which is how
        // earlier PHP versions handled associative arrays here.
        call_user_func_array([$thread, 'execute'], array_values($worker->params));

        $this->threadInstance[$threadItemKey] = $thread;
    }

    /**
     * Stop all the workers in the queue, remove them and mark the pool as stopped.
     */
    public function stopPool()
    {
        // Iterate over a snapshot of the ids: removeWorker() unsets entries
        // from the very list being walked.
        foreach (array_keys($this->threadList) as $threadId) {
            $this->removeWorker($threadId);
        }

        $this->poolStarted = false;
    }

    /**
     * Wait until all started workers are finished.
     *
     * Calls waitFinish() on each stored Thread instance, one after another.
     */
    public function waitWorkers()
    {
        foreach ($this->threadInstance as $thread) {
            $thread->waitFinish();
        }
    }

    /**
     * How many workers are active.
     *
     * @return int Number of started workers whose Thread reports isAlive()
     */
    public function activeWorkers()
    {
        $active = 0;

        foreach ($this->threadInstance as $thread) {
            if ($thread->isAlive()) {
                $active++;
            }
        }

        return $active;
    }

    /**
     * Return the ids of the workers that have been started.
     *
     * Workers that are queued but not yet started are not included.
     *
     * @return array List of thread ids
     */
    public function getThreads()
    {
        return array_keys($this->threadInstance);
    }

    /**
     * Get the result of a started worker.
     *
     * @param string $threadId
     * @return mixed Whatever the Thread's getResult() returns, or null when no
     *               started worker exists under $threadId
     */
    public function getThreadResult($threadId)
    {
        $thread = $this->getThreadById($threadId);

        if (is_null($thread)) {
            return null;
        }

        return $thread->getResult();
    }

    /**
     * Check if the thread is running or not.
     *
     * @param string $threadId
     * @return bool|null The Thread's isAlive() value, or null when no started
     *                   worker exists under $threadId
     */
    public function isAlive($threadId)
    {
        $thread = $this->getThreadById($threadId);

        if (is_null($thread)) {
            return null;
        }

        return $thread->isAlive();
    }

    /**
     * Return the Thread object of a started worker based on its id.
     *
     * @param string $threadId
     * @return Thread|null Null when no started worker exists under $threadId
     */
    protected function getThreadById($threadId)
    {
        if (!isset($this->threadInstance[$threadId])) {
            return null;
        }

        return $this->threadInstance[$threadId];
    }

    /**
     * Stops a specific thread.
     *
     * @param string $threadId
     * @param bool $remove When true, also remove the worker from the pool
     *                     after it has been stopped
     * @return bool|null True when a running thread was stopped, false when the
     *                   thread exists but is not alive (nothing is stopped or
     *                   removed), null when no started worker exists under
     *                   $threadId
     */
    public function stopWorker($threadId, $remove = true)
    {
        $thread = $this->getThreadById($threadId);

        if (is_null($thread)) {
            return null;
        }

        if (!$thread->isAlive()) {
            return false;
        }

        $thread->stop();

        if ($remove) {
            $this->removeWorker($threadId);
        }

        return true;
    }

    /**
     * Stop a worker if it is running and drop it from the pool.
     *
     * Both the Thread instance and the queued entry are discarded, so the
     * worker will not be launched again by startPool().
     *
     * @param string $threadId
     */
    public function removeWorker($threadId)
    {
        // Pass $remove = false: the removal is done right here, and letting
        // stopWorker() call back into removeWorker() would be redundant.
        $this->stopWorker($threadId, false);

        unset($this->threadInstance[$threadId], $this->threadList[$threadId]);
    }

    /**
     * Tell whether the pool is currently started.
     *
     * @return bool
     */
    public function isPoolStarted()
    {
        return $this->poolStarted;
    }
}
210