|
1
|
|
|
<?php |
|
2
|
|
|
namespace Ackintosh\Snidel\Fork; |
|
3
|
|
|
|
|
4
|
|
|
use Ackintosh\Snidel\Fork\Fork; |
|
5
|
|
|
use Ackintosh\Snidel\Pcntl; |
|
6
|
|
|
use Ackintosh\Snidel\DataRepository; |
|
7
|
|
|
use Ackintosh\Snidel\Task\Queue as TaskQueue; |
|
8
|
|
|
use Ackintosh\Snidel\Result\Result; |
|
9
|
|
|
use Ackintosh\Snidel\Result\Queue as ResultQueue; |
|
10
|
|
|
use Ackintosh\Snidel\Result\Collection; |
|
11
|
|
|
use Ackintosh\Snidel\Error; |
|
12
|
|
|
use Ackintosh\Snidel\Exception\SharedMemoryControlException; |
|
13
|
|
|
use Ackintosh\Snidel\Worker; |
|
14
|
|
|
use Ackintosh\Snidel\ActiveWorkerSet; |
|
15
|
|
|
|
|
16
|
|
|
/**
 * Holds the task/result queues and forks the processes that consume them.
 *
 * Three kinds of processes run this code:
 *  - owner  : the process whose pid was passed to the constructor; it enqueues
 *             tasks and collects results.
 *  - master : forked by forkMaster(); it dequeues tasks and forks one worker
 *             per task, keeping at most $concurrency workers active.
 *  - worker : forked by forkWorker(); it runs a single task and exits.
 */
class Container
{
    /** @var int pid handed to the constructor; compared with getmypid() to recognise the owner */
    private $ownerPid;

    /** @var int|null pid of the master process; null until forkMaster() ran or after the master was reaped */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Result\Result[] results collected by wait(), keyed by worker pid */
    private $results = array();

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\DataRepository */
    private $dataRepository;

    /** @var \Ackintosh\Snidel\Error failed results registered by wait(), keyed by worker pid */
    private $error;

    /** @var \Ackintosh\Snidel\Task\Queue */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\Queue */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var array signals the master handles and the workers reset to their default action */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /** @var int maximum number of workers the master keeps active at once */
    private $concurrency;

    /**
     * @param int                   $ownerPid    pid of the owner process
     * @param \Ackintosh\Snidel\Log $log         logger used by every process forked from here
     * @param int                   $concurrency maximum number of simultaneously active workers
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        $this->ownerPid         = $ownerPid;
        $this->log              = $log;
        $this->concurrency      = $concurrency;
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->taskQueue        = new TaskQueue($this->ownerPid);
        $this->resultQueue      = new ResultQueue($this->ownerPid);
        $this->error            = new Error();
    }

    /**
     * Puts a task on the task queue.
     *
     * @param   \Ackintosh\Snidel\Task  $task
     * @return  void
     * @throws  \RuntimeException       propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * @return  int     value reported by the task queue's queuedCount()
     */
    public function queuedCount()
    {
        return $this->taskQueue->queuedCount();
    }

    /**
     * Takes the next entry from the result queue.
     *
     * @return  \Ackintosh\Snidel\Result\Result    wait() uses the value as a Result
     *                                             (getFork(), isFailure()) - presumably
     *                                             what the result queue yields
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }

    /**
     * @return  int     value reported by the result queue's dequeuedCount()
     */
    public function dequeuedCount()
    {
        return $this->resultQueue->dequeuedCount();
    }

