BaseController   A
last analyzed

Complexity

Total Complexity 38

Size/Duplication

Total Lines 451
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 38
eloc 230
dl 0
loc 451
rs 9.36
c 1
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
F sendRequestToBackendWorker() 0 347 27
A prepareRequestMessage() 0 31 4
A sanitizeData() 0 22 6
A sendError() 0 6 1
1
<?php
2
3
/*
4
 * MikoPBX - free phone system for small business
5
 * Copyright © 2017-2023 Alexey Portnov and Nikolay Beketov
6
 *
7
 * This program is free software: you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 3 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * This program is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License along with this program.
18
 * If not, see <https://www.gnu.org/licenses/>.
19
 */
20
21
declare(strict_types=1);
22
23
namespace MikoPBX\PBXCoreREST\Controllers;
24
25
use MikoPBX\Common\Handlers\CriticalErrorsHandler;
26
use MikoPBX\Common\Providers\RedisClientProvider;
27
use MikoPBX\Core\System\SystemMessages;
28
use MikoPBX\PBXCoreREST\Lib\PbxExtensionsProcessor;
29
use MikoPBX\PBXCoreREST\Workers\WorkerApiCommands;
30
use Phalcon\Filter\Filter;
31
use Phalcon\Mvc\Controller;
32
use RedisException;
33
use Throwable;
34
35
/**
36
 * Class BaseController
37
 * @property \MikoPBX\PBXCoreREST\Http\Response $response
38
 * @property \MikoPBX\PBXCoreREST\Http\Request $request
39
 */
