1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | /** |
||
6 | * TaskScheduler |
||
7 | * |
||
8 | * @author Raffael Sahli <[email protected]> |
||
9 | * @copyright Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
||
10 | * @license MIT https://opensource.org/licenses/MIT |
||
11 | */ |
||
12 | |||
13 | namespace TaskScheduler; |
||
14 | |||
15 | use MongoDB\BSON\ObjectId; |
||
16 | use MongoDB\BSON\UTCDateTime; |
||
17 | use MongoDB\Database; |
||
18 | use Psr\Container\ContainerInterface; |
||
19 | use Psr\Log\LoggerInterface; |
||
20 | use TaskScheduler\Exception\InvalidJobException; |
||
21 | |||
22 | class Worker |
||
23 | { |
||
24 | use InjectTrait; |
||
25 | |||
26 | /** |
||
27 | * Scheduler. |
||
28 | * |
||
29 | * @var Scheduler |
||
30 | */ |
||
31 | protected $scheduler; |
||
32 | |||
33 | /** |
||
34 | * Database. |
||
35 | * |
||
36 | * @var Database |
||
37 | */ |
||
38 | protected $db; |
||
39 | |||
40 | /** |
||
41 | * Logger. |
||
42 | * |
||
43 | * @var LoggerInterface |
||
44 | */ |
||
45 | protected $logger; |
||
46 | |||
47 | /** |
||
48 | * Container. |
||
49 | * |
||
50 | * @var ContainerInterface |
||
51 | */ |
||
52 | protected $container; |
||
53 | |||
54 | /** |
||
55 | * Local queue. |
||
56 | * |
||
57 | * @var array |
||
58 | */ |
||
59 | protected $queue = []; |
||
60 | |||
61 | /** |
||
62 | * Current processing job. |
||
63 | * |
||
64 | * @var null|array |
||
65 | */ |
||
66 | protected $current_job; |
||
67 | |||
68 | /** |
||
69 | * Process ID (fork posix pid). |
||
70 | * |
||
71 | * @var int |
||
72 | */ |
||
73 | protected $process; |
||
74 | |||
75 | /** |
||
76 | * Jobs queue. |
||
77 | * |
||
78 | * @var MessageQueue |
||
79 | */ |
||
80 | protected $jobs; |
||
81 | |||
82 | /** |
||
83 | * Worker ID. |
||
84 | * |
||
85 | * @var ObjectId |
||
86 | */ |
||
87 | protected $id; |
||
88 | |||
/**
 * Create a worker.
 *
 * Stores the given collaborators, remembers the pid of the current
 * process and wraps the scheduler's job queue in a MessageQueue.
 *
 * @param ObjectId                $id        Worker ID
 * @param Scheduler               $scheduler Scheduler; provides the job queue name and size
 * @param Database                $db        Database handle
 * @param LoggerInterface         $logger    Logger
 * @param null|ContainerInterface $container Optional container
 */
public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
{
    $this->container = $container;
    $this->logger = $logger;
    $this->db = $db;
    $this->scheduler = $scheduler;
    $this->id = $id;
    $this->process = getmypid();

    $queue_name = $scheduler->getJobQueue();
    $queue_size = $scheduler->getJobQueueSize();
    $this->jobs = new MessageQueue($db, $queue_name, $queue_size, $logger);
}
|
102 | |||
/**
 * Handle worker timeout.
 *
 * Registered as the SIGALRM handler by catchSignal(). When a job is being
 * processed it is marked as timed out, a timeout event is written to the
 * event queue and, depending on the job options, a replacement job is
 * scheduled:
 *  - retry != 0:      retry counter is decremented, job is re-added at now + retry_interval
 *  - interval > 0:    job is re-added at now + interval
 *  - interval <= -1:  job is re-added without an "at" option
 *  - otherwise:       current job is cleared and SIGTERM is sent to this process
 *
 * NOTE(review): the three rescheduling branches return without clearing
 * $current_job and without signalling the process, unlike the last branch.
 * Confirm that this difference is intended.
 *
 * @return null|ObjectId ID of the newly scheduled job, or null if nothing was rescheduled
 */
public function timeout(): ?ObjectId
{
    $context = [
        'category' => get_class($this),
        'pm' => $this->process,
    ];

    $job = $this->current_job;

    if (null === $job) {
        $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', $context);

        return null;
    }

    $this->logger->debug('received timeout signal, reschedule current processing job ['.$job['_id'].']', $context);

    $this->updateJob($job, JobInterface::STATUS_TIMEOUT);

    $event = [
        'job' => $job['_id'],
        'worker' => $this->id,
        'status' => JobInterface::STATUS_TIMEOUT,
        'timestamp' => new UTCDateTime(),
    ];
    $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

    $options = $job['options'];

    if (0 !== $options['retry']) {
        $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$options['retry'].']', $context);

        --$options['retry'];
        $options['at'] = time() + $options['retry_interval'];
    } elseif ($options['interval'] > 0) {
        $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$options['interval'].'s]', $context);

        $options['at'] = time() + $options['interval'];
    } elseif ($options['interval'] <= -1) {
        $this->logger->debug('job ['.$job['_id'].'] has an endless interval', $context);

        unset($options['at']);
    } else {
        // Nothing to reschedule: forget the job first, then terminate this process.
        $this->current_job = null;
        posix_kill($this->process, SIGTERM);

        return null;
    }

    return $this->scheduler->addJob($job['class'], $job['data'], $options)->getId();
}
||
173 | |||
/**
 * Start worker.
 *
 * Opens a cursor on the job queue for jobs that are waiting or postponed
 * and do not have options.force_spawn set, installs the signal handlers
 * and then keeps iterating for as long as loop() returns true. On every
 * iteration the local queue of postponed jobs is processed first; after
 * that the current cursor entry (if any) is queued for processing.
 *
 * If the cursor has no current entry and reports itself dead, the job
 * queue is re-created via MessageQueue::create() and the listener is
 * restarted recursively.
 *
 * NOTE(review): loop() is not defined in this class body; presumably it is
 * provided by InjectTrait — confirm.
 */
