1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(ticks = 1); |
4
|
|
|
|
5
|
|
|
namespace DelayQueue\Process; |
6
|
|
|
|
7
|
|
|
use DelayQueue\Container\ContainerAwareTrait; |
8
|
|
|
use Exception; |
9
|
|
|
use DelayQueue\Handler\AbstractHandler; |
10
|
|
|
|
11
|
|
|
class Worker |
12
|
|
|
{ |
13
|
|
|
use ContainerAwareTrait; |
14
|
|
|
|
15
|
|
|
/** |
16
|
|
|
* @var array 轮询队列 |
17
|
|
|
*/ |
18
|
|
|
protected $topics; |
19
|
|
|
|
20
|
|
|
/** |
21
|
|
|
* @var bool 是否在下次循环中退出 |
22
|
|
|
*/ |
23
|
|
|
protected $shutdown = false; |
24
|
|
|
|
25
|
|
|
public function setTopics(array $topics) |
26
|
|
|
{ |
27
|
|
|
$this->topics = $topics; |
28
|
|
|
} |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
public function run() |
32
|
|
|
{ |
33
|
|
|
$this->registerSignalHandlers(); |
34
|
|
|
while(true) { |
35
|
|
|
if ($this->shutdown) { |
36
|
|
|
break; |
37
|
|
|
} |
38
|
|
|
$data = null; |
39
|
|
|
try { |
40
|
|
|
$data = $this->delayQueue->pop($this->topics); |
41
|
|
|
} catch (Exception $exception) { |
42
|
|
|
$this->logger->warning(sprintf('polling queue exception: %s', $exception->getMessage())); |
43
|
|
|
continue; |
44
|
|
|
} |
45
|
|
|
|
46
|
|
|
if (!$data) { |
47
|
|
|
// 空轮询 |
48
|
|
|
continue; |
49
|
|
|
} |
50
|
|
|
|
51
|
|
|
try { |
52
|
|
|
$this->delayQueue->validateClassName($data['className']); |
53
|
|
|
} catch(Exception $exception) { |
54
|
|
|
$this->logger->emergency($exception->getMessage()); |
55
|
|
|
continue; |
56
|
|
|
} |
57
|
|
|
|
58
|
|
|
$this->perform($data); |
59
|
|
|
} |
60
|
|
|
} |
61
|
|
|
|
62
|
|
|
protected function perform(array $data) |
63
|
|
|
{ |
64
|
|
|
$pid = pcntl_fork(); |
65
|
|
|
if ($pid< 0) { |
66
|
|
|
$this->logger->emergency('Unable to fork child worker', ['job' => $data]); |
67
|
|
|
return; |
68
|
|
|
} |
69
|
|
|
if ($pid === 0) { |
70
|
|
|
// 子进程 |
71
|
|
|
/** @var AbstractHandler $class */ |
72
|
|
|
$class = new $data['className']($this->container); |
73
|
|
|
$class->setId($data['id']); |
74
|
|
|
$class->setBody($data['body']); |
75
|
|
|
$this->logger->info('Start processing Job', ['data' => $data]); |
76
|
|
|
$class->run(); |
77
|
|
|
$this->logger->info('Job finished', ['data' => $data]); |
78
|
|
|
exit(0); |
|
|
|
|
79
|
|
|
} |
80
|
|
|
// 父进程 |
81
|
|
|
$status = null; |
82
|
|
|
pcntl_wait($status); |
83
|
|
|
$exitStatus = pcntl_wexitstatus($status); |
84
|
|
|
if ($exitStatus !== 0) { |
85
|
|
|
// 执行失败 |
86
|
|
|
$this->logger->warning('Job exited with exit code ' . $exitStatus); |
87
|
|
|
} |
88
|
|
|
} |
89
|
|
|
|
90
|
|
|
/** |
91
|
|
|
* 注册信号处理 |
92
|
|
|
*/ |
93
|
|
|
protected function registerSignalHandlers() |
94
|
|
|
{ |
95
|
|
|
pcntl_signal(SIGTERM, [$this, 'shutdown']); |
96
|
|
|
pcntl_signal(SIGINT , [$this, 'shutdown']); |
97
|
|
|
} |
98
|
|
|
|
99
|
|
|
/** |
100
|
|
|
* 无Job处理时退出 |
101
|
|
|
*/ |
102
|
|
|
public function shutdown() |
103
|
|
|
{ |
104
|
|
|
$this->logger->notice('Shutting down'); |
105
|
|
|
$this->shutdown = true; |
106
|
|
|
} |
107
|
|
|
} |
An exit expression should only be used in rare cases. For example, if you write a short command line script.
In most cases however, using an
exit
expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.