40
class BaseController extends Controller
41
{
42
    /**
     * Send a request to the backend worker and relay the worker's answer to the HTTP response.
     *
     * The request message is pushed to the WorkerApiCommands::REDIS_API_QUEUE list. For
     * asynchronous requests the method reports success right after queuing. Otherwise it polls
     * Redis for the response key until an answer arrives or the timeout expires, then hands the
     * answer (inline, file-based or stored under a separate Redis key) to the response object.
     * Every failure is logged and reported to the client through setPayloadError().
     *
     * @param string $processor The name of the processor.
     * @param string $actionName The name of the action.
     * @param mixed $payload The payload data to send with the request.
     * @param string $moduleName The name of the module (only for 'modules' processor).
     * @param int $maxTimeout The maximum timeout for the request in seconds. Values below 30 are
     *                        raised to 30; debug requests always wait up to 9999 seconds.
     * @param int $priority The priority of the request. Not used by this method at the moment;
     *                      kept so that existing callers stay compatible.
     *
     * @return void
     */
    public function sendRequestToBackendWorker(
        string $processor,
        string $actionName,
        mixed $payload = null,
        string $moduleName = '',
        int $maxTimeout = 30,
        int $priority = 0
    ): void {
        [$debug, $requestMessage] = $this->prepareRequestMessage($processor, $payload, $actionName, $moduleName);

        // Debug requests get a practically unlimited timeout; regular ones wait at least 30 seconds.
        $maxTimeout = $debug ? 9999 : max($maxTimeout, 30);

        try {
            // Initialize Redis connection
            $redis = $this->di->get(RedisClientProvider::SERVICE_NAME);

            // Generate unique request ID
            $requestId = uniqid("req_{$actionName}_", true);
            $requestMessage['request_id'] = $requestId;

            // Push request to queue. JSON_THROW_ON_ERROR makes an unencodable payload fail right
            // here instead of queuing a broken job that would only surface as a timeout.
            $pushResult = $redis->rpush(
                WorkerApiCommands::REDIS_API_QUEUE,
                json_encode($requestMessage, JSON_THROW_ON_ERROR)
            );

            // Make sure the job was really added to the queue
            if ($pushResult === false || $pushResult <= 0) {
                throw new \RuntimeException("Failed to push request to Redis queue");
            }

            $this->logQueueState($redis, $actionName, $requestId, (int)$pushResult);

            if ($requestMessage['async']) {
                $this->response->setPayloadSuccess(['success' => true]);
                return;
            }

            // Get response key
            $responseKey = WorkerApiCommands::REDIS_API_RESPONSE_PREFIX . $requestId;
            $encodedResponse = false;
            $startTime = microtime(true);
            $endTime = $startTime + $maxTimeout;
            $retryCount = 0;
            $maxRetries = 5;
            $attempts = 0;

            // Polling intervals strategy: fast polling at first, then gradually slow down.
            // 'duration' is in seconds, 'interval' in microseconds.
            $pollingIntervals = [
                // Fast polling for first second (10ms)
                ['duration' => 1, 'interval' => 10000],
                // Medium polling for next 4 seconds (50ms)
                ['duration' => 4, 'interval' => 50000],
                // Slower polling for next 10 seconds (100ms)
                ['duration' => 10, 'interval' => 100000],
                // Very slow polling for the rest (250ms)
                ['duration' => $maxTimeout, 'interval' => 250000],
            ];
            $lastStage = count($pollingIntervals) - 1;
            $currentPollingStage = 0;
            $stageStartTime = $startTime;
            $currentInterval = $pollingIntervals[0]['interval'];

            SystemMessages::sysLogMsg(
                static::class,
                sprintf("Waiting for response on key: %s", $responseKey),
                LOG_DEBUG
            );

            while (microtime(true) < $endTime) {
                try {
                    // Move to the next polling stage once the current one has lasted long enough
                    if (
                        $currentPollingStage < $lastStage
                        && microtime(true) - $stageStartTime >= $pollingIntervals[$currentPollingStage]['duration']
                    ) {
                        $currentPollingStage++;
                        $stageStartTime = microtime(true);
                        $currentInterval = $pollingIntervals[$currentPollingStage]['interval'];
                    }

                    // Check for response
                    $attempts++;
                    $encodedResponse = $redis->get($responseKey);

                    if ($encodedResponse !== false) {
                        $responseDelay = microtime(true) - $startTime;

                        // Only log if the answer took longer than expected (over 1 second)
                        if ($responseDelay > 1.0) {
                            SystemMessages::sysLogMsg(
                                static::class,
                                sprintf(
                                    "Delayed response for action '%s' received after %.3fs (attempts: %d)",
                                    $actionName,
                                    $responseDelay,
                                    $attempts
                                ),
                                LOG_NOTICE
                            );
                        }
                        break;
                    }

                    // Short sleep before next check
                    usleep($currentInterval);
                } catch (Throwable $pollException) {
                    // Logs, backs off, or rethrows to the outer catch once retries are exhausted
                    $retryCount++;
                    $this->handlePollingFailure($pollException, $retryCount, $maxRetries, $actionName);

                    // Reconnect
                    $redis = $this->di->get(RedisClientProvider::SERVICE_NAME);
                }
            }

            // Clean up (best effort: a failed delete must not fail the request)
            try {
                $redis->del($responseKey);
            } catch (Throwable $cleanupException) {
                SystemMessages::sysLogMsg(
                    static::class,
                    sprintf(
                        "Failed to delete response key %s: %s",
                        $responseKey,
                        $cleanupException->getMessage()
                    ),
                    LOG_DEBUG
                );
            }

            // Calculate total time taken
            $totalTime = microtime(true) - $startTime;

            if ($encodedResponse === false) {
                SystemMessages::sysLogMsg(
                    static::class,
                    sprintf(
                        "Request timeout after %.3fs: No response received for action %s",
                        $totalTime,
                        $actionName
                    ),
                    LOG_WARNING
                );
                $this->response->setPayloadError('Request timeout or worker not responding');
                return;
            }

            // Decode outside the polling loop so a malformed answer is reported as such
            // instead of being retried or mistaken for a timeout.
            $response = json_decode($encodedResponse, true, 512, JSON_THROW_ON_ERROR);
            if (!is_array($response)) {
                throw new \UnexpectedValueException(
                    sprintf("Malformed response from backend worker for action %s", $actionName)
                );
            }

            // Handle file-based response
            if (isset($response[WorkerApiCommands::REDIS_RESPONSE_IN_FILE])) {
                $this->deliverFileResponse($response);
                return;
            }

            // Handle compressed Redis-based large response
            if (isset($response['large_response_redis'])) {
                $this->deliverLargeRedisResponse($redis, $response);
                return;
            }

            if (array_key_exists(WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR, $response)) {
                SystemMessages::sysLogMsg(
                    static::class,
                    sprintf(
                        "Setting error response: job_id=%s, error=%s",
                        $requestId,
                        $response[WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR]
                    ),
                    LOG_DEBUG
                );
                $this->response->setPayloadError($response[WorkerApiCommands::REDIS_JOB_PROCESSING_ERROR]);
                return;
            }

            // Only log response time if it took longer than expected
            if ($totalTime > 1.0) {
                SystemMessages::sysLogMsg(
                    static::class,
                    sprintf(
                        "Slow response for action '%s': %.3fs with %d attempts",
                        $actionName,
                        $totalTime,
                        $attempts
                    ),
                    LOG_NOTICE
                );
            }
            $this->response->setPayloadSuccess($response);
        } catch (Throwable $e) {
            // Log the error with detailed information
            $errorMessage = sprintf(
                "Error in sendRequestToBackendWorker: %s (processor: %s, action: %s)",
                $e->getMessage(),
                $processor,
                $actionName
            );

            // Log to system log first for visibility in server logs
            SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_ERR);

            // Then use critical error handler for potential Sentry capture
            CriticalErrorsHandler::handleExceptionWithSyslog($e);

            // Return error to client
            $this->response->setPayloadError($e->getMessage());
        }
    }

    /**
     * Log the queue position of a freshly queued request and warn about a backlog.
     *
     * @param object $redis Redis client resolved from the DI container.
     * @param string $actionName The name of the action that was queued.
     * @param string $requestId The unique id assigned to the request.
     * @param int $queuePosition The list length returned by the push.
     *
     * @return void
     */
    private function logQueueState(object $redis, string $actionName, string $requestId, int $queuePosition): void
    {
        // Check the current queue size and log it
        $queueLength = $redis->lLen(WorkerApiCommands::REDIS_API_QUEUE);
        SystemMessages::sysLogMsg(
            static::class,
            sprintf(
                "Request added to queue: action=%s, id=%s, queue_position=%d/%d",
                $actionName,
                $requestId,
                $queuePosition,
                $queueLength
            ),
            LOG_DEBUG
        );

        // Count how many active workers are available for processing.
        // NOTE(review): KEYS walks the whole keyspace on every request; consider SCAN or a counter.
        $activeWorkers = $redis->keys('worker_api_commands:*');
        $runningWorkers = is_array($activeWorkers) ? count($activeWorkers) : 0;

        // Many queued requests but few workers: log a warning
        if ($queueLength > $runningWorkers * 2) {
            SystemMessages::sysLogMsg(
                static::class,
                sprintf(
                    "WARNING: Queue backlog detected - %d requests in queue with only %d workers",
                    $queueLength,
                    $runningWorkers
                ),
                LOG_WARNING
            );
        }
    }

    /**
     * Handle one failed polling attempt: rethrow when retries are exhausted, otherwise log and back off.
     *
     * @param Throwable $exception The failure raised while polling.
     * @param int $retryCount Number of failures so far, including this one.
     * @param int $maxRetries Number of failures tolerated before giving up.
     * @param string $actionName The name of the action being waited for.
     *
     * @return void
     *
     * @throws Throwable The given exception, once $retryCount exceeds $maxRetries.
     */
    private function handlePollingFailure(
        Throwable $exception,
        int $retryCount,
        int $maxRetries,
        string $actionName
    ): void {
        $isRedisFailure = $exception instanceof RedisException;

        if ($retryCount > $maxRetries) {
            // Log last failed attempt before throwing
            SystemMessages::sysLogMsg(
                static::class,
                sprintf(
                    $isRedisFailure
                        ? "Max Redis retry attempts exceeded: %s (action: %s)"
                        : "Max retry attempts exceeded for general exception: %s (action: %s)",
                    $exception->getMessage(),
                    $actionName
                ),
                LOG_ERR
            );
            throw $exception; // Max retries exceeded
        }

        $errorMessage = sprintf(
            $isRedisFailure
                ? "Redis connection error during worker request, retrying (%d/%d): %s (action: %s)"
                : "Error during worker request, retrying (%d/%d): %s (action: %s)",
            $retryCount,
            $maxRetries,
            $exception->getMessage(),
            $actionName
        );

        // Log to system log
        SystemMessages::sysLogMsg(static::class, $errorMessage, LOG_WARNING);

        // Also log with error handler for potential Sentry capture
        CriticalErrorsHandler::handleExceptionWithSyslog(new \Exception($errorMessage, 0, $exception));

        // Exponential backoff (capped at 2 seconds) with up to 100ms of jitter
        usleep(min((2 ** $retryCount) * 100000, 2000000) + mt_rand(0, 100000));
    }

    /**
     * Deliver a worker answer that was stored in a file instead of in Redis.
     *
     * @param array $response Decoded worker answer holding the file name under
     *                        WorkerApiCommands::REDIS_RESPONSE_IN_FILE and an optional 'compressed' flag.
     *
     * @return void
     */
    private function deliverFileResponse(array $response): void
    {
        $filename = $response[WorkerApiCommands::REDIS_RESPONSE_IN_FILE];

        if (!file_exists($filename)) {
            $this->response->setPayloadError('Response file not found');
            return;
        }

        $fileContent = file_get_contents($filename);
        if ($fileContent === false) {
            $this->response->setPayloadError('Failed to read response file');
            return;
        }

        // The content is in memory now; remove the file so a decoding failure does not leak it
        unlink($filename);

        // Check if response is compressed
        if (($response['compressed'] ?? null) === true) {
            $fileContent = gzdecode($fileContent);
            if ($fileContent === false) {
                $this->response->setPayloadError('Failed to decompress response data');
                return;
            }
        }

        // NOTE(review): unserialize() can instantiate arbitrary classes; this is only safe as
        // long as response files are written exclusively by the backend worker - confirm.
        $this->response->setPayloadSuccess(unserialize($fileContent));
    }

    /**
     * Deliver a large worker answer that was stored compressed under its own Redis key.
     *
     * @param object $redis Redis client resolved from the DI container.
     * @param array $response Decoded worker answer holding the key name under 'large_response_redis'.
     *
     * @return void
     */
    private function deliverLargeRedisResponse(object $redis, array $response): void
    {
        $redisKey = $response['large_response_redis'];
        $compressedData = $redis->get($redisKey);

        if ($compressedData === false) {
            $this->response->setPayloadError('Large response data not found in Redis');
            return;
        }

        // Clear the key since we're consuming the data
        $redis->del($redisKey);

        // Decompress and unserialize the data
        $data = gzdecode($compressedData);
        if ($data === false) {
            $this->response->setPayloadError('Failed to decompress response data');
            return;
        }

        // NOTE(review): same unserialize() trust assumption as for file-based responses - confirm.
        $this->response->setPayloadSuccess(unserialize($data));
    }