public function processAll(): void
{
    $this->logger->info('start job listener', [
        'category' => get_class($this),
    ]);

    $filter = [
        'options.force_spawn' => false,
        '$or' => [
            ['status' => JobInterface::STATUS_WAITING],
            ['status' => JobInterface::STATUS_POSTPONED],
        ],
    ];
    $cursor = $this->jobs->getCursor($filter);

    $this->catchSignal();

    // Callback handed to MessageQueue::next(); it restarts the listener.
    $restart = function () {
        $this->processAll();
    };

    while ($this->loop()) {
        $this->processLocalQueue();

        if (null !== $cursor->current()) {
            $job = $cursor->current();

            $this->logger->debug('found job ['.$job['_id'].'] in queue with status ['.$job['status'].']', [
                'category' => get_class($this),
            ]);

            // Advance the cursor before the job is handled.
            $this->jobs->next($cursor, $restart);
            $this->queueJob($job);

            continue;
        }

        if ($cursor->getInnerIterator()->isDead()) {
            $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->jobs->create();
            $this->processAll();

            break;
        }

        $this->jobs->next($cursor, $restart);
    }
}
|
230 | |||
/**
 * Process one.
 *
 * Installs the signal handlers, loads the job with the given ID from the
 * scheduler and hands it to queueJob(). Any \Exception raised while doing
 * so is logged as an error and not rethrown.
 *
 * @param ObjectId $id ID of the job to process
 */
