1 | <?php |
||||
2 | |||||
3 | declare(strict_types=1); |
||||
4 | |||||
5 | /** |
||||
6 | * TaskScheduler |
||||
7 | * |
||||
8 | * @author Raffael Sahli <[email protected]> |
||||
9 | * @copyright Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
||||
10 | * @license MIT https://opensource.org/licenses/MIT |
||||
11 | */ |
||||
12 | |||||
13 | namespace TaskScheduler; |
||||
14 | |||||
15 | use MongoDB\BSON\UTCDateTime; |
||||
16 | use MongoDB\Database; |
||||
17 | use Psr\Log\LoggerInterface; |
||||
18 | use TaskScheduler\Exception\SpawnForkException; |
||||
19 | |||||
class Queue
{
    use InjectTrait;

    /**
     * Database.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Worker factory, used to build the worker manager inside the forked child.
     *
     * @var WorkerFactoryInterface
     */
    protected $factory;

    /**
     * Jobs queue.
     *
     * @var MessageQueue
     */
    protected $jobs;

    /**
     * Events queue.
     *
     * @var MessageQueue
     */
    protected $events;

    /**
     * Worker manager pid.
     *
     * Stays null until a worker manager has been forked successfully; it is
     * only ever set in the parent process.
     *
     * @var int|null
     */
    protected $manager_pid;

    /**
     * System V message queue handle returned by msg_get_queue().
     *
     * @var resource|\SysvMessageQueue
     */
    protected $queue;

    /**
     * Init queue.
     *
     * Builds the job and event message queues from the names and sizes
     * configured on the scheduler.
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger)
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->factory = $factory;
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
        $this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
    }

    /**
     * Startup (blocking process).
     *
     * Opens the System V message queue, installs the signal handlers, forks
     * the worker manager and enters the main loop. Any exception raised on
     * the way is logged and followed by cleanup(SIGTERM).
     */
    public function process(): void
    {
        try {
            // NOTE(review): the queue key is derived from this file's path;
            // presumably the consumer derives the same key - confirm in WorkerManager.
            $key = ftok(__FILE__, 't');

            if (-1 === $key) {
                throw new \RuntimeException('failed to generate systemv ipc key');
            }

            $queue = msg_get_queue($key);

            // Fail before forking instead of running the loop with an unusable handle.
            if (false === $queue) {
                throw new \RuntimeException('failed to open systemv message queue');
            }

            $this->queue = $queue;
            $this->catchSignal();
            $this->initWorkerManager();
            $this->main();
        } catch (\Exception $e) {
            $this->logger->error('main() throw an exception, cleanup and exit', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->cleanup(SIGTERM);
        }
    }

    /**
     * Wait for worker manager.
     *
     * Registered as the SIGCHLD handler by catchSignal().
     *
     * @param int   $sig signal number
     * @param array $pid signal information; the 'pid' entry is read as the child's pid
     */
    public function exitWorkerManager(int $sig, array $pid): void
    {
        $this->logger->debug('fork manager ['.$pid['pid'].'] exit with ['.$sig.']', [
            'category' => get_class($this),
        ]);

        // Non-blocking wait; $status is required by the call but not inspected.
        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
        $this->cleanup(SIGTERM);
    }

    /**
     * Cleanup.
     *
     * Forwards the signal to the worker manager (if one has been forked) and
     * then calls exit(). Registered as the SIGTERM and SIGINT handler by
     * catchSignal().
     *
     * @param int $sig signal to forward to the worker manager
     */
    public function cleanup(int $sig): void
    {
        if (null !== $this->manager_pid) {
            $this->logger->debug('received exit signal ['.$sig.'], forward signal to the fork manager ['.$sig.']', [
                'category' => get_class($this),
            ]);

            posix_kill($this->manager_pid, $sig);
        }

        $this->exit();
    }

    /**
     * Fork a worker manager.
     *
     * The child builds the manager through the factory, runs it and exits.
     * The parent records the child's pid so that cleanup() can signal it.
     *
     * @throws SpawnForkException if the fork fails
     */
    protected function initWorkerManager()
    {
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new SpawnForkException('failed to spawn fork manager');
        }

        if (0 === $pid) {
            // Child process: manager_pid is deliberately left untouched so that
            // a cleanup() running here never calls posix_kill(0, ...), which
            // would signal the whole process group.
            $manager = $this->factory->buildManager();
            $manager->process();
            exit();
        }

        // Parent process: only a real child pid is stored. Storing -1 would make
        // cleanup() call posix_kill(-1, ...), which signals every process the
        // caller is allowed to signal.
        $this->manager_pid = $pid;
    }