403
404
405
    /**
     * Report an HTTP error to the client.
     *
     * The error text is the HTTP code description, a space, and the optional description;
     * the response status code is set to the given code.
     *
     * @param int    $code        HTTP status code to send.
     * @param string $description Optional text appended to the code description.
     */
    protected function sendError(int $code, string $description = ''): void
    {
        $errorText = $this->response->getHttpCodeDescription($code) . ' ' . $description;

        $this->response->setPayloadError($errorText)->setStatusCode($code);
    }
418
419
    /**
     * Build the message that is queued for the backend worker.
     *
     * The message carries the processor, payload and action, the async flag with its channel id,
     * the module name (only for the PbxExtensionsProcessor) and the debug flag of the request.
     *
     * @param string $processor  Processor name; the legacy alias 'modules' is mapped to
     *                           PbxExtensionsProcessor::class.
     * @param mixed $payload     Data passed to the worker under the 'data' key.
     * @param string $actionName Action passed to the worker under the 'action' key.
     * @param string $moduleName Module name, added only for the PbxExtensionsProcessor.
     * @return array Two elements: the debug flag of the current request and the request message.
     */
    public function prepareRequestMessage(
        string $processor,
        mixed $payload,
        string $actionName,
        string $moduleName
    ): array {
        // Old style modules, we can remove it after 2025
        $resolvedProcessor = $processor === 'modules' ? PbxExtensionsProcessor::class : $processor;

        $isAsync = (bool)$this->request->isAsyncRequest();

        $message = [
            'processor' => $resolvedProcessor,
            'data' => $payload,
            'action' => $actionName,
            'async' => $isAsync,
            'asyncChannelId' => $isAsync ? $this->request->getAsyncRequestChannelId() : '',
        ];

        if ($resolvedProcessor === PbxExtensionsProcessor::class) {
            $message['module'] = $moduleName;
        }

        $isDebug = $this->request->isDebugRequest();
        $message['debug'] = $isDebug;

        return [$isDebug, $message];
    }
