|
1
|
|
|
<?php |
|
2
|
|
|
declare(ticks = 1); |
|
3
|
|
|
|
|
4
|
|
|
namespace Ackintosh; |
|
5
|
|
|
|
|
6
|
|
|
use Ackintosh\Snidel\Fork\Container; |
|
7
|
|
|
use Ackintosh\Snidel\Result\Result; |
|
8
|
|
|
use Ackintosh\Snidel\Log; |
|
9
|
|
|
use Ackintosh\Snidel\Pcntl; |
|
10
|
|
|
use Ackintosh\Snidel\DataRepository; |
|
11
|
|
|
use Ackintosh\Snidel\MapContainer; |
|
12
|
|
|
use Ackintosh\Snidel\Task\Task; |
|
13
|
|
|
use Ackintosh\Snidel\Exception\SharedMemoryControlException; |
|
14
|
|
|
|
|
15
|
|
|
class Snidel |
|
16
|
|
|
{ |
|
17
|
|
|
/** @var string */ |
|
18
|
|
|
const VERSION = '0.6.5'; |
|
19
|
|
|
|
|
20
|
|
|
/** @var \Ackintosh\Snidel\Fork\Container */ |
|
21
|
|
|
private $container; |
|
22
|
|
|
|
|
23
|
|
|
/** @var \Ackintosh\Snidel\Pcntl */ |
|
24
|
|
|
private $pcntl; |
|
25
|
|
|
|
|
26
|
|
|
/** @var int */ |
|
27
|
|
|
private $concurrency; |
|
28
|
|
|
|
|
29
|
|
|
/** @var \Ackintosh\Snidel\Log */ |
|
30
|
|
|
private $log; |
|
31
|
|
|
|
|
32
|
|
|
/** @var bool */ |
|
33
|
|
|
private $joined = false; |
|
34
|
|
|
|
|
35
|
|
|
/** @var int */ |
|
36
|
|
|
private $ownerPid; |
|
37
|
|
|
|
|
38
|
|
|
/** @var array */ |
|
39
|
|
|
private $signals = array( |
|
40
|
|
|
SIGTERM, |
|
41
|
|
|
SIGINT, |
|
42
|
|
|
); |
|
43
|
|
|
|
|
44
|
|
|
/** @var int */ |
|
45
|
|
|
private $receivedSignal; |
|
46
|
|
|
|
|
47
|
|
|
/** @var bool */ |
|
48
|
|
|
private $exceptionHasOccured = false; |
|
49
|
|
|
|
|
50
|
|
|
public function __construct($concurrency = 5) |
|
51
|
|
|
{ |
|
52
|
|
|
$this->ownerPid = getmypid(); |
|
53
|
|
|
$this->concurrency = $concurrency; |
|
54
|
|
|
$this->log = new Log($this->ownerPid); |
|
55
|
|
|
$this->pcntl = new Pcntl(); |
|
56
|
|
|
$this->container = new Container($this->ownerPid, $this->log, $this->concurrency); |
|
57
|
|
|
|
|
58
|
|
|
$log = $this->log; |
|
59
|
|
|
$self = $this; |
|
60
|
|
|
foreach ($this->signals as $sig) { |
|
61
|
|
|
$this->pcntl->signal( |
|
62
|
|
|
$sig, |
|
63
|
|
|
function ($sig) use($log, $self) { |
|
64
|
|
|
$log->info('received signal. signo: ' . $sig); |
|
65
|
|
|
$self->setReceivedSignal($sig); |
|
66
|
|
|
|
|
67
|
|
|
$log->info('--> sending a signal " to children.'); |
|
68
|
|
|
$self->container->sendSignalToMaster($sig); |
|
69
|
|
|
$log->info('<-- signal handling has been completed successfully.'); |
|
70
|
|
|
exit; |
|
|
|
|
|
|
71
|
|
|
}, |
|
72
|
|
|
false |
|
73
|
|
|
); |
|
74
|
|
|
} |
|
75
|
|
|
|
|
76
|
|
|
$this->log->info('parent pid: ' . $this->ownerPid); |
|
77
|
|
|
} |
|
78
|
|
|
|
|
79
|
|
|
/** |
|
80
|
|
|
* sets the resource for the log. |
|
81
|
|
|
* |
|
82
|
|
|
* @param resource $resource |
|
83
|
|
|
* @return void |
|
84
|
|
|
* @codeCoverageIgnore |
|
85
|
|
|
*/ |
|
86
|
|
|
public function setLoggingDestination($resource) |
|
87
|
|
|
{ |
|
88
|
|
|
$this->log->setDestination($resource); |
|
89
|
|
|
} |
|
90
|
|
|
|
|
91
|
|
|
/** |
|
92
|
|
|
* this method uses master / worker model. |
|
93
|
|
|
* |
|
94
|
|
|
* @param callable $callable |
|
95
|
|
|
* @param mixed $args |
|
96
|
|
|
* @param string $tag |
|
97
|
|
|
* @return void |
|
98
|
|
|
* @throws \RuntimeException |
|
99
|
|
|
*/ |
|
100
|
|
|
public function fork($callable, $args = array(), $tag = null) |
|
101
|
|
|
{ |
|
102
|
|
|
$this->joined = false; |
|
103
|
|
|
|
|
104
|
|
|
if (!$this->container->existsMaster()) { |
|
105
|
|
|
$this->container->forkMaster(); |
|
106
|
|
|
} |
|
107
|
|
|
|
|
108
|
|
|
try { |
|
109
|
|
|
$this->container->enqueue(new Task($callable, $args, $tag)); |
|
110
|
|
|
} catch (\RuntimeException $e) { |
|
111
|
|
|
throw $e; |
|
112
|
|
|
} |
|
113
|
|
|
|
|
114
|
|
|
$this->log->info('queued task #' . $this->container->queuedCount()); |
|
115
|
|
|
} |
|
116
|
|
|
|
|
117
|
|
|
/** |
|
118
|
|
|
* waits until all tasks that queued by Snidel::fork() are completed |
|
119
|
|
|
* |
|
120
|
|
|
* @return void |
|
121
|
|
|
*/ |
|
122
|
|
|
public function wait() |
|
123
|
|
|
{ |
|
124
|
|
|
$this->container->wait(); |
|
125
|
|
|
$this->joined = true; |
|
126
|
|
|
} |
|
127
|
|
|
|
|
128
|
|
|
/** |
|
129
|
|
|
* @return bool |
|
130
|
|
|
*/ |
|
131
|
|
|
public function hasError() |
|
132
|
|
|
{ |
|
133
|
|
|
return $this->container->hasError(); |
|
134
|
|
|
} |
|
135
|
|
|
|
|
136
|
|
|
/** |
|
137
|
|
|
* @return \Ackintosh\Snidel\Error |
|
138
|
|
|
*/ |
|
139
|
|
|
public function getError() |
|
140
|
|
|
{ |
|
141
|
|
|
return $this->container->getError(); |
|
142
|
|
|
} |
|
143
|
|
|
|
|
144
|
|
|
/** |
|
145
|
|
|
* gets results |
|
146
|
|
|
* |
|
147
|
|
|
* @param string $tag |
|
148
|
|
|
* @return \Ackintosh\Snidel\Result\Collection |
|
149
|
|
|
* @throws \InvalidArgumentException |
|
150
|
|
|
*/ |
|
151
|
|
|
public function get($tag = null) |
|
152
|
|
|
{ |
|
153
|
|
|
if (!$this->joined) { |
|
154
|
|
|
$this->wait(); |
|
155
|
|
|
} |
|
156
|
|
|
if ($tag !== null && !$this->container->hasTag($tag)) { |
|
157
|
|
|
throw new \InvalidArgumentException('unknown tag: ' . $tag); |
|
158
|
|
|
} |
|
159
|
|
|
|
|
160
|
|
|
return $this->container->getCollection($tag); |
|
161
|
|
|
} |
|
162
|
|
|
|
|
163
|
|
|
public function setReceivedSignal($sig) |
|
164
|
|
|
{ |
|
165
|
|
|
$this->receivedSignal = $sig; |
|
166
|
|
|
} |
|
167
|
|
|
/** |
|
168
|
|
|
* delete shared memory |
|
169
|
|
|
* |
|
170
|
|
|
* @return void |
|
171
|
|
|
* @throws \Ackintosh\Snidel\Exception\SharedMemoryControlException |
|
172
|
|
|
*/ |
|
173
|
|
|
private function deleteAllData() |
|
174
|
|
|
{ |
|
175
|
|
|
$dataRepository = new DataRepository(); |
|
176
|
|
|
try { |
|
177
|
|
|
$dataRepository->deleteAll(); |
|
178
|
|
|
} catch (SharedMemoryControlException $e) { |
|
179
|
|
|
throw $e; |
|
180
|
|
|
} |
|
181
|
|
|
} |
|
182
|
|
|
|
|
183
|
|
|
public function __destruct() |
|
184
|
|
|
{ |
|
185
|
|
|
if ($this->ownerPid === getmypid()) { |
|
186
|
|
|
if ($this->container->existsMaster()) { |
|
187
|
|
|
$this->log->info('shutdown master process.'); |
|
188
|
|
|
$this->container->sendSignalToMaster(); |
|
189
|
|
|
} |
|
190
|
|
|
|
|
191
|
|
|
unset($this->container); |
|
192
|
|
|
} |
|
193
|
|
|
|
|
194
|
|
|
if ($this->exceptionHasOccured) { |
|
195
|
|
|
$this->log->info('destruct processes are started.(exception has occured)'); |
|
196
|
|
|
$this->log->info('--> deleting all shared memory.'); |
|
197
|
|
|
$this->deleteAllData(); |
|
198
|
|
|
} elseif ($this->ownerPid === getmypid() && !$this->joined && $this->receivedSignal === null) { |
|
199
|
|
|
$message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()'; |
|
200
|
|
|
$this->log->error($message); |
|
201
|
|
|
$this->log->info('destruct processes are started.'); |
|
202
|
|
|
|
|
203
|
|
|
$this->log->info('--> deleting all shared memory.'); |
|
204
|
|
|
$this->deleteAllData(); |
|
205
|
|
|
|
|
206
|
|
|
$this->log->info('--> destruct processes are finished successfully.'); |
|
207
|
|
|
throw new \LogicException($message); |
|
208
|
|
|
} |
|
209
|
|
|
} |
|
210
|
|
|
} |
|
211
|
|
|
|
An exit expression should only be used in rare cases. For example, if you write a short command line script.
In most cases however, using an
exitexpression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.