Total Complexity | 44 |
Total Lines | 335 |
Duplicated Lines | 0 % |
Changes | 10 | ||
Bugs | 0 | Features | 0 |
Complex classes like BeanstalkClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use BeanstalkClient, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
17 | class BeanstalkClient extends Injectable |
||
18 | { |
||
19 | /** @var Pheanstalk */ |
||
20 | private Pheanstalk $queue; |
||
21 | private bool $connected = false; |
||
22 | private array $subscriptions = []; |
||
23 | private string $tube; |
||
24 | private int $reconnectsCount = 0; |
||
25 | private $message; |
||
26 | private $timeout_handler; |
||
27 | private $error_handler; |
||
28 | |||
29 | private string $port; |
||
30 | |||
31 | /** |
||
32 | * BeanstalkClient constructor. |
||
33 | * |
||
34 | * @param string $tube |
||
35 | * @param string $port |
||
36 | */ |
||
37 | public function __construct($tube = 'default', $port = '') |
||
42 | } |
||
43 | |||
/**
 * Opens a fresh connection to the beanstalkd server.
 *
 * The host is read from the `beanstalk` section of the DI `config` service. The port
 * is read from the same section unless this instance carries a non-empty, numeric
 * port override. The new connection is switched to this client's tube and the
 * client is flagged as connected.
 */
public function reconnect(): void
{
    $settings    = $this->di->get('config')->beanstalk;
    $defaultPort = $settings->port;

    // An instance-level port wins only when it is both non-empty and numeric.
    $hasPortOverride = ! empty($this->port) && is_numeric($this->port);
    $targetPort      = $hasPortOverride ? $this->port : $defaultPort;

    $this->queue = Pheanstalk::create($settings->host, $targetPort);
    $this->queue->useTube($this->tube);
    $this->connected = true;
}
||
59 | |||
60 | /** |
||
61 | * Returns connection status |
||
62 | * |
||
63 | * @return bool |
||
64 | */ |
||
65 | public function isConnected(): bool |
||
68 | } |
||
69 | |||
/**
 * Sends a request and waits for the answer from the processor.
 *
 * A unique temporary "inbox" tube is watched, the request is published together
 * with the inbox tube name, and then one job is reserved with the given timeout.
 * The reserved job's data becomes the result and the job is deleted.
 *
 * @param mixed $job_data payload for the worker
 * @param int   $timeout  seconds to wait for the answer; also passed to publish() as the job TTR
 * @param int   $priority priority passed to publish()
 *
 * @return bool|mixed data of the reserved job, or false when no job was
 *                    reserved within the timeout or reserving/deleting failed
 */
public function request(
    $job_data,
    int $timeout = 10,
    int $priority = PheanstalkInterface::DEFAULT_PRIORITY
) {
    $this->message = false;
    $inbox_tube    = uniqid('INBOX_', true);
    $this->queue->watch($inbox_tube);

    try {
        // Send the data for processing; the worker learns where to answer from 'inbox_tube'.
        $requestMessage = [
            $job_data,
            'inbox_tube' => $inbox_tube,
        ];
        $this->publish($requestMessage, null, $priority, 0, $timeout);

        // Receive the answer from the server.
        $job = null;
        try {
            $job = $this->queue->reserveWithTimeout($timeout);
            if ($job !== null) {
                $this->message = $job->getData();
                $this->queue->delete($job);
            }
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
            if ($job !== null) {
                $this->queue->bury($job);
            }
        }
    } finally {
        // Always stop watching the temporary tube, even when publish() or bury() throws,
        // so that failed requests do not pile up on the watch list.
        $this->queue->ignore($inbox_tube);
    }

    return $this->message;
}
||
114 | |||
115 | /** |
||
116 | * Puts a job in a beanstalkd server queue |
||
117 | * |
||
118 | * @param mixed $job_data data to worker |
||
119 | * @param ?string $tube tube name |
||
120 | * @param int $priority Jobs with smaller priority values will be scheduled |
||
121 | * before jobs with larger priorities. The most urgent priority is 0; |
||
122 | * the least urgent priority is 4294967295. |
||
123 | * @param int $delay delay before insert job into work query |
||
124 | * @param int $ttr time to execute this job |
||
125 | * |
||
126 | * @return \Pheanstalk\Job |
||
127 | */ |
||
128 | public function publish( |
||
148 | } |
||
149 | |||
/**
 * Drops orphaned jobs from every tube reported by the server.
 *
 * For each tube the method deletes buried jobs, then deletes ready jobs at the
 * head of the queue whose age exceeds twice their TTR. A failure on one tube is
 * logged and does not stop processing of the remaining tubes. Afterwards the
 * connection is switched back to this client's own tube.
 */
public function cleanTubes()
{
    $tubes = $this->queue->listTubes();
    foreach ($tubes as $tube) {
        try {
            $this->queue->useTube($tube);
            // NOTE(review): stats() is presumably server-wide rather than per-tube — confirm.
            // The counters below are only used as an upper bound on loop iterations.
            $queueStats = $this->queue->stats()->getArrayCopy();

            // Delete buried jobs.
            // The int cast keeps the bound effective even if the stats key is missing.
            $countBuried = (int)($queueStats['current-jobs-buried'] ?? 0);
            while ($countBuried-- > 0 && ($job = $this->queue->peekBuried())) {
                $id = $job->getId();
                $this->queue->delete($job);
                Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube}");
            }

            // Delete outdated jobs.
            $countReady = (int)($queueStats['current-jobs-ready'] ?? 0);
            while ($countReady-- > 0 && ($job = $this->queue->peekReady())) {
                $id                    = $job->getId();
                $jobStats              = $this->queue->statsJob($job)->getArrayCopy();
                $age                   = (int)$jobStats['age'];
                $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
                if ($age <= $expectedTimeToExecute) {
                    // The head job stays at the head until it is removed, so peeking again
                    // would only re-inspect the same job; stop instead of spinning.
                    break;
                }
                $this->queue->delete($job);
                Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube}");
            }
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }
    }

    // The loop leaves the connection using the last listed tube; switch back to our own
    // tube, as reply() does. A failure is logged like the per-tube failures above.
    try {
        $this->queue->useTube($this->tube);
    } catch (Throwable $exception) {
        Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
    }
}
||
194 | |||
/**
 * Registers a worker for a tube and starts watching that tube.
 *
 * Backslashes in the tube name are replaced with dashes before the name is used.
 * The `default` tube is removed from the watch list.
 *
 * @param string           $tube     tube to listen on
 * @param array | callable $callback worker stored for this tube
 */