460
461
    /**
     * Recursively sanitizes input data based on the provided filter.
     *
     * Nested arrays are sanitized recursively. Strings that start with 'http' (case-insensitive)
     * are sanitized as URLs, all other strings with the string and trim filters. Integers go
     * through the integer filter and floats through the float filter. Values of any other type
     * (booleans, null, objects) are returned unchanged.
     *
     * @param array $data The data to be sanitized.
     * @param \Phalcon\Filter\FilterInterface $filter The filter object used for sanitization.
     *
     * @return array The sanitized data, with the same keys as the input.
     */
    public static function sanitizeData(array $data, \Phalcon\Filter\FilterInterface $filter): array
    {
        foreach ($data as $key => $value) {
            if (is_array($value)) {
                // Recursively sanitize array values
                $data[$key] = self::sanitizeData($value, $filter);
            } elseif (is_string($value)) {
                // Strings starting with 'http' are treated as URLs, everything else as plain text
                $data[$key] = stripos($value, 'http') === 0
                    ? $filter->sanitize($value, Filter::FILTER_URL)
                    : $filter->sanitize($value, [Filter::FILTER_STRING, Filter::FILTER_TRIM]);
            } elseif (is_int($value)) {
                $data[$key] = $filter->sanitize($value, Filter::FILTER_INT);
            } elseif (is_float($value)) {
                // Floats need the float filter: per the Phalcon Filter documentation the integer
                // sanitizer keeps only digits and signs, which would turn 1.5 into 15.
                $data[$key] = $filter->sanitize($value, Filter::FILTER_FLOAT);
            }
        }

        return $data;
    }
492
}
493