1
|
|
|
<?php |
2
|
|
|
declare(ticks = 1); |
3
|
|
|
|
4
|
|
|
namespace Ackintosh; |
5
|
|
|
|
6
|
|
|
use Ackintosh\Snidel\ForkContainer; |
7
|
|
|
use Ackintosh\Snidel\Result; |
8
|
|
|
use Ackintosh\Snidel\Token; |
9
|
|
|
use Ackintosh\Snidel\Log; |
10
|
|
|
use Ackintosh\Snidel\Pcntl; |
11
|
|
|
use Ackintosh\Snidel\DataRepository; |
12
|
|
|
use Ackintosh\Snidel\MapContainer; |
13
|
|
|
use Ackintosh\Snidel\Task; |
14
|
|
|
use Ackintosh\Snidel\TaskQueue; |
15
|
|
|
use Ackintosh\Snidel\ResultQueue; |
16
|
|
|
use Ackintosh\Snidel\Exception\SharedMemoryControlException; |
17
|
|
|
|
18
|
|
|
class Snidel |
19
|
|
|
{ |
20
|
|
|
/** @var string */ |
21
|
|
|
const VERSION = '0.6.0'; |
22
|
|
|
|
23
|
|
|
private $masterProcessId = null; |
24
|
|
|
|
25
|
|
|
/** @var array */ |
26
|
|
|
private $childPids = array(); |
27
|
|
|
|
28
|
|
|
/** @var \Ackintosh\Snidel\ForkContainer */ |
29
|
|
|
private $forkContainer; |
30
|
|
|
|
31
|
|
|
/** @var \Ackintosh\Snidel\Pcntl */ |
32
|
|
|
private $pcntl; |
33
|
|
|
|
34
|
|
|
/** @var int */ |
35
|
|
|
private $concurrency; |
36
|
|
|
|
37
|
|
|
/** @var \Ackintosh\Snidel\Token */ |
38
|
|
|
private $token; |
39
|
|
|
|
40
|
|
|
/** @var \Ackintosh\Snidel\Log */ |
41
|
|
|
private $log; |
42
|
|
|
|
43
|
|
|
/** @var \Ackintosh\Snidel\DataRepository */ |
44
|
|
|
private $dataRepository; |
45
|
|
|
|
46
|
|
|
/** @var bool */ |
47
|
|
|
private $joined = false; |
48
|
|
|
|
49
|
|
|
/** @var int */ |
50
|
|
|
private $ownerPid; |
51
|
|
|
|
52
|
|
|
/** @var array */ |
53
|
|
|
private $signals = array( |
54
|
|
|
SIGTERM, |
55
|
|
|
SIGINT, |
56
|
|
|
); |
57
|
|
|
|
58
|
|
|
/** @var int */ |
59
|
|
|
private $receivedSignal; |
60
|
|
|
|
61
|
|
|
/** @var \Ackintosh\Snidel\Token */ |
62
|
|
|
private $processToken; |
63
|
|
|
|
64
|
|
|
/** @var bool */ |
65
|
|
|
private $exceptionHasOccured = false; |
66
|
|
|
|
67
|
|
|
public function __construct($concurrency = 5) |
68
|
|
|
{ |
69
|
|
|
$this->ownerPid = getmypid(); |
70
|
|
|
$this->childPids = array(); |
71
|
|
|
$this->concurrency = $concurrency; |
72
|
|
|
$this->token = new Token(getmypid(), $concurrency); |
73
|
|
|
$this->log = new Log(getmypid()); |
74
|
|
|
$this->pcntl = new Pcntl(); |
75
|
|
|
$this->dataRepository = new DataRepository(); |
76
|
|
|
$this->forkContainer = new ForkContainer($this->ownerPid); |
77
|
|
|
|
78
|
|
|
$log = $this->log; |
79
|
|
|
$token = $this->token; |
80
|
|
|
$self = $this; |
81
|
|
|
foreach ($this->signals as $sig) { |
82
|
|
|
$this->pcntl->signal( |
83
|
|
|
$sig, |
84
|
|
|
function ($sig) use($log, $token, $self) { |
85
|
|
|
$log->info('received signal. signo: ' . $sig); |
86
|
|
|
$self->setReceivedSignal($sig); |
87
|
|
|
|
88
|
|
|
$log->info('--> sending a signal to children.'); |
89
|
|
|
$self->sendSignalToChildren($sig); |
90
|
|
|
|
91
|
|
|
$log->info('--> deleting token.'); |
92
|
|
|
unset($token); |
93
|
|
|
|
94
|
|
|
$log->info('<-- signal handling has been completed successfully.'); |
95
|
|
|
exit; |
|
|
|
|
96
|
|
|
}, |
97
|
|
|
false |
98
|
|
|
); |
99
|
|
|
} |
100
|
|
|
|
101
|
|
|
$this->log->info('parent pid: ' . $this->ownerPid); |
102
|
|
|
} |
103
|
|
|
|
104
|
|
|
/** |
105
|
|
|
* sets the resource for the log. |
106
|
|
|
* |
107
|
|
|
* @param resource $resource |
108
|
|
|
* @return void |
109
|
|
|
* @codeCoverageIgnore |
110
|
|
|
*/ |
111
|
|
|
public function setLoggingDestination($resource) |
112
|
|
|
{ |
113
|
|
|
$this->log->setDestination($resource); |
114
|
|
|
} |
115
|
|
|
|
116
|
|
|
/** |
117
|
|
|
* this method uses master / worker model. |
118
|
|
|
* |
119
|
|
|
* @param callable $callable |
120
|
|
|
* @param mixed $args |
121
|
|
|
* @param string $tag |
122
|
|
|
* @return void |
123
|
|
|
* @throws \RuntimeException |
124
|
|
|
*/ |
125
|
|
|
public function fork($callable, $args = array(), $tag = null) |
126
|
|
|
{ |
127
|
|
|
if ($this->masterProcessId === null) { |
128
|
|
|
$this->forkMaster(); |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
try { |
132
|
|
|
$this->forkContainer->enqueue(new Task($callable, $args, $tag)); |
133
|
|
|
} catch (\RuntimeException $e) { |
134
|
|
|
throw $e; |
135
|
|
|
} |
136
|
|
|
|
137
|
|
|
$this->log->info('queued task #' . $this->forkContainer->queuedCount()); |
138
|
|
|
} |
139
|
|
|
|
140
|
|
|
/** |
141
|
|
|
* fork master process |
142
|
|
|
* |
143
|
|
|
* @return void |
144
|
|
|
*/ |
145
|
|
|
private function forkMaster() |
146
|
|
|
{ |
147
|
|
|
$pid = $this->pcntl->fork(); |
148
|
|
|
$this->masterProcessId = ($pid === 0) ? getmypid() : $pid; |
149
|
|
|
$this->log->setMasterProcessId($this->masterProcessId); |
150
|
|
|
|
151
|
|
|
if ($pid) { |
152
|
|
|
// owner |
153
|
|
|
$this->log->info('pid: ' . getmypid()); |
154
|
|
|
} elseif ($pid === -1) { |
|
|
|
|
155
|
|
|
// error |
156
|
|
|
} else { |
157
|
|
|
// master |
158
|
|
|
$taskQueue = new TaskQueue($this->ownerPid); |
159
|
|
|
$this->log->info('pid: ' . $this->masterProcessId); |
160
|
|
|
|
161
|
|
|
foreach ($this->signals as $sig) { |
162
|
|
|
$this->pcntl->signal($sig, SIG_DFL, true); |
163
|
|
|
} |
164
|
|
|
|
165
|
|
|
while ($task = $taskQueue->dequeue()) { |
166
|
|
|
$this->log->info('dequeued task #' . $taskQueue->dequeuedCount()); |
167
|
|
|
if ($this->token->accept()) { |
168
|
|
|
$this->forkWorker($task); |
169
|
|
|
} |
170
|
|
|
} |
171
|
|
|
$this->_exit(); |
172
|
|
|
} |
173
|
|
|
} |
174
|
|
|
|
175
|
|
|
/** |
176
|
|
|
* fork worker process |
177
|
|
|
* |
178
|
|
|
* @param \Ackintosh\Snidel\Task |
179
|
|
|
* @return void |
180
|
|
|
* @throws \RuntimeException |
181
|
|
|
*/ |
182
|
|
|
private function forkWorker($task) |
183
|
|
|
{ |
184
|
|
|
try { |
185
|
|
|
$fork = $this->forkContainer->fork($task); |
186
|
|
|
} catch (\RuntimeException $e) { |
187
|
|
|
$this->log->error($e->getMessage()); |
188
|
|
|
throw $e; |
189
|
|
|
} |
190
|
|
|
|
191
|
|
|
$fork->setTask($task); |
192
|
|
|
|
193
|
|
|
if (getmypid() === $this->masterProcessId) { |
194
|
|
|
// master |
195
|
|
|
$this->log->info('forked worker. pid: ' . $fork->getPid()); |
196
|
|
|
} else { |
197
|
|
|
// worker |
198
|
|
|
$this->log->info('has forked. pid: ' . getmypid()); |
199
|
|
|
// @codeCoverageIgnoreStart |
200
|
|
|
|
201
|
|
|
foreach ($this->signals as $sig) { |
202
|
|
|
$this->pcntl->signal($sig, SIG_DFL, true); |
203
|
|
|
} |
204
|
|
|
|
205
|
|
|
$resultQueue = new ResultQueue($this->ownerPid); |
206
|
|
|
register_shutdown_function(function () use ($fork, $resultQueue) { |
207
|
|
|
if ($fork->hasNoResult() || !$fork->isQueued()) { |
208
|
|
|
$result = new Result(); |
209
|
|
|
$result->setFailure(); |
210
|
|
|
$fork->setResult($result); |
211
|
|
|
$resultQueue->enqueue($fork); |
212
|
|
|
} |
213
|
|
|
}); |
214
|
|
|
|
215
|
|
|
$this->log->info('----> started the function.'); |
216
|
|
|
$fork->executeTask(); |
217
|
|
|
$this->log->info('<---- completed the function.'); |
218
|
|
|
|
219
|
|
|
$resultQueue->enqueue($fork); |
220
|
|
|
$fork->setQueued(); |
221
|
|
|
$this->log->info('queued the result.'); |
222
|
|
|
|
223
|
|
|
$this->token->back(); |
224
|
|
|
$this->log->info('return the token and exit.'); |
225
|
|
|
$this->_exit(); |
226
|
|
|
// @codeCoverageIgnoreEnd |
227
|
|
|
} |
228
|
|
|
} |
229
|
|
|
|
230
|
|
|
/** |
231
|
|
|
* fork process |
232
|
|
|
* the processes which forked are wait for token. |
233
|
|
|
* |
234
|
|
|
* @param callable $callable |
235
|
|
|
* @param mixed $args |
236
|
|
|
* @param string $tag |
237
|
|
|
* @param \Ackintosh\Snidel\Token $token |
238
|
|
|
* @return void |
239
|
|
|
* @throws \RuntimeException |
240
|
|
|
*/ |
241
|
|
|
private function prefork($callable, $args = array(), $tag = null, Token $token = null) |
242
|
|
|
{ |
243
|
|
|
$this->processToken = $token ? $token : $this->token; |
244
|
|
|
$task = new Task($callable, $args, $tag); |
245
|
|
|
|
246
|
|
|
try { |
247
|
|
|
$fork = $this->forkContainer->fork($task); |
248
|
|
|
} catch (\RuntimeException $e) { |
249
|
|
|
$this->log->error($e->getMessage()); |
250
|
|
|
throw $e; |
251
|
|
|
} |
252
|
|
|
|
253
|
|
|
$fork->setTask($task); |
254
|
|
|
|
255
|
|
|
if (getmypid() === $this->ownerPid) { |
256
|
|
|
// parent |
257
|
|
|
$this->log->info('created child process. pid: ' . $fork->getPid()); |
258
|
|
|
$this->childPids[] = $fork->getPid(); |
259
|
|
|
} else { |
260
|
|
|
// @codeCoverageIgnoreStart |
261
|
|
|
// child |
262
|
|
|
foreach ($this->signals as $sig) { |
263
|
|
|
$this->pcntl->signal($sig, SIG_DFL, true); |
264
|
|
|
} |
265
|
|
|
|
266
|
|
|
/** |
267
|
|
|
* in php5.3, we can not use $this in anonymous functions |
268
|
|
|
*/ |
269
|
|
|
$dataRepository = $this->dataRepository; |
270
|
|
|
$log = $this->log; |
271
|
|
|
$processToken = $this->processToken; |
272
|
|
|
register_shutdown_function(function () use ($fork, $dataRepository, $log, $processToken) { |
273
|
|
|
$data = $dataRepository->load(getmypid()); |
274
|
|
|
try { |
275
|
|
|
$data->write($fork); |
276
|
|
|
} catch (SharedMemoryControlException $e) { |
277
|
|
|
throw $e; |
278
|
|
|
} |
279
|
|
|
$log->info('<-- return token.'); |
280
|
|
|
$processToken->back(); |
281
|
|
|
}); |
282
|
|
|
|
283
|
|
|
$log->info('--> waiting for the token come around.'); |
284
|
|
|
if ($processToken->accept()) { |
285
|
|
|
$log->info('----> started the function.'); |
286
|
|
|
$fork->executeTask(); |
287
|
|
|
$log->info('<---- completed the function.'); |
288
|
|
|
} |
289
|
|
|
|
290
|
|
|
$this->_exit(); |
291
|
|
|
// @codeCoverageIgnoreEnd |
292
|
|
|
} |
293
|
|
|
|
294
|
|
|
return $fork->getPid(); |
295
|
|
|
} |
296
|
|
|
|
297
|
|
|
/** |
298
|
|
|
* waits until all tasks that queued by Snidel::fork() are completed |
299
|
|
|
* |
300
|
|
|
* @return void |
301
|
|
|
*/ |
302
|
|
|
public function wait() |
303
|
|
|
{ |
304
|
|
|
$this->forkContainer->wait(); |
305
|
|
|
$this->joined = true; |
306
|
|
|
} |
307
|
|
|
|
308
|
|
|
/** |
309
|
|
|
* @return bool |
310
|
|
|
*/ |
311
|
|
|
public function hasError() |
312
|
|
|
{ |
313
|
|
|
return $this->forkContainer->hasError(); |
314
|
|
|
} |
315
|
|
|
|
316
|
|
|
/** |
317
|
|
|
* @return \Ackintosh\Snidel\Error |
318
|
|
|
*/ |
319
|
|
|
public function getError() |
320
|
|
|
{ |
321
|
|
|
return $this->forkContainer->getError(); |
322
|
|
|
} |
323
|
|
|
|
324
|
|
|
/** |
325
|
|
|
* gets results |
326
|
|
|
* |
327
|
|
|
* @param string $tag |
328
|
|
|
* @return \Ackintosh\Snidel\ForkCollection |
329
|
|
|
* @throws \InvalidArgumentException |
330
|
|
|
*/ |
331
|
|
|
public function get($tag = null) |
332
|
|
|
{ |
333
|
|
|
if (!$this->joined) { |
334
|
|
|
$this->wait(); |
335
|
|
|
} |
336
|
|
|
|
337
|
|
|
if (!$this->forkContainer->hasTag($tag)) { |
|
|
|
|
338
|
|
|
throw new \InvalidArgumentException('unknown tag: ' . $tag); |
339
|
|
|
} |
340
|
|
|
|
341
|
|
|
return $this->forkContainer->getCollection($tag); |
342
|
|
|
} |
343
|
|
|
|
344
|
|
|
/** |
345
|
|
|
* sends signal to child |
346
|
|
|
* |
347
|
|
|
* @param int $sig |
348
|
|
|
* @return void |
349
|
|
|
*/ |
350
|
|
|
public function sendSignalToChildren($sig) |
351
|
|
|
{ |
352
|
|
|
foreach ($this->childPids as $pid) { |
353
|
|
|
$this->log->info('----> sending a signal to child. pid: ' . $pid); |
354
|
|
|
posix_kill($pid, $sig); |
355
|
|
|
} |
356
|
|
|
} |
357
|
|
|
|
358
|
|
|
public function setReceivedSignal($sig) |
359
|
|
|
{ |
360
|
|
|
$this->receivedSignal = $sig; |
361
|
|
|
} |
362
|
|
|
|
363
|
|
|
/** |
364
|
|
|
* delete shared memory |
365
|
|
|
* |
366
|
|
|
* @return void |
367
|
|
|
* @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException |
368
|
|
|
*/ |
369
|
|
|
private function deleteAllData() |
370
|
|
|
{ |
371
|
|
|
foreach ($this->childPids as $pid) { |
372
|
|
|
$data = $this->dataRepository->load($pid); |
373
|
|
|
try { |
374
|
|
|
$data->deleteIfExists(); |
375
|
|
|
} catch (SharedMemoryControlException $e) { |
376
|
|
|
throw $e; |
377
|
|
|
} |
378
|
|
|
} |
379
|
|
|
} |
380
|
|
|
|
381
|
|
|
/** |
382
|
|
|
* create map object |
383
|
|
|
* |
384
|
|
|
* @param array $args |
385
|
|
|
* @param callable $callable |
386
|
|
|
* @return \Ackintosh\Snidel\MapContainer |
387
|
|
|
*/ |
388
|
|
|
public function map(Array $args, $callable) |
389
|
|
|
{ |
390
|
|
|
return new MapContainer($args, $callable, $this->concurrency); |
391
|
|
|
} |
392
|
|
|
|
393
|
|
|
/** |
394
|
|
|
* run map object |
395
|
|
|
* |
396
|
|
|
* @param \Ackintosh\Snidel\MapContainer |
397
|
|
|
* @return array |
398
|
|
|
* @throws \RuntimeException |
399
|
|
|
*/ |
400
|
|
|
public function run(MapContainer $mapContainer) |
401
|
|
|
{ |
402
|
|
|
try { |
403
|
|
|
$this->forkTheFirstProcessing($mapContainer); |
404
|
|
|
$this->waitsAndConnectsProcess($mapContainer); |
405
|
|
|
} catch (\RuntimeException $e) { |
406
|
|
|
$this->exceptionHasOccured = true; |
407
|
|
|
throw $e; |
408
|
|
|
} |
409
|
|
|
|
410
|
|
|
return $this->getResultsOf($mapContainer); |
411
|
|
|
} |
412
|
|
|
|
413
|
|
|
/** |
414
|
|
|
* fork the first processing of the map container |
415
|
|
|
* |
416
|
|
|
* @param \Ackintosh\Snidel\MapContainer |
417
|
|
|
* @return void |
418
|
|
|
* @throws \RuntimeException |
419
|
|
|
*/ |
420
|
|
|
private function forkTheFirstProcessing(MapContainer $mapContainer) |
421
|
|
|
{ |
422
|
|
|
foreach ($mapContainer->getFirstArgs() as $args) { |
423
|
|
|
try { |
424
|
|
|
$childPid = $this->prefork($mapContainer->getFirstMap()->getCallable(), $args); |
425
|
|
|
} catch (\RuntimeException $e) { |
426
|
|
|
throw $e; |
427
|
|
|
} |
428
|
|
|
$mapContainer->getFirstMap()->countTheForked(); |
429
|
|
|
$mapContainer->getFirstMap()->addChildPid($childPid); |
430
|
|
|
} |
431
|
|
|
} |
432
|
|
|
|
433
|
|
|
/** |
434
|
|
|
* waits and connects the process of map container |
435
|
|
|
* |
436
|
|
|
* @param \Ackintosh\Snidel\MapContainer |
437
|
|
|
* @return void |
438
|
|
|
* @throws \RuntimeException |
439
|
|
|
*/ |
440
|
|
|
private function waitsAndConnectsProcess(MapContainer $mapContainer) |
441
|
|
|
{ |
442
|
|
|
if ($this->joined) { |
443
|
|
|
return; |
444
|
|
|
} |
445
|
|
|
|
446
|
|
|
while ($mapContainer->isProcessing()) { |
447
|
|
|
try { |
448
|
|
|
$fork = $this->forkContainer->waitSimply(); |
449
|
|
|
} catch (SharedMemoryControlException $e) { |
450
|
|
|
throw $e; |
451
|
|
|
} |
452
|
|
|
|
453
|
|
|
$childPid = $fork->getPid(); |
454
|
|
|
if ($fork->hasNotFinishedSuccessfully()) { |
455
|
|
|
$message = 'an error has occurred in child process. pid: ' . $childPid; |
456
|
|
|
$this->log->error($message); |
457
|
|
|
throw new \RuntimeException($message); |
458
|
|
|
} |
459
|
|
|
|
460
|
|
|
unset($this->childPids[array_search($childPid, $this->childPids)]); |
461
|
|
|
if ($nextMap = $mapContainer->nextMap($childPid)) { |
462
|
|
|
try { |
463
|
|
|
$nextMapPid = $this->prefork( |
464
|
|
|
$nextMap->getCallable(), |
465
|
|
|
$fork, |
466
|
|
|
null, |
467
|
|
|
$nextMap->getToken() |
468
|
|
|
); |
469
|
|
|
} catch (\RuntimeException $e) { |
470
|
|
|
throw $e; |
471
|
|
|
} |
472
|
|
|
$message = sprintf('processing is connected from [%d] to [%d]', $childPid, $nextMapPid); |
473
|
|
|
$this->log->info($message); |
474
|
|
|
$nextMap->countTheForked(); |
475
|
|
|
$nextMap->addChildPid($nextMapPid); |
476
|
|
|
} |
477
|
|
|
$mapContainer->countTheCompleted($childPid); |
478
|
|
|
} |
479
|
|
|
|
480
|
|
|
$this->joined = true; |
481
|
|
|
} |
482
|
|
|
|
483
|
|
|
/** |
484
|
|
|
* gets results of map container |
485
|
|
|
* |
486
|
|
|
* @param \Ackintosh\Snidel\MapContainer |
487
|
|
|
* @return array |
488
|
|
|
*/ |
489
|
|
|
private function getResultsOf(MapContainer $mapContainer) |
490
|
|
|
{ |
491
|
|
|
$results = array(); |
492
|
|
|
foreach ($mapContainer->getLastMapPids() as $pid) { |
493
|
|
|
$results[] = $this->forkContainer->get($pid)->getResult()->getReturn(); |
494
|
|
|
} |
495
|
|
|
|
496
|
|
|
return $results; |
497
|
|
|
} |
498
|
|
|
|
499
|
|
|
private function _exit($status = 0) |
500
|
|
|
{ |
501
|
|
|
exit($status); |
|
|
|
|
502
|
|
|
} |
503
|
|
|
|
504
|
|
|
public function __destruct() |
505
|
|
|
{ |
506
|
|
|
if ($this->masterProcessId !== null && $this->ownerPid === getmypid()) { |
507
|
|
|
$this->log->info('shutdown master process.'); |
508
|
|
|
posix_kill($this->masterProcessId, SIGTERM); |
509
|
|
|
|
510
|
|
|
unset($this->forkContainer); |
511
|
|
|
} |
512
|
|
|
|
513
|
|
|
if ($this->exceptionHasOccured) { |
514
|
|
|
$this->log->info('destruct processes are started.(exception has occured)'); |
515
|
|
|
$this->log->info('--> deleting all shared memory.'); |
516
|
|
|
$this->deleteAllData(); |
517
|
|
|
} elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) { |
518
|
|
|
$message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()'; |
519
|
|
|
$this->log->error($message); |
520
|
|
|
$this->log->info('destruct processes are started.'); |
521
|
|
|
|
522
|
|
|
$this->log->info('--> sending a signal to children.'); |
523
|
|
|
$this->sendSignalToChildren(SIGTERM); |
524
|
|
|
|
525
|
|
|
$this->log->info('--> deleting all shared memory.'); |
526
|
|
|
$this->deleteAllData(); |
527
|
|
|
|
528
|
|
|
$this->log->info('--> deleting token.'); |
529
|
|
|
unset($this->token); |
530
|
|
|
|
531
|
|
|
$this->log->info('--> destruct processes are finished successfully.'); |
532
|
|
|
throw new \LogicException($message); |
533
|
|
|
} |
534
|
|
|
} |
535
|
|
|
} |
536
|
|
|
|
An exit expression should only be used in rare cases. For example, if you write a short command line script.
In most cases however, using an
exit
expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.