public function processOne(ObjectId $id): void
{
    $this->catchSignal();

    $category = get_class($this);

    $this->logger->debug('process job ['.$id.'] and exit', [
        'category' => $category,
    ]);

    try {
        $this->queueJob($this->scheduler->getJob($id)->toArray());
    } catch (\Exception $e) {
        $this->logger->error('failed process job ['.$id.']', [
            'category' => $category,
            'exception' => $e,
        ]);
    }
}
|
252 | |||
/**
 * Cleanup and exit.
 *
 * Registered as the SIGTERM and SIGINT handler by catchSignal(). The local
 * queue is persisted first. If a job is currently being processed it is
 * marked as canceled, a cancel event is written to the event queue and the
 * job is re-added with "at" reset to 0. In both cases exit() is called
 * before returning.
 *
 * NOTE(review): exit() is not defined in this class body; presumably it is
 * provided by InjectTrait — confirm.
 *
 * @return null|ObjectId ID of the re-added job, or null if no job was being processed
 */
public function cleanup()
{
    $this->saveState();

    $context = [
        'category' => get_class($this),
        'pm' => $this->process,
    ];

    $job = $this->current_job;

    if (null === $job) {
        $this->logger->debug('received cleanup call on worker ['.$this->id.'], no job is currently processing, exit now', $context);

        $this->exit();

        return null;
    }

    $this->logger->debug('received cleanup call on worker ['.$this->id.'], reschedule current processing job ['.$job['_id'].']', $context);

    $this->updateJob($job, JobInterface::STATUS_CANCELED);

    $event = [
        'job' => $job['_id'],
        'worker' => $this->id,
        'status' => JobInterface::STATUS_CANCELED,
        'timestamp' => new UTCDateTime(),
    ];
    $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

    // Re-add the interrupted job with "at" reset to 0.
    $options = $job['options'];
    $options['at'] = 0;

    $rescheduled = $this->scheduler->addJob($job['class'], $job['data'], $options);
    $result = $rescheduled->getId();

    $this->exit();

    return $result;
}
||
293 | |||
/**
 * Save local queue.
 *
 * Writes every job held in the local queue back to the job queue
 * collection. The write is an upsert that only uses $setOnInsert: a job
 * document is inserted when no document with its _id exists, and an
 * existing document is left untouched.
 *
 * The filter intentionally contains only _id. The former '$isolated'
 * operator is no longer supported by MongoDB 4.0+ and was never needed
 * here, because a write to a single document is atomic.
 *
 * @return self This worker
 */
protected function saveState(): self
{
    if ([] === $this->queue) {
        return $this;
    }

    // The target collection does not change while iterating.
    $collection = $this->db->selectCollection($this->scheduler->getJobQueue());

    foreach ($this->queue as $job) {
        $collection->updateOne(
            ['_id' => $job['_id']],
            ['$setOnInsert' => $job],
            ['upsert' => true]
        );
    }

    return $this;
}
||
309 | |||
/**
 * Catch signals and cleanup.
 *
 * Enables asynchronous signal handling and registers cleanup() for SIGTERM
 * and SIGINT and timeout() for SIGALRM.
 *
 * @return self This worker
 */
protected function catchSignal(): self
{
    pcntl_async_signals(true);

    $handlers = [
        SIGTERM => 'cleanup',
        SIGINT => 'cleanup',
        SIGALRM => 'timeout',
    ];

    foreach ($handlers as $signal => $method) {
        pcntl_signal($signal, [$this, $method]);
    }

    return $this;
}
||
322 | |||
/**
 * Queue job.
 *
 * Tries to collect the job for processing. If that succeeds the job is
 * processed right away. If it does not succeed and the job carries the
 * postponed status, the job is put into the local queue instead.
 *
 * @param array $job Job document
 *
 * @return bool False if the job has no status set, true otherwise
 */
