|
1
|
|
|
<?php |
|
|
|
|
|
|
2
|
|
|
declare(ticks = 1); |
|
3
|
|
|
|
|
4
|
|
|
namespace Ackintosh; |
|
5
|
|
|
|
|
6
|
|
|
use Ackintosh\Snidel\ForkContainer; |
|
7
|
|
|
use Ackintosh\Snidel\Result; |
|
8
|
|
|
use Ackintosh\Snidel\Token; |
|
9
|
|
|
use Ackintosh\Snidel\Log; |
|
10
|
|
|
use Ackintosh\Snidel\Error; |
|
11
|
|
|
use Ackintosh\Snidel\Pcntl; |
|
12
|
|
|
use Ackintosh\Snidel\DataRepository; |
|
13
|
|
|
use Ackintosh\Snidel\MapContainer; |
|
14
|
|
|
use Ackintosh\Snidel\Exception\SharedMemoryControlException; |
|
15
|
|
|
|
|
16
|
|
|
class Snidel |
|
17
|
|
|
{ |
|
18
|
|
|
/** @var string version string of this class */
const VERSION = '0.4.0';

/** @var array PIDs appended by fork() and removed again once the child has been waited for */
private $childPids = array();

/** @var \Ackintosh\Snidel\ForkContainer used to fork children and to wait for them */
private $forkContainer;

/** @var \Ackintosh\Snidel\Error failure details, written per child PID by wait() */
private $error;

/** @var \Ackintosh\Snidel\Pcntl used to register signal handlers */
private $pcntl;

/** @var int value given to the constructor; handed to Token and MapContainer */
private $concurrency;

/** @var \Ackintosh\Snidel\Token token used by fork() when the caller passes none */
private $token;

/** @var \Ackintosh\Snidel\Log */
private $log;

/** @var \Ackintosh\Snidel\DataRepository source of the per-PID data objects */
private $dataRepository;

/** @var bool true once wait() or run() has collected the children */
private $joined = false;

/** @var array return values of the successful children, keyed by child PID */
private $results = array();

/** @var int PID of the process that constructed this instance */
private $ownerPid;

/** @var array map of tag => list of child PIDs forked with that tag */
private $tagsToPids = array();

/** @var array signals for which the constructor installs signalHandler() */
private $signals = array(SIGTERM, SIGINT);

/** @var int number of the signal seen by signalHandler(); null until then */
private $receivedSignal;

/** @var \Ackintosh\Snidel\Token token selected by the most recent fork() call */
private $processToken;

/** @var array NOTE(review): not referenced by any method of this class - confirm it is still needed */
private $processInformation = array();

/** @var bool set when wait() or run() fails; makes the destructor delete the shared data */
private $exceptionHasOccured = false;
|
74
|
|
|
|
|
75
|
|
|
/**
 * Records the owner PID, builds the collaborators and installs the
 * signal handlers for every signal listed in $this->signals.
 *
 * @param   int     $concurrency    handed to the Token (and later to MapContainer)
 */
public function __construct($concurrency = 5)
{
    $currentPid = getmypid();

    $this->ownerPid         = $currentPid;
    $this->concurrency      = $concurrency;
    $this->token            = new Token($currentPid, $concurrency);
    $this->log              = new Log($currentPid);
    $this->error            = new Error();
    $this->pcntl            = new Pcntl();
    $this->dataRepository   = new DataRepository();
    $this->forkContainer    = new ForkContainer();

    $handler = array($this, 'signalHandler');
    foreach ($this->signals as $signo) {
        $this->pcntl->signal($signo, $handler, false);
    }

    $this->log->info('parent pid: ' . $this->ownerPid);
}
|
93
|
|
|
|
|
94
|
|
|
/**
 * Forwards the given resource to the logger as its destination.
 *
 * @param   resource    $resource   passed unchanged to Log::setDestination()
 * @return  void
 * @codeCoverageIgnore
 */
public function setLoggingDestination($resource)
{
    $logger = $this->log;
    $logger->setDestination($resource);
}
|
105
|
|
|
|
|
106
|
|
|
/**
 * Forks a child process that executes $callable.
 *
 * Parent side: the child's PID is logged, remembered in $this->childPids
 * (and under $tag when one is given) and returned.
 *
 * Child side: the handled signals are reset to SIG_DFL, a shutdown function
 * is registered that stores the Result and returns the token, then the
 * callable runs once the token has been accepted and the process exits.
 *
 * @param   callable                        $callable   function to run in the child
 * @param   array                           $args       arguments for $callable; a non-array value is wrapped in an array
 * @param   string                          $tag        optional tag to group the child's PID under
 * @param   \Ackintosh\Snidel\Token|null    $token      token to use instead of the instance's own token
 * @return  int     $pid    PID of the forked child process
 * @throws  \RuntimeException   rethrown from ForkContainer::fork() after being logged
 */
public function fork($callable, $args = array(), $tag = null, Token $token = null)
{
    $this->processToken = $token ? $token : $this->token;
    if (!is_array($args)) {
        $args = array($args);
    }

    try {
        $fork = $this->forkContainer->fork();
    } catch (\RuntimeException $e) {
        $this->log->error($e->getMessage());
        throw $e;
    }

    $fork->setCallable($callable);
    $fork->setArgs($args);

    if ($pid = $fork->getPid()) {
        // parent
        $this->log->info('created child process. pid: ' . $pid);
        $this->childPids[] = $pid;
        if ($tag !== null) {
            $this->tagsToPids[$tag][] = $pid;
        }
    } else {
        // @codeCoverageIgnoreStart
        // child
        foreach ($this->signals as $sig) {
            $this->pcntl->signal($sig, SIG_DFL, true);
        }

        $result = new Result();
        register_shutdown_function(function () use ($result) {
            $data = $this->dataRepository->load(getmypid());

            // Remember a write failure instead of throwing right away: the
            // token must be returned in every case, otherwise it is lost
            // for the other processes that share it.
            $failure = null;
            try {
                $data->write($result);
            } catch (SharedMemoryControlException $e) {
                $failure = $e;
                $this->log->error($e->getMessage());
            }

            $this->log->info('<-- return token.');
            $this->processToken->back();

            if ($failure !== null) {
                throw $failure;
            }
        });

        $this->log->info('--> waiting for the token come around.');
        if ($this->processToken->accept()) {
            $this->log->info('----> started the function.');
            $result->setReturn(call_user_func_array($callable, $args));
            $this->log->info('<---- completed the function.');
        }

        $this->_exit();
        // @codeCoverageIgnoreEnd
    }

    return $pid;
}
|
171
|
|
|
|
|
172
|
|
|
/**
 * Waits until every child recorded in $this->childPids has been collected.
 *
 * For each collected child the return value is stored in $this->results
 * (keyed by PID) when the fork was successful; otherwise the failure is
 * logged and its details are stored in $this->error. Does nothing when the
 * instance has already been joined.
 *
 * @return  void
 * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
 */
public function wait()
{
    if ($this->joined) {
        return;
    }

    $count = count($this->childPids);
    for ($i = 0; $i < $count; $i++) {
        try {
            $fork = $this->forkContainer->wait();
        } catch (SharedMemoryControlException $e) {
            // flag the failure so that the destructor deletes the shared data
            $this->exceptionHasOccured = true;
            throw $e;
        }

        $childPid   = $fork->getPid();
        $result     = $fork->getResult();
        if (!$fork->isSuccessful()) {
            $message = 'an error has occurred in child process. pid: ' . $childPid;
            $this->log->error($message);
            $this->error[$childPid] = array(
                'status'    => $fork->getStatus(),
                'message'   => $message,
                'callable'  => $fork->getCallable(),
                'args'      => $fork->getArgs(),
                'return'    => $result->getReturn(),
            );
        } else {
            $this->results[$childPid] = $result->getReturn();
        }

        // array_search() returns false for an unknown PID; used as an array
        // key that would be cast to 0 and drop an unrelated entry.
        $index = array_search($childPid, $this->childPids);
        if ($index !== false) {
            unset($this->childPids[$index]);
        }
    }
    $this->joined = true;
}
|
212
|
|
|
|
|
213
|
|
|
/**
 * Tells whether the error holder reports any error.
 *
 * @return  bool    result of Error::exists()
 */
public function hasError()
{
    $errors = $this->error;

    return $errors->exists();
}
|
220
|
|
|
|
|
221
|
|
|
/**
 * Exposes the error holder that wait() fills for unsuccessful children.
 *
 * @return  \Ackintosh\Snidel\Error
 */
public function getError()
{
    $errors = $this->error;

    return $errors;
}
|
228
|
|
|
|
|
229
|
|
|
/**
 * Returns the collected results, waiting for the children first when
 * that has not happened yet.
 *
 * @param   string  $tag    when given, only the results forked with this tag
 * @return  array   $ret    all results as a list, or the results of $tag
 * @throws  \InvalidArgumentException   when $tag is unknown
 */
public function get($tag = null)
{
    if (!$this->joined) {
        $this->wait();
    }

    return $tag === null
        ? array_values($this->results)
        : $this->getWithTag($tag);
}
|
252
|
|
|
|
|
253
|
|
|
/**
 * Returns the results of the children that were forked with $tag.
 *
 * The list has one entry per PID registered under the tag, in fork order.
 * A PID without a stored result (wait() stores none for an unsuccessful
 * child) contributes null.
 *
 * @param   string  $tag
 * @return  array   $results
 * @throws  \InvalidArgumentException   when no fork used $tag
 */
private function getWithTag($tag)
{
    if (!isset($this->tagsToPids[$tag])) {
        throw new \InvalidArgumentException('unknown tag: ' . $tag);
    }

    $results = array();
    foreach ($this->tagsToPids[$tag] as $pid) {
        // guard the lookup: reading a missing key raises an
        // undefined-index notice/warning
        $results[] = isset($this->results[$pid]) ? $this->results[$pid] : null;
    }

    return $results;
}
|
273
|
|
|
|
|
274
|
|
|
/**
 * Handler installed by the constructor for the signals in $this->signals.
 *
 * Remembers the signal number, relays the signal to every known child,
 * drops the token and finally exits the process.
 *
 * @param   int     $sig    number of the received signal
 * @return  void
 */
public function signalHandler($sig)
{
    $logger = $this->log;

    $logger->info('received signal. signo: ' . $sig);
    $this->receivedSignal = $sig;

    // relay the very same signal to the children
    $logger->info('--> sending a signal to children.');
    $this->sendSignalToChildren($sig);

    $logger->info('--> deleting token.');
    unset($this->token);

    $logger->info('<-- signal handling has been completed successfully.');
    $this->_exit();
}
|
292
|
|
|
|
|
293
|
|
|
/**
 * Delivers $sig to every PID currently held in $this->childPids,
 * logging each delivery beforehand.
 *
 * @param   int     $sig    signal number handed to posix_kill()
 * @return  void
 */
private function sendSignalToChildren($sig)
{
    foreach ($this->childPids as $childPid) {
        $notice = '----> sending a signal to child. pid: ' . $childPid;
        $this->log->info($notice);
        posix_kill($childPid, $sig);
    }
}
|
306
|
|
|
|
|
307
|
|
|
/**
 * Deletes the stored data of every PID in $this->childPids, if it exists.
 *
 * @return  void
 * @throws  \Ackintosh\Snidel\Exception\SharedMemoryControlException
 */
private function deleteAllData()
{
    foreach ($this->childPids as $childPid) {
        // an exception from deleteIfExists() simply propagates to the caller
        $this->dataRepository->load($childPid)->deleteIfExists();
    }
}
|
324
|
|
|
|
|
325
|
|
|
/**
 * Builds a map container for the given arguments and callable, using
 * the concurrency this instance was constructed with.
 *
 * @param   array       $args
 * @param   callable    $callable
 * @return  \Ackintosh\Snidel\MapContainer
 */
public function map(array $args, $callable)
{
    $container = new MapContainer($args, $callable, $this->concurrency);

    return $container;
}
|
336
|
|
|
|
|
337
|
|
|
/**
 * Executes a map container: forks its first stage, waits for and chains
 * the remaining processing, then returns the results of the last stage.
 *
 * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
 * @return  array
 * @throws  \RuntimeException   rethrown after flagging that an exception occurred
 */
public function run(MapContainer $mapContainer)
{
    try {
        $this->forkTheFirstProcessing($mapContainer);
        $this->waitsAndConnectsProcess($mapContainer);
    } catch (\RuntimeException $failure) {
        // the flag makes the destructor delete the shared data
        $this->exceptionHasOccured = true;
        throw $failure;
    }

    $results = $this->getResultsOf($mapContainer);

    return $results;
}
|
356
|
|
|
|
|
357
|
|
|
/**
 * Forks one child per entry of the container's first arguments, running
 * the callable of the first map, and registers each child with that map.
 *
 * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
 * @return  void
 * @throws  \RuntimeException   propagated from fork()
 */
private function forkTheFirstProcessing(MapContainer $mapContainer)
{
    foreach ($mapContainer->getFirstArgs() as $firstArgs) {
        $pid = $this->fork($mapContainer->getFirstMap()->getCallable(), $firstArgs);

        $mapContainer->getFirstMap()->countTheForked();
        $mapContainer->getFirstMap()->addChildPid($pid);
    }
}
|
376
|
|
|
|
|
377
|
|
|
/**
 * Collects the children of a map container and chains the next map.
 *
 * While the container reports that it is processing, one finished child
 * is collected at a time. Its return value is stored in $this->results
 * and, when the container provides a next map for that child, a new
 * child is forked with that return value as its single argument, using
 * the next map's token. Does nothing when the instance is already joined.
 *
 * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
 * @return  void
 * @throws  \RuntimeException   when a child was not successful, or propagated
 *                              from ForkContainer::wait() / fork()
 */
private function waitsAndConnectsProcess(MapContainer $mapContainer)
{
    if ($this->joined) {
        return;
    }

    while ($mapContainer->isProcessing()) {
        $fork       = $this->forkContainer->wait();
        $childPid   = $fork->getPid();

        if (!$fork->isSuccessful()) {
            $message = 'an error has occurred in child process. pid: ' . $childPid;
            $this->log->error($message);
            throw new \RuntimeException($message);
        }

        $result = $fork->getResult();
        $this->results[$childPid] = $result->getReturn();

        // array_search() returns false for an unknown PID; used as an array
        // key that would be cast to 0 and drop an unrelated entry.
        $index = array_search($childPid, $this->childPids);
        if ($index !== false) {
            unset($this->childPids[$index]);
        }

        if ($nextMap = $mapContainer->nextMap($childPid)) {
            $nextMapPid = $this->fork(
                $nextMap->getCallable(),
                array($this->results[$childPid]),
                null,
                $nextMap->getToken()
            );
            $message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid);
            $this->log->info($message);
            $nextMap->countTheForked();
            $nextMap->addChildPid($nextMapPid);
        }
        $mapContainer->countTheCompleted($childPid);
    }

    $this->joined = true;
}
|
428
|
|
|
|
|
429
|
|
|
/**
 * Collects the stored results that belong to the PIDs of the
 * container's last map, in the order those PIDs are returned.
 *
 * @param   \Ackintosh\Snidel\MapContainer  $mapContainer
 * @return  array
 */
