1
|
|
|
<?php |
2
|
|
|
namespace Ackintosh\Snidel\Fork; |
3
|
|
|
|
4
|
|
|
use Ackintosh\Snidel\Fork\Fork; |
5
|
|
|
use Ackintosh\Snidel\Pcntl; |
6
|
|
|
use Ackintosh\Snidel\DataRepository; |
7
|
|
|
use Ackintosh\Snidel\Task\Queue as TaskQueue; |
8
|
|
|
use Ackintosh\Snidel\Result\Result; |
9
|
|
|
use Ackintosh\Snidel\Result\Queue as ResultQueue; |
10
|
|
|
use Ackintosh\Snidel\Result\Collection; |
11
|
|
|
use Ackintosh\Snidel\Error; |
12
|
|
|
use Ackintosh\Snidel\Exception\SharedMemoryControlException; |
13
|
|
|
|
14
|
|
|
/**
 * Keeps track of forked processes and of the results they produce.
 *
 * The container forks a "master" process which pulls tasks from the task
 * queue and forks one worker per task; workers push their results onto the
 * result queue, from which the owner process collects them via wait().
 */
class Container
{
    /** @var int pid handed to the task queue and the result queue */
    private $ownerPid;

    /** @var int|null pid of the master process, null while no master is known */
    private $masterPid;

    /** @var \Ackintosh\Snidel\Fork\Fork[] forks created by fork(), keyed by pid */
    private $forks = array();

    /** @var \Ackintosh\Snidel\Result\Result[] collected results, keyed by pid */
    private $results = array();

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\DataRepository */
    private $dataRepository;

    /** @var \Ackintosh\Snidel\Error */
    private $error;

    /** @var \Ackintosh\Snidel\Task\Queue */
    private $taskQueue;

    /** @var \Ackintosh\Snidel\Result\Queue */
    private $resultQueue;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var array signals the master handles by exiting; workers reset them to SIG_DFL */
    private $signals = array(
        SIGTERM,
        SIGINT,
    );

    /** @var int number of workers the master starts before it waits for one to finish */
    private $concurrency;

    /**
     * @param   int                     $ownerPid
     * @param   \Ackintosh\Snidel\Log   $log
     * @param   int                     $concurrency
     */
    public function __construct($ownerPid, $log, $concurrency = 5)
    {
        $this->ownerPid         = $ownerPid;
        $this->log              = $log;
        $this->concurrency      = $concurrency;
        $this->pcntl            = new Pcntl();
        $this->dataRepository   = new DataRepository();
        $this->taskQueue        = new TaskQueue($this->ownerPid);
        $this->resultQueue      = new ResultQueue($this->ownerPid);
        $this->error            = new Error();
    }