protected function queueJob(array $job): bool
{
    if (!isset($job['status'])) {
        return false;
    }

    if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
        $this->processJob($job);

        return true;
    }

    if (JobInterface::STATUS_POSTPONED !== $job['status']) {
        return true;
    }

    $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
        'category' => get_class($this),
        'pm' => $this->process,
    ]);

    $this->queue[(string) $job['_id']] = $job;

    return true;
}
||
345 | |||
/**
 * Collect a job by moving it from one status to another.
 *
 * The update filter matches on both _id and the expected current status,
 * so the status only changes while the job still has $from_status. When
 * the update modified the document, a matching event is written to the
 * event queue.
 *
 * The filter no longer contains '$isolated': MongoDB 4.0+ does not support
 * that operator, and it is not needed for a write to a single document.
 * As _id identifies at most one document, updateOne() is used.
 *
 * @param array $job         Job document; only _id and status are read
 * @param int   $status      Status to set; for STATUS_PROCESSING the start time and this worker's ID are stored too
 * @param mixed $from_status Status the job must currently have
 *
 * @return bool True if the job document was modified, false otherwise
 */
protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
{
    $set = [
        'status' => $status,
    ];

    if (JobInterface::STATUS_PROCESSING === $status) {
        $set['started'] = new UTCDateTime();
        $set['worker'] = $this->id;
    }

    $result = $this->db->{$this->scheduler->getJobQueue()}->updateOne([
        '_id' => $job['_id'],
        'status' => $from_status,
    ], [
        '$set' => $set,
    ]);

    $this->logger->debug('collect job ['.$job['_id'].'] with status ['.$from_status.']', [
        'category' => get_class($this),
        'pm' => $this->process,
    ]);

    if (1 === $result->getModifiedCount()) {
        $this->logger->debug('job ['.$job['_id'].'] collected; update status to ['.$status.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => $status,
            'timestamp' => new UTCDateTime(),
        ]);

        return true;
    }

    // Nothing was modified: the job did not have the expected status.
    $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$job['status'].']', [
        'category' => get_class($this),
        'pm' => $this->process,
    ]);

    return false;
}
||
396 | |||
/**
 * Update job status.
 *
 * Sets the status of the job identified by _id, regardless of its current
 * status. For status values greater than or equal to
 * JobInterface::STATUS_DONE the end time is stored as well.
 *
 * The filter no longer contains '$isolated': MongoDB 4.0+ does not support
 * that operator, and it is not needed for a write to a single document.
 * As _id identifies at most one document, updateOne() is used.
 *
 * @param array $job    Job document; only _id is read
 * @param int   $status Status to set
 *
 * @return bool Whether the write was acknowledged
 */
protected function updateJob(array $job, int $status): bool
{
    $set = [
        'status' => $status,
    ];

    if ($status >= JobInterface::STATUS_DONE) {
        $set['ended'] = new UTCDateTime();
    }

    $result = $this->db->{$this->scheduler->getJobQueue()}->updateOne([
        '_id' => $job['_id'],
    ], [
        '$set' => $set,
    ]);

    return $result->isAcknowledged();
}
||
419 | |||
/**
 * Check local queue for postponed jobs.
 *
 * For every job in the local queue the job document is upserted into the
 * job queue collection using $setOnInsert only, so it is inserted when no
 * document with its _id exists and left untouched otherwise. Jobs whose
 * "at" option is not in the future are removed from the local queue,
 * collected (postponed -> processing) and, if collecting succeeds,
 * processed.
 *
 * The upsert filter no longer contains '$isolated': MongoDB 4.0+ does not
 * support that operator, and it is not needed for a write to a single
 * document.
 *
 * @return bool Always true
 */
protected function processLocalQueue(): bool
{
    $now = time();

    foreach ($this->queue as $key => $job) {
        $this->db->{$this->scheduler->getJobQueue()}->updateOne(
            ['_id' => $job['_id']],
            ['$setOnInsert' => $job],
            ['upsert' => true]
        );

        if ($job['options']['at'] > $now) {
            // Not due yet, keep it in the local queue.
            continue;
        }

        $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        unset($this->queue[$key]);

        // Reset "at" so that processJob() does not postpone the job again.
        $job['options']['at'] = 0;

        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
            $this->processJob($job);
        }
    }

    return true;
}
||
450 | |||
/**
 * Process job.
 *
 * A job whose "at" option lies in the future is marked as postponed and
 * stored in the local queue. Otherwise the job becomes the current job, an
 * alarm is armed with the job's timeout option and the job is executed.
 *
 * If execution throws, the failure is logged, the job is marked as failed
 * and a failure event including the exception details is written. A failed
 * job with retry != 0 is re-added at now + retry_interval with a
 * decremented retry counter.
 *
 * After a successful run, or a failure with retry == 0, the interval
 * option decides whether the job is re-added: interval > 0 re-adds it at
 * now + interval, interval <= -1 re-adds it without an "at" option.
 *
 * @param array $job Job document
 *
 * @return ObjectId ID of the re-added job if one was scheduled, otherwise the ID of the given job
 */