    /**
     * fork process
     *
     * Both the parent and the child return from this method. The returned
     * Fork carries the child's pid in either process: the parent gets it from
     * the fork call, the child (which sees 0) substitutes its own pid.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   when the fork call reports -1
     */
    private function fork()
    {
        $pid = $this->pcntl->fork();
        if ($pid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        $pid = ($pid === 0) ? getmypid() : $pid;

        return new Fork($pid);
    }

    /**
     * fork master process
     *
     * In the owner this returns the master's pid. In the master it never
     * returns: the master installs its signal handlers, forks a worker for
     * every task it dequeues and finally exits.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the process could not be forked
     */
    public function forkMaster()
    {
        $fork = $this->fork();

        $this->masterPid = $fork->getPid();
        $this->log->setMasterPid($this->masterPid);

        if (getmypid() === $this->ownerPid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue          = new TaskQueue($this->ownerPid);
        $activeWorkerSet    = new ActiveWorkerSet();
        $this->log->info('pid: ' . $this->masterPid);

        // the handler closure captures $log instead of using $this
        $log = $this->log;
        foreach ($this->signals as $sig) {
            // on a handled signal: pass it on to the active workers, then stop the master
            $this->pcntl->signal($sig, function ($sig) use ($log, $activeWorkerSet) {
                $log->info('received signal: ' . $sig);

                if ($activeWorkerSet->count() === 0) {
                    $log->info('no worker is active.');
                } else {
                    $log->info('------> sending signal to workers. signal: ' . $sig);
                    $activeWorkerSet->terminate($sig);
                    $log->info('<------ sent signal');
                }
                exit;
            });
        }

        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($activeWorkerSet->count() >= $this->concurrency) {
                // at the concurrency limit: wait for any child before forking another
                $status = null;
                $workerPid = $this->pcntl->waitpid(-1, $status);
                $activeWorkerSet->delete($workerPid);
            }
            $activeWorkerSet->add(
                $this->forkWorker($task)
            );
        }
        exit;
    }

    /**
     * fork worker process
     *
     * In the master this returns the Worker describing the new child. In the
     * worker it never returns: the worker runs its task and exits.
     *
     * @param   \Ackintosh\Snidel\Task      $task
     * @return  \Ackintosh\Snidel\Worker
     * @throws  \RuntimeException           when the process could not be forked
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->fork();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        $worker = new Worker($fork, $task);

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $worker->getPid());
            return $worker;
        }

        // worker
        $this->log->info('has forked. pid: ' . getmypid());
        // @codeCoverageIgnoreStart

        // replace the handlers inherited from the master with the default action
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        $worker->setResultQueue(new ResultQueue($this->ownerPid));

        // If the worker ends before the flag below is set, the shutdown
        // function calls $worker->error() - presumably to report the failure.
        $resultHasQueued = false;
        register_shutdown_function(function () use (&$resultHasQueued, $worker) {
            if (!$resultHasQueued) {
                $worker->error();
            }
        });

        $this->log->info('----> started the function.');
        try {
            $worker->run();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            exit;
        }
        $this->log->info('<---- completed the function.');

        $resultHasQueued = true;
        $this->log->info('queued the result and exit.');
        exit;
        // @codeCoverageIgnoreEnd
    }

    /**
     * @return  bool    true while a master pid is recorded
     */
    public function existsMaster()
    {
        return $this->masterPid !== null;
    }

    /**
     * send signal to master process
     *
     * Sends the signal, waits for the master to terminate and then forgets
     * its pid. Does nothing (apart from logging) when no master is recorded.
     *
     * @param   int     $sig    signal number, SIGTERM by default
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        if (!$this->existsMaster()) {
            // A null pid would be coerced to 0, which kill() interprets as
            // "every process in the caller's process group".
            $this->log->info('master process does not exist. skipped sending signal.');
            return;
        }

        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $status = null;
        $this->pcntl->waitpid($this->masterPid, $status);
        $this->log->info('. status: ' . $status);
        $this->masterPid = null;
    }

    /**
     * Tells whether any collected result belongs to a task with the given tag.
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        foreach ($this->results as $result) {
            if ($result->getTask()->getTag() === $tag) {
                return true;
            }
        }

        return false;
    }

    /**
     * Dequeues results until as many were dequeued as tasks were queued.
     *
     * Every result is stored under its worker pid; failed results are also
     * registered in the error container.
     *
     * @return  void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();
            $pid    = $result->getFork()->getPid();
            $this->results[$pid] = $result;

            if ($result->isFailure()) {
                $this->error[$pid] = $result;
            }
        }
    }

    /**
     * Hands out the collected results and removes them from this container.
     *
     * @param   string|null $tag    null returns every collected result,
     *                              otherwise only results of tasks with this tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    public function getCollection($tag = null)
    {
        if ($tag === null) {
            $collection = new Collection($this->results);
            $this->results = array();

            return $collection;
        }

        return $this->getCollectionWithTag($tag);
    }

    /**
     * return results
     *
     * Only results whose task carries the tag are returned; they are removed
     * from this container, all other results stay.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    private function getCollectionWithTag($tag)
    {
        $results = array();
        foreach ($this->results as $r) {
            if ($r->getTask()->getTag() !== $tag) {
                continue;
            }

            $results[] = $r;
            unset($this->results[$r->getFork()->getPid()]);
        }

        return new Collection($results);
    }

    /**
     * @return  bool    whatever the error container's exists() reports
     */
    public function hasError()
    {
        return $this->error->exists();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * Drops this container's references to both queues.
     */
    public function __destruct()
    {
        unset($this->taskQueue);
        unset($this->resultQueue);
    }
}
|
351
|
|
|
|
An exit expression should only be used in rare cases, for example in a short command-line script.
In most cases, however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend refactoring your code to avoid its usage.