private function getResultsOf(MapContainer $mapContainer)
{
    $collected = array();

    foreach ($mapContainer->getLastMapPids() as $lastMapPid) {
        $collected[] = $this->results[$lastMapPid];
    }

    return $collected;
}
|
444
|
|
|
|
|
445
|
|
|
/**
 * Ends the current process by calling exit() with the given status.
 *
 * @param   int     $status     value passed to exit()
 * @return  void
 */
private function _exit($status = 0)
{
    exit($status);
}
|
449
|
|
|
|
|
450
|
|
|
/**
 * Cleans up when the instance goes away.
 *
 * - After a flagged exception: deletes the data of the remaining children.
 * - In the owner process, when the children were never joined and no
 *   signal was received: logs the misuse, sends SIGTERM to the children,
 *   deletes their data, drops the token and throws.
 * - Otherwise: nothing.
 *
 * @throws  \LogicException     when the owner never waited for its children
 */
public function __destruct()
{
    $logger = $this->log;

    if ($this->exceptionHasOccured) {
        $logger->info('destruct processes are started.(exception has occured)');
        $logger->info('--> deleting all shared memory.');
        $this->deleteAllData();

        return;
    }

    // nothing to do unless the owner forgot to join and was not signalled
    if ($this->ownerPid !== getmypid() || $this->joined || $this->receivedSignal !== null) {
        return;
    }

    $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
    $logger->error($message);
    $logger->info('destruct processes are started.');

    $logger->info('--> sending a signal to children.');
    $this->sendSignalToChildren(SIGTERM);

    $logger->info('--> deleting all shared memory.');
    $this->deleteAllData();

    $logger->info('--> deleting token.');
    unset($this->token);

    $logger->info('--> destruct processes are finished successfully.');
    throw new \LogicException($message);
}
|
474
|
|
|
} |
|
475
|
|
|
|
The PSR-1: Basic Coding Standard recommends that a file should either introduce new symbols (classes, functions, constants or similar) or have side effects, but not both. A side effect is anything that executes logic, such as printing output, changing ini settings or writing to a file.
The idea behind this recommendation is that merely auto-loading a class should not change the state of an application. It also promotes a cleaner style of programming and makes your code less prone to errors, because the logic is not spread out all over the place.
To learn more about the PSR-1, please see the PHP-FIG site on the PSR-1.