protected function processJob(array $job): ObjectId
{
    $context = [
        'category' => get_class($this),
        'pm' => $this->process,
    ];

    $now = time();

    if ($job['options']['at'] > $now) {
        $this->updateJob($job, JobInterface::STATUS_POSTPONED);
        $job['status'] = JobInterface::STATUS_POSTPONED;
        $this->queue[(string) $job['_id']] = $job;

        $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at'].']', $context);

        return $job['_id'];
    }

    $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].'] on worker ['.$this->id.']', $context + [
        'options' => $job['options'],
        'params' => $job['data'],
    ]);

    $options = $job['options'];

    $this->current_job = $job;
    pcntl_alarm($options['timeout']);

    try {
        $this->executeJob($job);
        $this->current_job = null;
    } catch (\Throwable $e) {
        // Disarm the alarm before the failure is recorded.
        pcntl_alarm(0);

        $this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].'] on worker ['.$this->id.']', $context + [
            'exception' => $e,
        ]);

        $this->updateJob($job, JobInterface::STATUS_FAILED);
        $this->current_job = null;

        $event = [
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_FAILED,
            'timestamp' => new UTCDateTime(),
            'exception' => [
                'class' => get_class($e),
                'message' => $e->getMessage(),
                'file' => $e->getFile(),
                'line' => $e->getLine(),
                'code' => $e->getCode(),
            ],
        ];
        $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

        if (0 !== $options['retry']) {
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$options['retry'].']', $context);

            --$options['retry'];
            $options['at'] = time() + $options['retry_interval'];

            return $this->scheduler->addJob($job['class'], $job['data'], $options)->getId();
        }
    }

    pcntl_alarm(0);

    if ($options['interval'] > 0) {
        $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$options['interval'].'s]', $context);

        $options['at'] = time() + $options['interval'];
    } elseif ($options['interval'] <= -1) {
        $this->logger->debug('job ['.$job['_id'].'] has an endless interval', $context);

        unset($options['at']);
    } else {
        // No interval configured, nothing to re-add.
        return $job['_id'];
    }

    return $this->scheduler->addJob($job['class'], $job['data'], $options)->getId();
}
||
551 | |||
/**
 * Execute job.
 *
 * Creates the job instance, either directly or through the container if
 * one was injected, passes data and ID to it and starts it. Once start()
 * has returned, the job is marked as done and a done event is written to
 * the event queue.
 *
 * @param array $job Job document; class, data and _id are read
 *
 * @throws InvalidJobException If the job class does not exist or the instance does not implement JobInterface
 *
 * @return bool Result of the status update to done
 */
protected function executeJob(array $job): bool
{
    $class = $job['class'];

    if (!class_exists($class)) {
        throw new InvalidJobException('job class does not exists');
    }

    $instance = null === $this->container
        ? new $class()
        : $this->container->get($class);

    if (!($instance instanceof JobInterface)) {
        throw new InvalidJobException('job must implement JobInterface');
    }

    $instance
        ->setData($job['data'])
        ->setId($job['_id'])
        ->start();

    $return = $this->updateJob($job, JobInterface::STATUS_DONE);

    $event = [
        'job' => $job['_id'],
        'worker' => $this->id,
        'status' => JobInterface::STATUS_DONE,
        'timestamp' => new UTCDateTime(),
    ];
    $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

    unset($instance);

    return $return;
}
||
589 | } |
||
590 |