| Conditions | 27 |
| Paths | 17058 |
| Total Lines | 347 |
| Code Lines | 199 |
| Duplicated Lines | 0 |
| Duplication Ratio | 0 % |
| Changes | 1 | ||
| Bugs | 0 | Features | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
- Extract Method

If many parameters/temporary variables are present:
- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
| 1 | <?php |
||
| 54 | public function sendRequestToBackendWorker( |
||
| 55 | string $processor, |
||
| 56 | string $actionName, |
||
| 57 | mixed $payload = null, |
||
| 58 | string $moduleName = '', |
||
| 59 | int $maxTimeout = 30, |
||
| 60 | int $priority = 0 |
||
|
|
|||
| 61 | ): void { |
||
| 62 | [$debug, $requestMessage] = $this->prepareRequestMessage($processor, $payload, $actionName, $moduleName); |
||
| 63 | if ($debug) { |
||
| 64 | $maxTimeout = 9999; |
||
| 65 | } else { |
||
| 66 | // Ensure minimum timeout for testing |
||
| 67 | $maxTimeout = max($maxTimeout, 30); |
||
| 68 | } |
||
| 69 | |||
| 70 | try { |
||
| 71 | // Initialize Redis connection |
||
| 72 | $redis = $this->di->get(RedisClientProvider::SERVICE_NAME); |
||
| 73 | |||
| 74 | // Generate unique request ID |
||
| 75 | $requestMessage['request_id'] = uniqid("req_{$actionName}_", true); |
||
| 76 | |||
| 77 | // Push request to queue |
||
| 78 | $pushResult = $redis->rpush(WorkerApiCommands::REDIS_API_QUEUE, json_encode($requestMessage)); |
||
| 79 | |||
| 80 | // Проверяем, что задача действительно добавлена в очередь |
||
| 81 | if ($pushResult <= 0) { |
||
| 82 | throw new \RuntimeException("Failed to push request to Redis queue"); |
||
| 83 | } |
||
| 84 | |||
| 85 | // Проверяем текущий размер очереди и логируем |
||
| 86 | $queueLength = $redis->lLen(WorkerApiCommands::REDIS_API_QUEUE); |
||
| 87 | SystemMessages::sysLogMsg( |
||
| 88 | static::class, |
||
| 89 | sprintf( |
||
| 90 | "Request added to queue: action=%s, id=%s, queue_position=%d/%d", |
||
| 91 | $actionName, |
||
| 92 | $requestMessage['request_id'], |
||
| 93 | $pushResult, |
||
| 94 | $queueLength |
||
| 95 | ), |
||
| 96 | LOG_DEBUG |
||
| 97 | ); |
||
| 98 | |||
| 99 | // Подсчитываем, сколько активных воркеров доступны для обработки |
||
| 100 | $activeWorkers = $redis->keys('worker_api_commands:*'); |
||
| 101 | $runningWorkers = count($activeWorkers); |
||
| 102 | |||
| 103 | // Если в очереди много запросов, но мало воркеров, логируем предупреждение |
||
| 104 | if ($queueLength > $runningWorkers * 2) { |
||
| 105 | SystemMessages::sysLogMsg( |
||
| 106 | static::class, |
||
| 107 | sprintf( |
||
| 108 | "WARNING: Queue backlog detected - %d requests in queue with only %d workers", |
||
| 109 | $queueLength, |
||
| 110 | $runningWorkers |
||
| 111 | ), |
||
| 112 | LOG_WARNING |
||
| 113 | ); |
||
| 114 | } |
||
| 115 | |||
| 116 | if ($requestMessage['async']) { |
||
| 117 | $this->response->setPayloadSuccess(['success' => true]); |
||
| 118 | return; |
||
| 119 | } |
||
| 120 | |||
| 121 | // Get response key |
||
| 122 | $responseKey = WorkerApiCommands::REDIS_API_RESPONSE_PREFIX . $requestMessage['request_id']; |
||
| 123 | $response = null; |
||
| 124 | $startTime = microtime(true); |
||
| 125 | $retryCount = 0; |
||
| 126 | $maxRetries = 5; |
||
| 127 | |||
| 128 | // Calculate end time |
||
| 129 | $endTime = $startTime + $maxTimeout; |
||
| 130 | |||
| 131 | // Polling intervals strategy: fast polling at first, then gradually slow down |
||
| 132 | $pollingIntervals = [ |
||
| 133 | // Fast polling for first second (10ms) |
||
| 134 | ['duration' => 1, 'interval' => 10000], |
||
| 135 | // Medium polling for next 4 seconds (50ms) |
||
| 136 | ['duration' => 4, 'interval' => 50000], |
||
| 137 | // Slower polling for next 10 seconds (100ms) |
||
| 138 | ['duration' => 10, 'interval' => 100000], |
||
| 139 | // Very slow polling for the rest (250ms) |
||
| 140 | ['duration' => $maxTimeout, 'interval' => 250000] |
||
| 141 | ]; |
||
| 142 | |||
| 143 | $currentPollingStage = 0; |
||
| 144 | $stageStartTime = $startTime; |
||
| 145 | $currentInterval = $pollingIntervals[0]['interval']; |
||
| 146 | $attempts = 0; |
||
| 147 | SystemMessages::sysLogMsg( |
||
| 148 | static::class, |
||
| 149 | sprintf( |
||
| 150 | "Waiting for response on key: %s", |
||
| 151 | $responseKey, |
||
| 152 | ), |
||
| 153 | LOG_DEBUG |
||
| 154 | ); |
||
| 155 | while (microtime(true) < $endTime) { |
||
| 156 | try { |
||
| 157 | // Check if we need to adjust polling interval |
||
| 158 | $elapsedTime = microtime(true) - $startTime; |
||
| 159 | $stageElapsedTime = microtime(true) - $stageStartTime; |
||
| 160 | |||
| 161 | if ($currentPollingStage < count($pollingIntervals) - 1 && |
||
| 162 | $stageElapsedTime >= $pollingIntervals[$currentPollingStage]['duration']) { |
||
| 163 | // Move to next polling stage |
||
| 164 | $currentPollingStage++; |
||
| 165 | $stageStartTime = microtime(true); |
||
| 166 | $currentInterval = $pollingIntervals[$currentPollingStage]['interval']; |
||
| 167 | |||
| 168 | // Skip logging interval changes to reduce noise |
||
| 169 | } |
||
| 170 | |||
| 171 | // Check for response |
||
| 172 | $attempts++; |
||
| 173 | $encodedResponse = $redis->get($responseKey); |
||
| 174 | |||
| 175 | if ($encodedResponse !== false) { |
||
| 176 | $responseTime = microtime(true); |
||
| 177 | $responseDelay = $responseTime - $startTime; |
||
| 178 | |||
| 179 | // Response received - only log if it took longer than expected (over 1 second) |
||
| 180 | if ($responseDelay > 1.0) { |
||
| 181 | SystemMessages::sysLogMsg( |
||
| 182 | static::class, |
||
| 183 | sprintf( |
||
| 184 | "Delayed response for action '%s' received after %.3fs (attempts: %d)", |
||
| 185 | $actionName, |
||
| 186 | $responseDelay, |
||
| 187 | $attempts |
||
| 188 | ), |
||
| 189 | LOG_NOTICE |
||
| 190 | ); |
||
| 191 | } |
||
| 192 | |||
| 193 | $response = json_decode($encodedResponse, true); |
||
| 194 | break; |
||
| 195 | } |
||
| 196 | |||
| 197 | // Short sleep before next check |
||
| 198 | usleep($currentInterval); |
||
| 199 | |||
| 200 | } catch (RedisException $redisException) { |
||
| 201 | // Specific handling for Redis exceptions |
||
| 202 | $retryCount++; |
||
| 203 | |||
| 204 | if ($retryCount > $maxRetries) { |
||
| 205 | // Log last failed attempt before throwing |
||
| 206 | SystemMessages::sysLogMsg( |
||
| 207 | static::class, |
||
| 208 | sprintf( |
||
| 209 | "Max Redis retry attempts exceeded: %s (action: %s)", |
||
| 210 | $redisException->getMessage(), |
||
| 211 | $actionName |
||
| 212 | ), |
||
| 213 | LOG_ERR |
||
| 214 | ); |
||
| 215 | throw $redisException; // Max retries exceeded |
||
| 216 | } |
||
| 217 | |||
| 218 | // Log Redis-specific retry attempt |
||
| 219 | $errorMessage = sprintf( |
||
| 220 | "Redis connection error during worker request, retrying (%d/%d): %s (action: %s)", |
||
| 221 | $retryCount, |
||
| 222 | $maxRetries, |
||
| 223 | $redisException->getMessage(), |
||
| 224 | $actionName |
||
| 225 | ); |
||
| 226 | |||
| 227 | // Log to system log |
||
| 228 | SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_WARNING); |
||
| 229 | |||
| 230 | // Also log with error handler for potential Sentry capture |
||
| 231 | CriticalErrorsHandler::handleExceptionWithSyslog( |
||
| 232 | new \Exception($errorMessage) |
||
| 233 | ); |
||
| 234 | |||
| 235 | // Exponential backoff with jitter |
||
| 236 | $sleepTime = min(pow(2, $retryCount) * 100000, 2000000) + mt_rand(0, 100000); |
||
| 237 | usleep($sleepTime); |
||
| 238 | |||
| 239 | // Reconnect |
||
| 240 | $redis = $this->di->get(RedisClientProvider::SERVICE_NAME); |
||
| 241 | } catch (Throwable $otherException) { |
||
| 242 | // Handle other errors with retry |
||
| 243 | $retryCount++; |
||
| 244 | |||
| 245 | if ($retryCount > $maxRetries) { |
||
| 246 | // Log last failed attempt before throwing |
||
| 247 | SystemMessages::sysLogMsg( |
||
| 248 | static::class, |
||
| 249 | sprintf( |
||
| 250 | "Max retry attempts exceeded for general exception: %s (action: %s)", |
||
| 251 | $otherException->getMessage(), |
||
| 252 | $actionName |
||
| 253 | ), |
||
| 254 | LOG_ERR |
||
| 255 | ); |
||
| 256 | throw $otherException; // Max retries exceeded |
||
| 257 | } |
||
| 258 | |||
| 259 | // Log general exception retry attempt |
||
| 260 | $errorMessage = sprintf( |
||
| 261 | "Error during worker request, retrying (%d/%d): %s (action: %s)", |
||
| 262 | $retryCount, |
||
| 263 | $maxRetries, |
||
| 264 | $otherException->getMessage(), |
||
| 265 | $actionName |
||
| 266 | ); |
||
| 267 | |||
| 268 | // Log to system log |
||
| 269 | SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_WARNING); |
||
| 270 | |||
| 271 | // Also log with error handler for potential Sentry capture |
||
| 272 | CriticalErrorsHandler::handleExceptionWithSyslog( |
||
| 273 | new \Exception($errorMessage) |
||
| 274 | ); |
||
| 275 | |||
| 276 | // Exponential backoff with jitter |
||
| 277 | $sleepTime = min(pow(2, $retryCount) * 100000, 2000000) + mt_rand(0, 100000); |
||
| 278 | usleep($sleepTime); |
||
| 279 | |||
| 280 | // Reconnect |
||
| 281 | $redis = $this->di->get(RedisClientProvider::SERVICE_NAME); |
||
| 282 | } |
||
| 283 | } |
||
| 284 | |||
| 285 | // Clean up |
||
| 286 | try { |
||
| 287 | $redis->del($responseKey); |
||
| 288 | } catch (Throwable $e) { |
||
| 289 | // Ignore cleanup errors |
||
| 290 | } |
||
| 291 | |||
| 292 | // Calculate total time taken |
||
| 293 | $totalTime = microtime(true) - $startTime; |
||
| 294 | |||
| 295 | if ($response === null) { |
||
| 296 | SystemMessages::sysLogMsg( |
||
| 297 | static::class, |
||
| 298 | sprintf( |
||
| 299 | "Request timeout after %.3fs: No response received for action %s", |
||
| 300 | $totalTime, |
||
| 301 | $actionName |
||
| 302 | ), |
||
| 303 | LOG_WARNING |
||
| 304 | ); |
||
| 305 | $this->response->setPayloadError('Request timeout or worker not responding'); |
||
| 306 | return; |
||
| 307 | } |
||
| 308 | |||
| 309 | // Handle file-based response |
||
| 310 | if (isset($response[WorkerApiCommands::REDIS_RESPONSE_IN_FILE])) { |
||
| 311 | $filename = $response[WorkerApiCommands::REDIS_RESPONSE_IN_FILE]; |
||
| 312 | if (file_exists($filename)) { |
||
| 313 | $fileContent = file_get_contents($filename); |
||
| 314 | if ($fileContent !== false) { |
||
| 315 | // Check if response is compressed |
||
| 316 | if (isset($response['compressed']) && $response['compressed'] === true) { |
||
| 317 | $fileContent = gzdecode($fileContent); |
||
| 318 | } |
||
| 319 | |||
| 320 | $response = unserialize($fileContent); |
||
| 321 | unlink($filename); |
||
| 322 | $this->response->setPayloadSuccess($response); |
||
| 323 | } else { |
||
| 324 | $this->response->setPayloadError('Failed to read response file'); |
||
| 325 | } |
||
| 326 | } else { |
||
| 327 | $this->response->setPayloadError('Response file not found'); |
||
| 328 | } |
||
| 329 | return; |
||
| 330 | } |
||
| 331 | |||
| 332 | // Handle compressed Redis-based large response |
||
| 333 | if (isset($response['large_response_redis'])) { |
||
| 334 | $redisKey = $response['large_response_redis']; |
||
| 335 | $compressedData = $redis->get($redisKey); |
||
| 336 | |||
| 337 | if ($compressedData !== false) { |
||
| 338 | // Clear the key since we're consuming the data |
||
| 339 | $redis->del($redisKey); |
||
| 340 | |||
| 341 | // Decompress and unserialize the data |
||
| 342 | $data = gzdecode($compressedData); |
||
| 343 | if ($data !== false) { |
||
| 344 | $response = unserialize($data); |
||
| 345 | $this->response->setPayloadSuccess($response); |
||
| 346 | } else { |
||
| 347 | $this->response->setPayloadError('Failed to decompress response data'); |
||
| 348 | } |
||
| 349 | } else { |
||
| 350 | $this->response->setPayloadError('Large response data not found in Redis'); |
||
| 351 | } |
||
| 352 | return; |
||
| 353 | } |
||
| 354 | |||
| 355 | |||
| 356 | if (array_key_exists(WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR, $response)) { |
||
| 357 | SystemMessages::sysLogMsg( |
||
| 358 | static::class, |
||
| 359 | sprintf( |
||
| 360 | "Setting error response: job_id=%s, error=%s", |
||
| 361 | $requestMessage['request_id'] ?? 'unknown', |
||
| 362 | $response[WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR] |
||
| 363 | ), |
||
| 364 | LOG_DEBUG |
||
| 365 | ); |
||
| 366 | $this->response->setPayloadError($response[WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR]); |
||
| 367 | } else { |
||
| 368 | // Only log response time if it took longer than expected |
||
| 369 | if ($totalTime > 1.0) { |
||
| 370 | SystemMessages::sysLogMsg( |
||
| 371 | static::class, |
||
| 372 | sprintf( |
||
| 373 | "Slow response for action '%s': %.3fs with %d attempts", |
||
| 374 | $actionName, |
||
| 375 | $totalTime, |
||
| 376 | $attempts |
||
| 377 | ), |
||
| 378 | LOG_NOTICE |
||
| 379 | ); |
||
| 380 | } |
||
| 381 | $this->response->setPayloadSuccess($response); |
||
| 382 | } |
||
| 383 | |||
| 384 | } catch (Throwable $e) { |
||
| 385 | // Log the error with detailed information |
||
| 386 | $errorMessage = sprintf( |
||
| 387 | "Error in sendRequestToBackendWorker: %s (processor: %s, action: %s)", |
||
| 388 | $e->getMessage(), |
||
| 389 | $processor, |
||
| 390 | $actionName |
||
| 391 | ); |
||
| 392 | |||
| 393 | // Log to system log first for visibility in server logs |
||
| 394 | SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_ERR); |
||
| 395 | |||
| 396 | // Then use critical error handler for potential Sentry capture |
||
| 397 | CriticalErrorsHandler::handleExceptionWithSyslog($e); |
||
| 398 | |||
| 399 | // Return error to client |
||
| 400 | $this->response->setPayloadError($e->getMessage()); |
||
| 401 | } |
||
| 493 |
This check looks for parameters that are declared in a function or method signature but are never used in its body.