1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace DelayQueue; |
4
|
|
|
|
5
|
|
|
use GuzzleHttp\Client as HttpClient; |
6
|
|
|
use ReflectionClass; |
7
|
|
|
use DelayQueue\Exception\ClassNotFoundException; |
8
|
|
|
use DelayQueue\Exception\InvalidResponseBodyException; |
9
|
|
|
use DelayQueue\Exception\SubClassException; |
10
|
|
|
use Exception; |
11
|
|
|
|
12
|
|
|
class DelayQueue |
13
|
|
|
{ |
14
|
|
|
/** |
15
|
|
|
* @var string 延迟队列服务器地址 http://127.0.0.1:9277 |
16
|
|
|
*/ |
17
|
|
|
protected $server; |
18
|
|
|
|
19
|
|
|
/** |
20
|
|
|
* @var int httpClient超时设置 |
21
|
|
|
*/ |
22
|
|
|
protected $timeout = 10; |
23
|
|
|
|
24
|
|
|
public function __construct($server) |
25
|
|
|
{ |
26
|
|
|
$this->server = rtrim($server, '/'); |
27
|
|
|
} |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* @param int $timeout |
31
|
|
|
*/ |
32
|
|
|
public function setTimeout($timeout) |
33
|
|
|
{ |
34
|
|
|
$this->timeout = $timeout; |
35
|
|
|
} |
36
|
|
|
|
37
|
|
|
/** |
38
|
|
|
* @return int |
39
|
|
|
*/ |
40
|
|
|
public function getTimeout() |
41
|
|
|
{ |
42
|
|
|
return $this->timeout; |
43
|
|
|
} |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* 添加Job到延迟队列中 |
47
|
|
|
* |
48
|
|
|
* @param string $className 处理Job的类名, 必须是[DelayQueue\Handler\AbstractHandler]的子类 |
49
|
|
|
* @param Job $job |
50
|
|
|
* @throws ClassNotFoundException |
51
|
|
|
* @throws Exception |
52
|
|
|
* @throws InvalidResponseBodyException |
53
|
|
|
* @throws SubClassException |
54
|
|
|
*/ |
55
|
|
|
public function push($className, Job $job) |
56
|
|
|
{ |
57
|
|
|
$this->validateClassName($className); |
58
|
|
|
$job->appendValueToBody('className', $className); |
59
|
|
|
|
60
|
|
|
$response = $this->getHttpClient()->post('/push', [ |
61
|
|
|
'json' => $job, |
62
|
|
|
]); |
63
|
|
|
$this->checkResponseBody($response->json()); |
64
|
|
|
} |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* 从队列中取出已过期的Job |
68
|
|
|
* |
69
|
|
|
* @param array $topics 队列名称 |
70
|
|
|
* @return null|array |
71
|
|
|
* @throws Exception |
72
|
|
|
* @throws InvalidResponseBodyException |
73
|
|
|
*/ |
74
|
|
|
public function pop(array $topics) |
75
|
|
|
{ |
76
|
|
|
if (!$topics) { |
|
|
|
|
77
|
|
|
return null; |
78
|
|
|
} |
79
|
|
|
$response = $this->getHttpClient()->post('/pop', [ |
80
|
|
|
'json' => [ |
81
|
|
|
'topic' => implode(',', $topics), |
82
|
|
|
] |
83
|
|
|
]); |
84
|
|
|
|
85
|
|
|
$data = $response->json(); |
86
|
|
|
$this->checkResponseBody($data); |
87
|
|
|
if (!isset($data['data']) || empty($data['data'])) { |
88
|
|
|
return null; |
89
|
|
|
} |
90
|
|
|
|
91
|
|
|
if (!isset($data['data']['id']) || !isset($data['data']['body'])) { |
92
|
|
|
throw new InvalidResponseBodyException('response body miss required parameter, id or body'); |
93
|
|
|
} |
94
|
|
|
$id = $data['data']['id']; |
95
|
|
|
$body = json_decode($data['data']['body'], true); |
96
|
|
|
if (!isset($body['className'])) { |
97
|
|
|
throw new InvalidResponseBodyException('response body miss required parameter className'); |
98
|
|
|
} |
99
|
|
|
$className = $body['className']; |
100
|
|
|
unset($body['className']); |
101
|
|
|
|
102
|
|
|
return [ |
103
|
|
|
'className' => $className, |
104
|
|
|
'id' => $id, |
105
|
|
|
'body' => $body, |
106
|
|
|
]; |
107
|
|
|
} |
108
|
|
|
|
109
|
|
|
/** |
110
|
|
|
* 从延迟队列中删除Job |
111
|
|
|
* |
112
|
|
|
* @param string $id Job唯一标识 |
113
|
|
|
* @throws Exception |
114
|
|
|
* @throws InvalidResponseBodyException |
115
|
|
|
*/ |
116
|
|
View Code Duplication |
public function delete($id) |
|
|
|
|
117
|
|
|
{ |
118
|
|
|
$response = $this->getHttpClient()->post('/delete', [ |
119
|
|
|
'json' => [ |
120
|
|
|
'id' => $id |
121
|
|
|
] |
122
|
|
|
]); |
123
|
|
|
$body = $response->json(); |
124
|
|
|
$this->checkResponseBody($body); |
125
|
|
|
} |
126
|
|
|
|
127
|
|
|
/** |
128
|
|
|
* Job处理完成, 确认删除 |
129
|
|
|
* |
130
|
|
|
* @param string $id Job唯一标识 |
131
|
|
|
* @return true |
132
|
|
|
* @throws Exception |
133
|
|
|
* @throws InvalidResponseBodyException |
134
|
|
|
*/ |
135
|
|
View Code Duplication |
public function finish($id) |
|
|
|
|
136
|
|
|
{ |
137
|
|
|
$response = $this->getHttpClient()->post('/finish', [ |
138
|
|
|
'json' => [ |
139
|
|
|
'id' => $id, |
140
|
|
|
] |
141
|
|
|
]); |
142
|
|
|
$body = $response->json(); |
143
|
|
|
$this->checkResponseBody($body); |
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
public function validateClassName($className) { |
147
|
|
|
if (!class_exists($className)) { |
148
|
|
|
throw new ClassNotFoundException(sprintf('can not find class [%s]', $className)); |
149
|
|
|
} |
150
|
|
|
$reflection = new ReflectionClass($className); |
151
|
|
|
$parentClassName = 'DelayQueue\Handler\AbstractHandler'; |
152
|
|
|
if (!$reflection->isSubclassOf($parentClassName)) { |
153
|
|
|
throw new SubClassException(sprintf('[%s] is not subclass of [%s]', $className, $parentClassName)); |
154
|
|
|
} |
155
|
|
|
} |
156
|
|
|
|
157
|
|
|
protected function getHttpClient() |
158
|
|
|
{ |
159
|
|
|
$httpClient = new HttpClient( |
160
|
|
|
[ |
161
|
|
|
'base_url' => $this->server, |
162
|
|
|
'defaults' => [ |
163
|
|
|
'timeout' => $this->timeout, |
164
|
|
|
'allow_redirects' => false, |
165
|
|
|
] |
166
|
|
|
] |
167
|
|
|
); |
168
|
|
|
|
169
|
|
|
return $httpClient; |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
/** |
173
|
|
|
* @param array $body |
174
|
|
|
* @throws Exception |
175
|
|
|
* @throws InvalidResponseBodyException |
176
|
|
|
*/ |
177
|
|
|
protected function checkResponseBody(array $body) |
178
|
|
|
{ |
179
|
|
|
if (!array_key_exists('code', $body) || !array_key_exists('message', $body)) { |
180
|
|
|
throw new InvalidResponseBodyException('response body miss required parameter, code or message'); |
181
|
|
|
} |
182
|
|
|
if ($body['code'] !== 0) { |
183
|
|
|
throw new Exception($body['message']); |
184
|
|
|
} |
185
|
|
|
} |
186
|
|
|
} |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.