Conditions | 27 |
Paths | 17058 |
Total Lines | 347 |
Code Lines | 199 |
Lines | 0 |
Ratio | 0 % |
Changes | 1 | ||
Bugs | 0 | Features | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
- Extract Method
If many parameters/temporary variables are present:
- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
1 | <?php |
||
54 | public function sendRequestToBackendWorker( |
||
55 | string $processor, |
||
56 | string $actionName, |
||
57 | mixed $payload = null, |
||
58 | string $moduleName = '', |
||
59 | int $maxTimeout = 30, |
||
60 | int $priority = 0 |
||
|
|||
61 | ): void { |
||
62 | [$debug, $requestMessage] = $this->prepareRequestMessage($processor, $payload, $actionName, $moduleName); |
||
63 | if ($debug) { |
||
64 | $maxTimeout = 9999; |
||
65 | } else { |
||
66 | // Ensure minimum timeout for testing |
||
67 | $maxTimeout = max($maxTimeout, 30); |
||
68 | } |
||
69 | |||
70 | try { |
||
71 | // Initialize Redis connection |
||
72 | $redis = $this->di->get(RedisClientProvider::SERVICE_NAME); |
||
73 | |||
74 | // Generate unique request ID |
||
75 | $requestMessage['request_id'] = uniqid("req_{$actionName}_", true); |
||
76 | |||
77 | // Push request to queue |
||
78 | $pushResult = $redis->rpush(WorkerApiCommands::REDIS_API_QUEUE, json_encode($requestMessage)); |
||
79 | |||
80 | // Verify that the job was actually added to the queue |
||
81 | if ($pushResult <= 0) { |
||
82 | throw new \RuntimeException("Failed to push request to Redis queue"); |
||
83 | } |
||
84 | |||
85 | // Check the current queue length and log it |
||
86 | $queueLength = $redis->lLen(WorkerApiCommands::REDIS_API_QUEUE); |
||
87 | SystemMessages::sysLogMsg( |
||
88 | static::class, |
||
89 | sprintf( |
||
90 | "Request added to queue: action=%s, id=%s, queue_position=%d/%d", |
||
91 | $actionName, |
||
92 | $requestMessage['request_id'], |
||
93 | $pushResult, |
||
94 | $queueLength |
||
95 | ), |
||
96 | LOG_DEBUG |
||
97 | ); |
||
98 | |||
99 | // Count how many active workers are available for processing |
||
100 | $activeWorkers = $redis->keys('worker_api_commands:*'); |
||
101 | $runningWorkers = count($activeWorkers); |
||
102 | |||
103 | // If the queue holds many requests but there are few workers, log a warning |
||
104 | if ($queueLength > $runningWorkers * 2) { |
||
105 | SystemMessages::sysLogMsg( |
||
106 | static::class, |
||
107 | sprintf( |
||
108 | "WARNING: Queue backlog detected - %d requests in queue with only %d workers", |
||
109 | $queueLength, |
||
110 | $runningWorkers |
||
111 | ), |
||
112 | LOG_WARNING |
||
113 | ); |
||
114 | } |
||
115 | |||
116 | if ($requestMessage['async']) { |
||
117 | $this->response->setPayloadSuccess(['success' => true]); |
||
118 | return; |
||
119 | } |
||
120 | |||
121 | // Get response key |
||
122 | $responseKey = WorkerApiCommands::REDIS_API_RESPONSE_PREFIX . $requestMessage['request_id']; |
||
123 | $response = null; |
||
124 | $startTime = microtime(true); |
||
125 | $retryCount = 0; |
||
126 | $maxRetries = 5; |
||
127 | |||
128 | // Calculate end time |
||
129 | $endTime = $startTime + $maxTimeout; |
||
130 | |||
131 | // Polling intervals strategy: fast polling at first, then gradually slow down |
||
132 | $pollingIntervals = [ |
||
133 | // Fast polling for first second (10ms) |
||
134 | ['duration' => 1, 'interval' => 10000], |
||
135 | // Medium polling for next 4 seconds (50ms) |
||
136 | ['duration' => 4, 'interval' => 50000], |
||
137 | // Slower polling for next 10 seconds (100ms) |
||
138 | ['duration' => 10, 'interval' => 100000], |
||
139 | // Very slow polling for the rest (250ms) |
||
140 | ['duration' => $maxTimeout, 'interval' => 250000] |
||
141 | ]; |
||
142 | |||
143 | $currentPollingStage = 0; |
||
144 | $stageStartTime = $startTime; |
||
145 | $currentInterval = $pollingIntervals[0]['interval']; |
||
146 | $attempts = 0; |
||
147 | SystemMessages::sysLogMsg( |
||
148 | static::class, |
||
149 | sprintf( |
||
150 | "Waiting for response on key: %s", |
||
151 | $responseKey, |
||
152 | ), |
||
153 | LOG_DEBUG |
||
154 | ); |
||
155 | while (microtime(true) < $endTime) { |
||
156 | try { |
||
157 | // Check if we need to adjust polling interval |
||
158 | $elapsedTime = microtime(true) - $startTime; |
||
159 | $stageElapsedTime = microtime(true) - $stageStartTime; |
||
160 | |||
161 | if ($currentPollingStage < count($pollingIntervals) - 1 && |
||
162 | $stageElapsedTime >= $pollingIntervals[$currentPollingStage]['duration']) { |
||
163 | // Move to next polling stage |
||
164 | $currentPollingStage++; |
||
165 | $stageStartTime = microtime(true); |
||
166 | $currentInterval = $pollingIntervals[$currentPollingStage]['interval']; |
||
167 | |||
168 | // Skip logging interval changes to reduce noise |
||
169 | } |
||
170 | |||
171 | // Check for response |
||
172 | $attempts++; |
||
173 | $encodedResponse = $redis->get($responseKey); |
||
174 | |||
175 | if ($encodedResponse !== false) { |
||
176 | $responseTime = microtime(true); |
||
177 | $responseDelay = $responseTime - $startTime; |
||
178 | |||
179 | // Response received - only log if it took longer than expected (over 1 second) |
||
180 | if ($responseDelay > 1.0) { |
||
181 | SystemMessages::sysLogMsg( |
||
182 | static::class, |
||
183 | sprintf( |
||
184 | "Delayed response for action '%s' received after %.3fs (attempts: %d)", |
||
185 | $actionName, |
||
186 | $responseDelay, |
||
187 | $attempts |
||
188 | ), |
||
189 | LOG_NOTICE |
||
190 | ); |
||
191 | } |
||
192 | |||
193 | $response = json_decode($encodedResponse, true); |
||
194 | break; |
||
195 | } |
||
196 | |||
197 | // Short sleep before next check |
||
198 | usleep($currentInterval); |
||
199 | |||
200 | } catch (RedisException $redisException) { |
||
201 | // Specific handling for Redis exceptions |
||
202 | $retryCount++; |
||
203 | |||
204 | if ($retryCount > $maxRetries) { |
||
205 | // Log last failed attempt before throwing |
||
206 | SystemMessages::sysLogMsg( |
||
207 | static::class, |
||
208 | sprintf( |
||
209 | "Max Redis retry attempts exceeded: %s (action: %s)", |
||
210 | $redisException->getMessage(), |
||
211 | $actionName |
||
212 | ), |
||
213 | LOG_ERR |
||
214 | ); |
||
215 | throw $redisException; // Max retries exceeded |
||
216 | } |
||
217 | |||
218 | // Log Redis-specific retry attempt |
||
219 | $errorMessage = sprintf( |
||
220 | "Redis connection error during worker request, retrying (%d/%d): %s (action: %s)", |
||
221 | $retryCount, |
||
222 | $maxRetries, |
||
223 | $redisException->getMessage(), |
||
224 | $actionName |
||
225 | ); |
||
226 | |||
227 | // Log to system log |
||
228 | SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_WARNING); |
||
229 | |||
230 | // Also log with error handler for potential Sentry capture |
||
231 | CriticalErrorsHandler::handleExceptionWithSyslog( |
||
232 | new \Exception($errorMessage) |
||
233 | ); |
||
234 | |||
235 | // Exponential backoff with jitter |
||
236 | $sleepTime = min(pow(2, $retryCount) * 100000, 2000000) + mt_rand(0, 100000); |
||
237 | usleep($sleepTime); |
||
238 | |||
239 | // Reconnect |
||
240 | $redis = $this->di->get(RedisClientProvider::SERVICE_NAME); |
||
241 | } catch (Throwable $otherException) { |
||
242 | // Handle other errors with retry |
||
243 | $retryCount++; |
||
244 | |||
245 | if ($retryCount > $maxRetries) { |
||
246 | // Log last failed attempt before throwing |
||
247 | SystemMessages::sysLogMsg( |
||
248 | static::class, |
||
249 | sprintf( |
||
250 | "Max retry attempts exceeded for general exception: %s (action: %s)", |
||
251 | $otherException->getMessage(), |
||
252 | $actionName |
||
253 | ), |
||
254 | LOG_ERR |
||
255 | ); |
||
256 | throw $otherException; // Max retries exceeded |
||
257 | } |
||
258 | |||
259 | // Log general exception retry attempt |
||
260 | $errorMessage = sprintf( |
||
261 | "Error during worker request, retrying (%d/%d): %s (action: %s)", |
||
262 | $retryCount, |
||
263 | $maxRetries, |
||
264 | $otherException->getMessage(), |
||
265 | $actionName |
||
266 | ); |
||
267 | |||
268 | // Log to system log |
||
269 | SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_WARNING); |
||
270 | |||
271 | // Also log with error handler for potential Sentry capture |
||
272 | CriticalErrorsHandler::handleExceptionWithSyslog( |
||
273 | new \Exception($errorMessage) |
||
274 | ); |
||
275 | |||
276 | // Exponential backoff with jitter |
||
277 | $sleepTime = min(pow(2, $retryCount) * 100000, 2000000) + mt_rand(0, 100000); |
||
278 | usleep($sleepTime); |
||
279 | |||
280 | // Reconnect |
||
281 | $redis = $this->di->get(RedisClientProvider::SERVICE_NAME); |
||
282 | } |
||
283 | } |
||
284 | |||
285 | // Clean up |
||
286 | try { |
||
287 | $redis->del($responseKey); |
||
288 | } catch (Throwable $e) { |
||
289 | // Ignore cleanup errors |
||
290 | } |
||
291 | |||
292 | // Calculate total time taken |
||
293 | $totalTime = microtime(true) - $startTime; |
||
294 | |||
295 | if ($response === null) { |
||
296 | SystemMessages::sysLogMsg( |
||
297 | static::class, |
||
298 | sprintf( |
||
299 | "Request timeout after %.3fs: No response received for action %s", |
||
300 | $totalTime, |
||
301 | $actionName |
||
302 | ), |
||
303 | LOG_WARNING |
||
304 | ); |
||
305 | $this->response->setPayloadError('Request timeout or worker not responding'); |
||
306 | return; |
||
307 | } |
||
308 | |||
309 | // Handle file-based response |
||
310 | if (isset($response[WorkerApiCommands::REDIS_RESPONSE_IN_FILE])) { |
||
311 | $filename = $response[WorkerApiCommands::REDIS_RESPONSE_IN_FILE]; |
||
312 | if (file_exists($filename)) { |
||
313 | $fileContent = file_get_contents($filename); |
||
314 | if ($fileContent !== false) { |
||
315 | // Check if response is compressed |
||
316 | if (isset($response['compressed']) && $response['compressed'] === true) { |
||
317 | $fileContent = gzdecode($fileContent); |
||
318 | } |
||
319 | |||
320 | $response = unserialize($fileContent); |
||
321 | unlink($filename); |
||
322 | $this->response->setPayloadSuccess($response); |
||
323 | } else { |
||
324 | $this->response->setPayloadError('Failed to read response file'); |
||
325 | } |
||
326 | } else { |
||
327 | $this->response->setPayloadError('Response file not found'); |
||
328 | } |
||
329 | return; |
||
330 | } |
||
331 | |||
332 | // Handle compressed Redis-based large response |
||
333 | if (isset($response['large_response_redis'])) { |
||
334 | $redisKey = $response['large_response_redis']; |
||
335 | $compressedData = $redis->get($redisKey); |
||
336 | |||
337 | if ($compressedData !== false) { |
||
338 | // Clear the key since we're consuming the data |
||
339 | $redis->del($redisKey); |
||
340 | |||
341 | // Decompress and unserialize the data |
||
342 | $data = gzdecode($compressedData); |
||
343 | if ($data !== false) { |
||
344 | $response = unserialize($data); |
||
345 | $this->response->setPayloadSuccess($response); |
||
346 | } else { |
||
347 | $this->response->setPayloadError('Failed to decompress response data'); |
||
348 | } |
||
349 | } else { |
||
350 | $this->response->setPayloadError('Large response data not found in Redis'); |
||
351 | } |
||
352 | return; |
||
353 | } |
||
354 | |||
355 | |||
356 | if (array_key_exists(WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR, $response)) { |
||
357 | SystemMessages::sysLogMsg( |
||
358 | static::class, |
||
359 | sprintf( |
||
360 | "Setting error response: job_id=%s, error=%s", |
||
361 | $requestMessage['request_id'] ?? 'unknown', |
||
362 | $response[WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR] |
||
363 | ), |
||
364 | LOG_DEBUG |
||
365 | ); |
||
366 | $this->response->setPayloadError($response[WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR]); |
||
367 | } else { |
||
368 | // Only log response time if it took longer than expected |
||
369 | if ($totalTime > 1.0) { |
||
370 | SystemMessages::sysLogMsg( |
||
371 | static::class, |
||
372 | sprintf( |
||
373 | "Slow response for action '%s': %.3fs with %d attempts", |
||
374 | $actionName, |
||
375 | $totalTime, |
||
376 | $attempts |
||
377 | ), |
||
378 | LOG_NOTICE |
||
379 | ); |
||
380 | } |
||
381 | $this->response->setPayloadSuccess($response); |
||
382 | } |
||
383 | |||
384 | } catch (Throwable $e) { |
||
385 | // Log the error with detailed information |
||
386 | $errorMessage = sprintf( |
||
387 | "Error in sendRequestToBackendWorker: %s (processor: %s, action: %s)", |
||
388 | $e->getMessage(), |
||
389 | $processor, |
||
390 | $actionName |
||
391 | ); |
||
392 | |||
393 | // Log to system log first for visibility in server logs |
||
394 | SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_ERR); |
||
395 | |||
396 | // Then use critical error handler for potential Sentry capture |
||
397 | CriticalErrorsHandler::handleExceptionWithSyslog($e); |
||
398 | |||
399 | // Return error to client |
||
400 | $this->response->setPayloadError($e->getMessage()); |
||
401 | } |
||
493 |
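This check looks for parameters that have been defined for a function or method but are not used in the method body. In the listing above, the `$priority` parameter of `sendRequestToBackendWorker()` is declared in the signature but never referenced in the shown body.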