    /**
     * Puts a task onto the task queue.
     *
     * @param   \Ackintosh\Snidel\Task
     * @return  void
     * @throws  \RuntimeException   propagated unchanged from the task queue
     */
    public function enqueue($task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * @return  int     count reported by the task queue
     */
    public function queuedCount()
    {
        return $this->taskQueue->queuedCount();
    }

    /**
     * Takes the next entry off the result queue.
     *
     * @return  \Ackintosh\Snidel\Result\Result
     */
    private function dequeue()
    {
        return $this->resultQueue->dequeue();
    }

    /**
     * @return  int     count reported by the result queue
     */
    public function dequeuedCount()
    {
        return $this->resultQueue->dequeuedCount();
    }

    /**
     * fork process
     *
     * Runs in both the parent and the child after the fork; each side
     * registers a Fork object under the child's pid.
     *
     * @return  \Ackintosh\Snidel\Fork\Fork
     * @throws  \RuntimeException   when the process could not be forked
     */
    public function fork()
    {
        $pid = $this->pcntl->fork();
        if ($pid === -1) {
            throw new \RuntimeException('could not fork a new process');
        }

        // in the child fork() yields 0, so look up our own pid instead
        $pid = ($pid === 0) ? getmypid() : $pid;

        $fork = new Fork($pid);
        $this->forks[$pid] = $fork;

        return $fork;
    }

    /**
     * fork master process
     *
     * The owner process gets the master's pid back. The master process never
     * returns from this method: it dequeues tasks, forks a worker for each
     * of them and exits once the dequeue loop ends or a handled signal
     * arrives.
     *
     * @return  int     $masterPid
     * @throws  \RuntimeException   when the master process could not be forked
     */
    public function forkMaster()
    {
        $pid = $this->pcntl->fork();
        if ($pid === -1) {
            // Must be checked first: -1 is truthy and would otherwise be
            // stored as the master pid and later be passed to posix_kill().
            $message = 'could not fork a new process';
            $this->log->error($message);
            throw new \RuntimeException($message);
        }

        $this->masterPid = ($pid === 0) ? getmypid() : $pid;
        $this->log->setMasterPid($this->masterPid);

        if ($pid) {
            // owner
            $this->log->info('pid: ' . getmypid());

            return $this->masterPid;
        }

        // master
        $taskQueue = new TaskQueue($this->ownerPid);
        $this->log->info('pid: ' . $this->masterPid);

        // copied into a local so the closure below does not need $this
        $log = $this->log;
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, function ($sig) use ($log) {
                $log->info('received signal: ' . $sig);
                exit;
            });
        }