public function subscribe(string $tube, $callback): void
{
    $normalizedTube = str_replace("\\", '-', $tube);

    $this->queue->watch($normalizedTube);
    $this->queue->ignore('default');

    $this->subscriptions[$normalizedTube] = $callback;
}
||
208 | |||
/**
 * Job worker: reserves one job and hands it to the callback subscribed to its tube.
 *
 * When no job is reserved, the timeout handler (if it is callable) is invoked and
 * the method returns. If the reserve call came back in under half a second, the
 * connection is re-created first.
 *
 * When a job is reserved, its payload is stored as the current message: the raw
 * string if it decodes as JSON, otherwise the unserialized value. The callback
 * registered for the job's tube is called with this client as its only argument.
 * The job is deleted after the callback returns, and buried if no callback is
 * registered or the callback (or the delete) throws.
 *
 * @param float $timeout seconds passed to reserveWithTimeout()
 */
public function wait(float $timeout = 10): void
{
    $this->message = null;
    $start         = microtime(true);
    $job           = null;
    try {
        $job = $this->queue->reserveWithTimeout($timeout);
    } catch (Throwable $exception) {
        Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
    }

    if ($job === null) {
        $worktime = (microtime(true) - $start);
        if ($worktime < 0.5) {
            // Something is wrong: the connection to the queue server was probably lost.
            $this->reconnect();
        }
        // Accept any callable (closure, array, function name), not only array callables.
        if (is_callable($this->timeout_handler)) {
            call_user_func($this->timeout_handler);
        }

        return;
    }

    // Decode the payload: JSON is passed through as the raw string, anything else is
    // treated as PHP-serialized data. Object instantiation is disabled explicitly;
    // the previous option array `[false]` had no recognised key and was ignored.
    $rawData = $job->getData();
    if (json_decode($rawData, true) !== null) {
        $mData = $rawData;
    } else {
        $mData = unserialize($rawData, ['allowed_classes' => false]);
    }
    $this->message = $mData;

    // Find the callback attached in subscribe() for the tube this job came from.
    $stats           = $this->queue->statsJob($job);
    $requestFormTube = $stats['tube'];
    $func            = $this->subscriptions[$requestFormTube] ?? null;

    if ($func === null) {
        // Action not found
        $this->queue->bury($job);

        return;
    }

    try {
        if (is_array($func)) {
            call_user_func($func, $this);
        } elseif (is_callable($func) === true) {
            $func($this);
        }
        // Removes the job from the queue when it has been successfully completed
        $this->queue->delete($job);
    } catch (Throwable $e) {
        // Record the failure first so it is not lost if burying fails as well.
        Util::sysLogMsg(__METHOD__, 'Exception: ' . $e->getMessage());
        // Marks the job as terminally failed and no workers will restart it.
        $this->queue->bury($job);
    }
}
||
269 | |||
/**
 * Gets the body of the current message.
 *
 * A message that is a two-element array carrying an `inbox_tube` key is a request
 * that expects a reply; its payload is the array's first element. Any other
 * message is returned as it is.
 *
 * @return string
 */
public function getBody(): string
{
    // Request envelope: [0 => payload, 'inbox_tube' => name of the reply tube].
    $isRequestEnvelope = is_array($this->message)
        && isset($this->message['inbox_tube'])
        && count($this->message) === 2;

    return $isRequestEnvelope ? $this->message[0] : $this->message;
}
||
288 | |||
/**
 * Sends a response to the inbox tube named in the current message.
 *
 * Does nothing when the current message carries no `inbox_tube` entry. The
 * connection is switched back to this client's own tube afterwards, including
 * when putting the response fails.
 *
 * @param $response payload put into the inbox tube as-is
 */
public function reply($response): void
{
    if ( ! isset($this->message['inbox_tube'])) {
        return;
    }

    $this->queue->useTube($this->message['inbox_tube']);
    try {
        $this->queue->put($response);
    } finally {
        // Restore our own tube even if put() throws, so that later puts on this
        // connection do not end up in the requester's inbox tube.
        $this->queue->useTube($this->tube);
    }
}
||
302 | |||
/**
 * Stores the error handler on this client.
 *
 * @param $handler value kept in the error_handler property
 */
public function setErrorHandler($handler): void
{
    $this->error_handler = $handler;
}
||
311 | |||
312 | /** |
||
313 | * @param $handler |
||
314 | */ |
||
315 | public function setTimeoutHandler($handler): void |
||
318 | } |
||
319 | |||
/**
 * Returns the current value of the reconnects counter.
 *
 * @return int
 */
public function reconnectsCount(): int
{
    return $this->reconnectsCount;
}
||
327 | |||
328 | /** |
||
329 | * Gets all messages from tube and clean it |
||
330 | * |
||
331 | * @param string $tube |
||
332 | * |
||
333 | * @return array |
||
334 | */ |
||
335 | public function getMessagesFromTube(string $tube = ''): array |
||
352 | } |
||
353 | } |