    /**
     * Fork handling, blocking process.
     *
     * Opens a cursor on the job queue and one on the event queue, then loops
     * while loop() returns true: pending events are forwarded first, then at
     * most one job per outer iteration. A dead cursor triggers re-creation of
     * the corresponding queue followed by a nested main() call.
     */
    protected function main(): void
    {
        $this->logger->info('start job listener', [
            'category' => get_class($this),
        ]);

        $cursor_jobs = $this->jobs->getCursor([
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ]);

        $cursor_events = $this->events->getCursor([
            'timestamp' => ['$gte' => new UTCDateTime()],
            'job' => ['$exists' => true],
            'status' => ['$gt' => JobInterface::STATUS_POSTPONED],
        ]);

        $this->catchSignal();

        // Callback handed to MessageQueue::next().
        // NOTE(review): presumably invoked when advancing the cursor fails - confirm in MessageQueue.
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            while ($this->loop()) {
                if (null === $cursor_events->current()) {
                    if ($cursor_events->getInnerIterator()->isDead()) {
                        $this->logger->error('event queue cursor is dead, is it a capped collection?', [
                            'category' => get_class($this),
                        ]);

                        $this->events->create();

                        $this->main();

                        break;
                    }

                    $this->events->next($cursor_events, $restart);
                }

                // Read the current event, then advance before handling it.
                $event = $cursor_events->current();
                $this->events->next($cursor_events, $restart);

                if (null === $event) {
                    break;
                }

                $this->handleEvent($event);
            }

            if (null === $cursor_jobs->current()) {
                if ($cursor_jobs->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    $this->jobs->create();
                    $this->main();

                    break;
                }

                $this->jobs->next($cursor_jobs, $restart);

                continue;
            }

            $job = $cursor_jobs->current();

            $this->jobs->next($cursor_jobs, $restart);

            $this->handleJob($job);
        }
    }

    /**
     * Handle events.
     *
     * Writes the event into the System V message queue with the
     * WorkerManager::TYPE_EVENT message type. A failed write is logged.
     *
     * @param array $event event record; the 'status' and 'job' entries are read for logging
     *
     * @return self
     */
    protected function handleEvent(array $event): self
    {
        $this->logger->debug('received event ['.$event['status'].'] for job ['.$event['job'].'], write into systemv queue', [
            'category' => get_class($this),
        ]);

        // serialize=true and blocking=true are the msg_send() defaults; they are
        // spelled out only to reach the by-reference error code argument.
        if (!msg_send($this->queue, WorkerManager::TYPE_EVENT, $event, true, true, $error_code)) {
            $this->logger->error('failed to write event for job ['.$event['job'].'] into systemv queue, error code ['.$error_code.']', [
                'category' => get_class($this),
            ]);
        }

        return $this;
    }

    /**
     * Handle job.
     *
     * Writes the job's '_id' and 'options' into the System V message queue
     * with the WorkerManager::TYPE_JOB message type. A failed write is logged.
     *
     * @param array $job job record; the '_id' and 'options' entries are read
     *
     * @return self
     */
    protected function handleJob(array $job): self
    {
        $this->logger->debug('received job ['.$job['_id'].'], write in systemv message queue', [
            'category' => get_class($this),
        ]);

        $message = [
            '_id' => $job['_id'],
            'options' => $job['options'],
        ];

        // serialize=true and blocking=true are the msg_send() defaults; they are
        // spelled out only to reach the by-reference error code argument.
        if (!msg_send($this->queue, WorkerManager::TYPE_JOB, $message, true, true, $error_code)) {
            $this->logger->error('failed to write job ['.$job['_id'].'] into systemv message queue, error code ['.$error_code.']', [
                'category' => get_class($this),
            ]);
        }

        return $this;
    }

    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal handling and registers cleanup() for
     * SIGTERM and SIGINT and exitWorkerManager() for SIGCHLD.
     *
     * @return self
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
        pcntl_signal(SIGINT, [$this, 'cleanup']);
        pcntl_signal(SIGCHLD, [$this, 'exitWorkerManager']);

        return $this;
    }
}
||||
282 |
In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.