        $workerCount = 0;
        while ($task = $taskQueue->dequeue()) {
            $this->log->info('dequeued task #' . $taskQueue->dequeuedCount());
            if ($workerCount >= $this->concurrency) {
                // at the limit: wait for any child before starting another worker
                $status = null;
                $this->pcntl->waitpid(-1, $status);
                $workerCount--;
            }
            $this->forkWorker($task);
            $workerCount++;
        }
        exit;
    }

    /**
     * fork worker process
     *
     * The master only logs the new worker's pid and returns. The worker
     * executes the task, queues the result and exits.
     *
     * @param   \Ackintosh\Snidel\Task
     * @return  void
     * @throws  \RuntimeException   when the worker could not be forked
     */
    private function forkWorker($task)
    {
        try {
            $fork = $this->fork();
        } catch (\RuntimeException $e) {
            $this->log->error($e->getMessage());
            throw $e;
        }

        if (getmypid() === $this->masterPid) {
            // master
            $this->log->info('forked worker. pid: ' . $fork->getPid());
        } else {
            // worker
            $this->log->info('has forked. pid: ' . getmypid());
            // @codeCoverageIgnoreStart

            // drop the handlers inherited from the master
            foreach ($this->signals as $sig) {
                $this->pcntl->signal($sig, SIG_DFL, true);
            }

            $resultQueue = new ResultQueue($this->ownerPid);
            $resultHasQueued = false;
            // If the worker stops before a result was queued, queue a result
            // carrying error_get_last() from the shutdown function instead.
            register_shutdown_function(function () use (&$resultHasQueued, $fork, $task, $resultQueue) {
                if (!$resultHasQueued) {
                    $result = new Result();
                    $result->setError(error_get_last());
                    $result->setTask($task);
                    $result->setFork($fork);
                    $resultQueue->enqueue($result);
                }
            });

            $this->log->info('----> started the function.');
            $result = $task->execute();
            $result->setFork($fork);
            $this->log->info('<---- completed the function.');

            try {
                $resultQueue->enqueue($result);
            } catch (\RuntimeException $e) {
                // enqueueing failed: attach the last error and try once more
                $this->log->error($e->getMessage());
                $result->setError(error_get_last());
                $resultQueue->enqueue($result);
            }
            $resultHasQueued = true;
            $this->log->info('queued the result and exit.');
            exit;
            // @codeCoverageIgnoreEnd
        }
    }

    /**
     * @return  bool    true while a master pid is recorded
     */
    public function existsMaster()
    {
        return $this->masterPid !== null;
    }

    /**
     * send signal to master process
     *
     * Waits for the master afterwards and forgets its pid.
     *
     * @param   int     $sig
     * @return  void
     */
    public function sendSignalToMaster($sig = SIGTERM)
    {
        $this->log->info('----> sending signal to master. signal: ' . $sig);
        posix_kill($this->masterPid, $sig);
        $this->log->info('<---- sent signal.');

        $status = null;
        $this->pcntl->waitpid($this->masterPid, $status);
        $this->log->info('. status: ' . $status);
        $this->masterPid = null;
    }

    /**
     * Tells whether any collected result belongs to a task with the given tag.
     *
     * @param   string  $tag
     * @return  bool
     */
    public function hasTag($tag)
    {
        foreach ($this->results as $result) {
            if ($result->getTask()->getTag() === $tag) {
                return true;
            }
        }

        return false;
    }

    /**
     * Dequeues results until as many results were dequeued as tasks were
     * queued. Failed results are additionally recorded in the error object.
     *
     * @return  void
     */
    public function wait()
    {
        while ($this->queuedCount() > $this->dequeuedCount()) {
            $result = $this->dequeue();
            $pid = $result->getFork()->getPid();
            $this->results[$pid] = $result;

            if ($result->isFailure()) {
                $this->error[$pid] = $result;
            }
        }
    }

    /**
     * wait child
     *
     * Waits for any child, loads the result stored for its pid from the data
     * repository and records the exit status on the result's fork.
     *
     * @return  \Ackintosh\Snidel\Result\Result
     * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException    propagated unchanged from the data repository
     */
    public function waitForChild()
    {
        $status = null;
        $childPid = $this->pcntl->waitpid(-1, $status);
        $result = $this->dataRepository->load($childPid)->readAndDelete();

        $fork = $result->getFork();
        $fork->setStatus($status);
        $result->setFork($fork);

        if ($result->isFailure() || !$this->pcntl->wifexited($status) || $this->pcntl->wexitstatus($status) !== 0) {
            // NOTE(review): wait() records the Result here, this method records the Fork.
            // Confirm which of the two the Error class expects.
            $this->error[$childPid] = $fork;
        }
        $this->results[$childPid] = $result;

        return $result;
    }

    /**
     * @return  array   pids of the forks created through fork()
     */
    public function getChildPids()
    {
        return array_keys($this->forks);
    }

    /**
     * return the result collected for the given pid
     *
     * No existence check is performed on the pid.
     *
     * @param   int     $pid
     * @return  \Ackintosh\Snidel\Result\Result
     */
    public function get($pid)
    {
        return $this->results[$pid];
    }

    /**
     * return collected results and remove them from the container
     *
     * Without a tag every collected result is returned; with a tag only the
     * results whose task carries that tag.
     *
     * @param   string|null  $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    public function getCollection($tag = null)
    {
        if ($tag === null) {
            $collection = new Collection($this->results);
            $this->results = array();

            return $collection;
        }

        return $this->getCollectionWithTag($tag);
    }

    /**
     * return results
     *
     * Matching results are removed from the container.
     *
     * @param   string  $tag
     * @return  \Ackintosh\Snidel\Result\Collection
     */
    private function getCollectionWithTag($tag)
    {
        $results = array();
        foreach ($this->results as $r) {
            if ($r->getTask()->getTag() !== $tag) {
                continue;
            }

            $results[] = $r;
            unset($this->results[$r->getFork()->getPid()]);
        }

        return new Collection($results);
    }

    /**
     * @return  bool
     */
    public function hasError()
    {
        return $this->error->exists();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->error;
    }

    /**
     * Releases the references to both queues.
     */
    public function __destruct()
    {
        unset($this->taskQueue);
        unset($this->resultQueue);
    }
}
391
|
|
|
|
This check looks for the bodies of elseif statements that have no statements or where all statements have been commented out. This may be the result of changes for debugging, or the code may simply be obsolete. These elseif bodies can be removed. If you have an empty elseif but statements in the else branch, consider inverting the condition.