|
1
|
|
|
<?php |
|
2
|
|
|
declare(ticks = 1); |
|
3
|
|
|
|
|
4
|
|
|
namespace Ackintosh; |
|
5
|
|
|
|
|
6
|
|
|
use Ackintosh\Snidel\Config; |
|
7
|
|
|
use Ackintosh\Snidel\Fork\Container; |
|
8
|
|
|
use Ackintosh\Snidel\Log; |
|
9
|
|
|
use Ackintosh\Snidel\Pcntl; |
|
10
|
|
|
use Ackintosh\Snidel\Task\Task; |
|
11
|
|
|
|
|
12
|
|
|
/**
 * Facade for running callables in forked processes using a master / worker model.
 *
 * Tasks are queued with fork(), awaited with wait(), and their results are
 * read back with get(). The process that constructs the instance is recorded
 * as the owner; only that process shuts the master down on destruction.
 */
class Snidel
{
    /** @var \Ackintosh\Snidel\Config */
    private $config;

    /** @var \Ackintosh\Snidel\Fork\Container */
    private $container;

    /** @var \Ackintosh\Snidel\Pcntl */
    private $pcntl;

    /** @var \Ackintosh\Snidel\Log */
    private $log;

    /** @var bool true once wait() has completed and no task has been queued since */
    private $joined = false;

    /** @var int pid of the process that created this instance */
    private $ownerPid;

    /** @var array signals for which a forwarding handler is installed */
    private $signals = [
        SIGTERM,
        SIGINT,
    ];

    /** @var int|null number of the signal that was received, null while none has arrived */
    private $receivedSignal;

    /**
     * @param   mixed $parameter null for the default configuration, an integer >= 1
     *                           used as the 'concurrency' setting, or an array that is
     *                           passed to Config as-is
     * @throws  \InvalidArgumentException when $parameter is none of the above
     */
    public function __construct($parameter = null)
    {
        if (is_null($parameter)) {
            $this->config = new Config();
        } elseif (is_int($parameter) && $parameter >= 1) {
            $this->config = new Config(
                ['concurrency' => $parameter]
            );
        } elseif (is_array($parameter)) {
            $this->config = new Config($parameter);
        } else {
            throw new \InvalidArgumentException(
                'parameter must be null, an integer greater than or equal to 1, or an array'
            );
        }

        $this->ownerPid  = getmypid();
        $this->log       = new Log($this->ownerPid, $this->config->get('logger'));
        $this->container = new Container($this->ownerPid, $this->log, $this->config);
        $this->pcntl     = new Pcntl();

        foreach ($this->signals as $sig) {
            $this->pcntl->signal(
                $sig,
                function ($signo) {
                    $this->log->info('received signal. signo: ' . $signo);
                    // remembered so that __destruct() does not complain about a missing wait()
                    $this->setReceivedSignal($signo);

                    $this->log->info('--> sending a signal to children.');
                    $this->container->sendSignalToMaster($signo);
                    $this->log->info('<-- signal handling has been completed successfully.');
                    // the signal has been forwarded; terminate this process as well
                    exit;
                },
                // NOTE(review): presumably the restart_syscalls flag of pcntl_signal() - confirm in Pcntl::signal()
                false
            );
        }

        $this->log->info('parent pid: ' . $this->ownerPid);
    }

    /**
     * this method uses master / worker model.
     *
     * Forks the master process on first use, then queues the callable as a task.
     *
     * @param   callable    $callable
     * @param   mixed       $args
     * @param   string      $tag
     * @return  void
     * @throws  \RuntimeException   propagated unchanged from Container::enqueue()
     */
    public function fork($callable, $args = [], $tag = null)
    {
        // a new task is outstanding, so a previous wait() no longer counts
        $this->joined = false;

        if (!$this->container->existsMaster()) {
            $this->container->forkMaster();
        }

        $this->container->enqueue(new Task($callable, $args, $tag));

        $this->log->info('queued task #' . $this->container->queuedCount());
    }

    /**
     * waits until all tasks that queued by Snidel::fork() are completed
     *
     * @return  void
     */
    public function wait()
    {
        $this->container->wait();
        $this->joined = true;
    }

    /**
     * @return  bool    whatever the container reports via hasError()
     */
    public function hasError()
    {
        return $this->container->hasError();
    }

    /**
     * @return  \Ackintosh\Snidel\Error
     */
    public function getError()
    {
        return $this->container->getError();
    }

    /**
     * gets results
     *
     * Calls wait() first when the queued tasks have not been joined yet.
     *
     * @param   string  $tag    restricts the results to this tag; null for no restriction
     * @return  \Ackintosh\Snidel\Result\Collection
     * @throws  \InvalidArgumentException   when $tag is given but unknown to the container
     */
    public function get($tag = null)
    {
        if (!$this->joined) {
            $this->wait();
        }
        if ($tag !== null && !$this->container->hasTag($tag)) {
            throw new \InvalidArgumentException('unknown tag: ' . $tag);
        }

        return $this->container->getCollection($tag);
    }

    /**
     * Records the number of the signal that was received.
     *
     * @param   int $sig
     * @return  void
     */
    public function setReceivedSignal($sig)
    {
        $this->receivedSignal = $sig;
    }

    /**
     * Shuts the master process down when the owner process releases the instance.
     *
     * Does nothing in any process other than the owner.
     *
     * @throws  \LogicException when the owner never joined via wait() / get() and no signal was received
     */
    public function __destruct()
    {
        if ($this->ownerPid !== getmypid()) {
            return;
        }

        if ($this->container->existsMaster()) {
            $this->log->info('shutdown master process.');
            // NOTE(review): no signal number is passed here; the default is chosen by Container - confirm
            $this->container->sendSignalToMaster();
        }

        unset($this->container);

        if (!$this->joined && $this->receivedSignal === null) {
            $message = 'snidel will have to wait for the child process is completed. please use Snidel::wait()';
            $this->log->error($message);
            throw new \LogicException($message);
        }
    }
}
|
178
|
|
|
|
An exit expression should only be used in rare cases, for example when writing a short command-line script.
In most cases, however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend refactoring your code to avoid its usage.