Passed
Push — develop ( 38cefb...f96cf6 )
by Nikolay
04:56
created

WorkerApiCommands::prepareAnswer()   F

Complexity

Conditions 12
Paths 968

Size

Total Lines 77
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 53
c 2
b 0
f 0
dl 0
loc 77
rs 2.8443
cc 12
nc 968
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/*
4
 * MikoPBX - free phone system for small business
5
 * Copyright © 2017-2023 Alexey Portnov and Nikolay Beketov
6
 *
7
 * This program is free software: you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 3 of the License, or
10
 * (at your option) any later version.
11
 *
12
 * This program is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU General Public License along with this program.
18
 * If not, see <https://www.gnu.org/licenses/>.
19
 */
20
21
namespace MikoPBX\PBXCoreREST\Workers;
22
23
use InvalidArgumentException;
24
use JsonException;
25
use MikoPBX\Common\Handlers\CriticalErrorsHandler;
26
use MikoPBX\Common\Providers\BeanstalkConnectionWorkerApiProvider;
27
use MikoPBX\Core\System\{BeanstalkClient, Configs\BeanstalkConf, Directories, Processes, SystemMessages};
28
use MikoPBX\Core\Workers\WorkerBase;
29
use MikoPBX\PBXCoreREST\Lib\ModulesManagementProcessor;
30
use MikoPBX\PBXCoreREST\Lib\PBXApiResult;
31
use MikoPBX\PBXCoreREST\Lib\PbxExtensionsProcessor;
32
use MikoPBX\PBXCoreREST\Lib\SystemManagementProcessor;
33
use RuntimeException;
34
use Throwable;
35
use function xdebug_break;
36
37
require_once 'Globals.php';
38
39
40
/**
41
 * The WorkerApiCommands class is responsible for handling API command requests from the frontend.
42
 *
43
 * It handles API command requests, delegates the processing to the appropriate processor classes,
44
 * and checks for restart requirements based on the received requests.
45
 *
46
 *
47
 * @package MikoPBX\PBXCoreREST\Workers
48
 */
49
class WorkerApiCommands extends WorkerBase
50
{
51
    /**
     * Maximum time, in seconds, that prepareAnswer() keeps polling a forked
     * child process before it sends the child SIGTERM and reports a timeout.
     *
     * Typed class constants need PHP 8.3 or newer; older parsers report a
     * syntax error on the declaration below.
     */
54
    private const int CHILD_PROCESS_TIMEOUT = 60;
0 ignored issues
show
Bug introduced by
A parse error occurred: Syntax error, unexpected T_STRING, expecting '=' on line 54 at column 22
Loading history...
55
56
57
    /**
     * Runs the worker's main loop.
     *
     * Fetches the shared Beanstalk connection from the DI container, registers
     * the ping handler and the API request handler, and then keeps waiting for
     * jobs until the needRestart flag is raised. If the queue server is not
     * reachable, the failure is logged, the worker pauses for two seconds and
     * the method returns without subscribing to anything.
     *
     * @param array $argv The command-line arguments passed to the worker.
     *
     * @return void
     */
    public function start(array $argv): void
    {
        /** @var BeanstalkConnectionWorkerApiProvider $queue */
        $queue = $this->di->getShared(BeanstalkConnectionWorkerApiProvider::SERVICE_NAME);

        $connected = $queue->isConnected();
        if ($connected === false) {
            SystemMessages::sysLogMsg(self::class, 'Fail connect to beanstalkd...');
            sleep(2);

            return;
        }

        // Health-check tube first, then the tube carrying API requests.
        $pingTube = $this->makePingTubeName(self::class);
        $queue->subscribe($pingTube, [$this, 'pingCallBack']);
        $queue->subscribe(self::class, [$this, 'prepareAnswer']);

        while (true) {
            if ($this->needRestart !== false) {
                break;
            }
            $queue->wait();
        }
    }
80
81
    /**
     * Process an API request from the frontend in a forked child process.
     *
     * The request itself is handled by processRequest() inside the child. The
     * parent polls the child every 100 ms for at most CHILD_PROCESS_TIMEOUT
     * seconds and converts an abnormal outcome into an exception.
     *
     * @param BeanstalkClient $message The message received from the beanstalk queue.
     *
     * @return void
     *
     * @throws RuntimeException If the fork fails, waiting for the child fails,
     *                          the child exits with a non-zero status, is
     *                          terminated by a signal, or does not finish in time.
     */
    public function prepareAnswer(BeanstalkClient $message): void
    {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException("Failed to fork a new process.");
        }

        if ($pid === 0) {
            // Child process: never returns.
            $this->runRequestInForkedChild($message);
        }

        // Parent process: poll the child without blocking until the deadline.
        $deadline = time() + self::CHILD_PROCESS_TIMEOUT;
        $status = 0;

        while (time() < $deadline) {
            $res = pcntl_waitpid($pid, $status, WNOHANG);
            if ($res === -1) {
                throw new RuntimeException("Failed to wait for child process");
            }
            if ($res > 0) {
                $this->assertForkedChildSucceeded($status);

                return;
            }
            usleep(100000); // Sleep 100ms
        }

        // Timeout reached: stop the child and reap it so no zombie is left behind.
        $this->terminateTimedOutChild($pid);
        throw new RuntimeException("Child process timed out");
    }

    /**
     * Child-side body of prepareAnswer(): handles the request and ends the process.
     *
     * Exits with status 0 on success and 1 when any Throwable escapes, after
     * passing it to CriticalErrorsHandler.
     *
     * @param BeanstalkClient $message The message received from the beanstalk queue.
     */
    private function runRequestInForkedChild(BeanstalkClient $message): never
    {
        try {
            $this->setForked();
            $this->processRequest($message);
        } catch (Throwable $e) {
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
            exit(1); // Exit with error
        }
        exit(0);
    }

    /**
     * Translate the wait status of a finished child into success or an exception.
     *
     * @param int $status Status value filled in by pcntl_waitpid().
     *
     * @throws RuntimeException If the child exited with a non-zero code or was killed by a signal.
     */
    private function assertForkedChildSucceeded(int $status): void
    {
        if (pcntl_wifexited($status)) {
            $exitStatus = pcntl_wexitstatus($status);
            if ($exitStatus !== 0) {
                throw new RuntimeException("Child process failed with status: $exitStatus");
            }

            return;
        }

        if (pcntl_wifsignaled($status)) {
            $signal = pcntl_wtermsig($status);
            throw new RuntimeException("Child process terminated by signal: $signal");
        }
    }

    /**
     * Stop a child that exceeded the timeout and collect its exit status.
     *
     * Sends SIGTERM, allows up to one second for a graceful exit, then
     * escalates to SIGKILL and waits for the child to be reaped.
     *
     * @param int $pid Process id of the forked child.
     */
    private function terminateTimedOutChild(int $pid): void
    {
        $status = 0;
        posix_kill($pid, SIGTERM);

        for ($attempt = 0; $attempt < 10; $attempt++) {
            // A non-zero result means the child was reaped (pid) or is already gone (-1).
            if (pcntl_waitpid($pid, $status, WNOHANG) !== 0) {
                return;
            }
            usleep(100000); // Sleep 100ms
        }

        // The child ignored SIGTERM; SIGKILL cannot be caught or ignored.
        posix_kill($pid, SIGKILL);
        pcntl_waitpid($pid, $status);
    }
140
141
    /**
     * Process an individual API request.
     *
     * Decodes the job body, resolves the processor class, optionally attaches
     * the debugger, runs the processor callback (synchronously, or after an
     * early acknowledgement when the request is flagged as async) and, after a
     * successful result, checks whether the workers have to be restarted.
     *
     * Malformed JSON and invalid request parameters are reported back to the
     * requester as an error response and are not re-thrown. Any other failure
     * is logged, reported back and then re-thrown.
     *
     * @param BeanstalkClient $message The message from beanstalk queue
     *
     * @throws Throwable If the processor fails or the response cannot be sent
     */
    private function processRequest(BeanstalkClient $message): void
    {
        $res = new PBXApiResult();
        $res->processor = __METHOD__;
        try {
            // Parse request JSON
            $request = $this->parseRequestJson($message);

            // Setup basic request parameters
            $async = (bool)($request['async'] ?? false);

            // resolveProcessor() already maps the legacy 'modules' alias,
            // so no further translation is needed here.
            $processor = $this->resolveProcessor($request);
            $res->processor = $processor;

            // Handle debug mode if needed
            $this->handleDebugMode($request);

            // Process the request
            if (!method_exists($processor, 'callback')) {
                throw new RuntimeException("Unknown processor - {$processor}");
            }

            // The action is optional at this point; a missing key must not raise a warning.
            cli_set_process_title(__CLASS__ . '-' . ($request['action'] ?? ''));

            // Execute request based on async flag
            if ($async) {
                $this->handleAsyncRequest($message, $request, $res);
            } else {
                $res = $processor::callback($request);
                $this->sendResponse($message, $res);
            }

            // Check if reload is needed after successful execution
            if ($res->success) {
                $this->checkNeedReload($request);
            }
        } catch (JsonException $e) {
            $this->handleError($res, "Invalid JSON in request: {$e->getMessage()}");
            $this->sendResponse($message, $res);
        } catch (InvalidArgumentException $e) {
            $this->handleError($res, "Invalid request parameters: {$e->getMessage()}");
            $this->sendResponse($message, $res);
        } catch (Throwable $e) {
            $this->handleError($res, CriticalErrorsHandler::handleExceptionWithSyslog($e));
            $this->sendResponse($message, $res);
            throw $e; // Re-throw for parent process to handle
        }
    }
203
204
    /**
     * Decode the job body into an associative array.
     *
     * @param BeanstalkClient $message Queue message whose body holds the JSON request.
     * @return array The decoded request.
     * @throws JsonException If the body is not valid JSON.
     * @throws InvalidArgumentException If the decoded value is not an array (e.g. a bare scalar).
     */
    private function parseRequestJson(BeanstalkClient $message): array
    {
        $body = $message->getBody();
        $decoded = json_decode($body, associative: true, flags: JSON_THROW_ON_ERROR);

        if (is_array($decoded)) {
            return $decoded;
        }

        throw new InvalidArgumentException('Request must be a JSON object');
    }
226
227
    /**
     * Resolve the processor class name from the request.
     *
     * The legacy alias 'modules' is translated to PbxExtensionsProcessor.
     * Any other non-empty string is returned unchanged; whether it names a
     * usable class is checked by the caller.
     *
     * @param array $request Decoded request; the 'processor' key is read.
     * @return string Processor class name.
     * @throws InvalidArgumentException If the processor is missing, empty or not a string.
     */
    private function resolveProcessor(array $request): string
    {
        $processor = $request['processor'] ?? '';

        // Reject non-string values here so they surface as a validation error
        // rather than a TypeError raised by the string return type.
        if (!is_string($processor)) {
            throw new InvalidArgumentException('Processor name must be a string');
        }

        // Handle legacy 'modules' processor name
        if ($processor === 'modules') {
            return PbxExtensionsProcessor::class;
        }

        if (empty($processor)) {
            throw new InvalidArgumentException('Processor name is required');
        }

        return $processor;
    }
249
250
    /**
     * Handle asynchronous request execution.
     *
     * Acknowledges the job immediately with an informational message and then
     * runs the processor callback; the callback's return value is not used here.
     *
     * The processor is resolved through resolveProcessor() instead of reading
     * $request['processor'] directly, so the legacy 'modules' alias works in
     * async mode as well.
     *
     * @param BeanstalkClient $message Queue message used to send the acknowledgement.
     * @param array $request Decoded request ('processor', 'action', 'asyncChannelId' are read).
     * @param PBXApiResult $res Result object that is marked successful and sent back.
     *
     * @throws InvalidArgumentException If the processor name cannot be resolved.
     * @throws RuntimeException If the acknowledgement cannot be sent.
     */
    private function handleAsyncRequest(
        BeanstalkClient $message,
        array           $request,
        PBXApiResult    $res
    ): void
    {
        // Resolve before acknowledging so an unusable request is never reported as started.
        $processor = $this->resolveProcessor($request);

        $res->success = true;
        $res->messages['info'][] = sprintf(
            'The async job %s starts in background, you will receive answer on %s nchan channel',
            $request['action'] ?? '',
            $request['asyncChannelId'] ?? ''
        );

        $this->sendResponse($message, $res);
        $processor::callback($request);
    }
273
274
    /**
     * Send the response back through beanstalk.
     *
     * The result is JSON-encoded. If encoding fails, the data payload is
     * dropped, an error message is added and the reduced result is encoded
     * instead. A response larger than BeanstalkConf::JOB_DATA_SIZE_LIMIT is
     * written to a temporary file and only a reference to it is sent.
     *
     * @param BeanstalkClient $message Queue message to reply to.
     * @param PBXApiResult $res Result to send.
     * @throws RuntimeException If the response cannot be encoded, stored or sent.
     */
    private function sendResponse(BeanstalkClient $message, PBXApiResult $res): void
    {
        try {
            $result = $res->getResult();
            $encodedResult = json_encode($result);

            if ($encodedResult === false) {
                $res->data = [];
                $res->messages['error'][] = 'Failed to encode response to JSON';
                // Keep $result in sync with what is actually sent, and fail
                // loudly if even the reduced result cannot be encoded.
                $result = $res->getResult();
                $encodedResult = json_encode($result, JSON_THROW_ON_ERROR);
            }

            // Handle large responses
            if (strlen($encodedResult) > BeanstalkConf::JOB_DATA_SIZE_LIMIT) {
                $encodedResult = $this->handleLargeResponse($result);
            }

            $message->reply($encodedResult);
        } catch (Throwable $e) {
            throw new RuntimeException(
                "Failed to send response: {$e->getMessage()}",
                0,
                $e
            );
        }
    }
308
309
    /**
     * Handle a large response by storing it in a temporary file.
     *
     * The result is serialized once, written to a uniquely named file in the
     * download cache directory, and a small JSON document that points to this
     * file is returned in its place.
     *
     * @param array $result Result data to store.
     * @return string JSON encoded response with file reference
     * @throws RuntimeException If there is not enough free disk space or the file cannot be written completely.
     * @throws JsonException If the file reference cannot be JSON-encoded.
     */
    private function handleLargeResponse(array $result): string
    {
        $downloadCacheDir = Directories::getDir(Directories::WWW_DOWNLOAD_CACHE_DIR);

        // Generate unique filename using uniqid()
        $filenameTmp = sprintf(
            '%s/temp-%s_%s.data',
            $downloadCacheDir,
            __FUNCTION__,
            uniqid('', true)
        );

        // Serialize once: this path only runs for large payloads.
        $payload = serialize($result);
        $payloadSize = strlen($payload);

        // Check available disk space (an unreadable directory counts as no space)
        $freeSpace = disk_free_space($downloadCacheDir);
        if ($freeSpace === false || $freeSpace < $payloadSize) {
            throw new RuntimeException('Insufficient disk space for temporary file');
        }

        // file_put_contents() returns the number of bytes written, so a short
        // count means the file is truncated.
        if (file_put_contents($filenameTmp, $payload) !== $payloadSize) {
            throw new RuntimeException("Failed to write response to temporary file");
        }

        return json_encode([BeanstalkClient::RESPONSE_IN_FILE => $filenameTmp], JSON_THROW_ON_ERROR);
    }
339
340
    /**
     * Turn a result object into a failure result.
     *
     * Clears the data payload, marks the result as unsuccessful and appends
     * the given text to the list of error messages.
     *
     * @param PBXApiResult $res Result object to modify in place.
     * @param string $message Error text to append.
     */
    private function handleError(PBXApiResult $res, string $message): void
    {
        $res->data = [];
        $res->success = false;
        $res->messages['error'][] = $message;
    }
352
353
354
    /**
     * Restarts the workers when the executed request requires it.
     *
     * The request's processor/action pair is looked up in the table returned
     * by getNeedRestartActions(). On a match the needRestart flag is set and
     * Processes::restartAllWorkers() is called.
     *
     * NOTE(review): prepareAnswer() runs the request in a forked child, so the
     * flag set here appears to change only the child's copy of this object -
     * confirm that restartAllWorkers() alone is what stops the parent.
     *
     * @param array $request Decoded request ('processor' and 'action' are read).
     */
    private function checkNeedReload(array $request): void
    {
        foreach ($this->getNeedRestartActions() as $processorClass => $restartingActions) {
            if ($processorClass !== $request['processor']) {
                continue;
            }
            if (!in_array($request['action'], $restartingActions, true)) {
                continue;
            }

            // New code may have been added or removed by modules.
            $this->needRestart = true;
            Processes::restartAllWorkers();

            return;
        }
    }
376
377
    /**
     * Lists the actions after which this kind of worker has to be restarted.
     *
     * @return array Map of processor class name => list of action names.
     */
    private function getNeedRestartActions(): array
    {
        $systemActions = ['restoreDefault'];
        $moduleActions = ['enableModule', 'disableModule', 'uninstallModule'];

        return [
            SystemManagementProcessor::class => $systemActions,
            ModulesManagementProcessor::class => $moduleActions,
        ];
    }
395
396
    /**
     * Attaches an xdebug session when the request asks for it.
     *
     * The debugger is contacted only if the request carries 'debug' => true,
     * the xdebug extension is loaded and xdebug_connect_to_client() exists.
     * When the connection succeeds, execution stops at a breakpoint.
     *
     * The flag is meant for requests sent with the "X-Debug-The-Request"
     * header (presumably translated into 'debug' by the REST layer - confirm).
     * xdebug.ini needs: xdebug.start_with_request = trigger
     *
     * @examples
     * curl -X POST \
     * -H 'Content-Type: application/json' \
     * -H 'Cookie: XDEBUG_SESSION=PHPSTORM' \
     * -H 'X-Debug-The-Request: 1' \
     * -d '{"filename": "/storage/usbdisk1/mikopbx/tmp/mikopbx-2023.1.223-x86_64.img"}' \
     * http://127.0.0.1/pbxcore/api/system/upgrade
     *
     * Or add the header to any semantic API request:
     * $.api({
     *      url: ...,
     *      on: 'now',
     *      method: 'POST',
     *      beforeXHR(xhr) {
     *          xhr.setRequestHeader ('X-Debug-The-Request', 1);
     *          return xhr;
     *      },
     *      ...
     * });
     *
     * @param array $request Decoded request; only the 'debug' key is read.
     */
    private function handleDebugMode(array $request): void
    {
        $debugRequested = isset($request['debug']) && $request['debug'] === true;
        if (!$debugRequested || !extension_loaded('xdebug')) {
            return;
        }

        if (!function_exists('xdebug_connect_to_client')) {
            return;
        }

        if (xdebug_connect_to_client()) {
            xdebug_break();
        }
    }
431
}
432
433
// Start a worker process: this runs whenever the file is executed and passes
// the command-line arguments (or an empty array when $argv is not defined) to
// startWorker(), which is not defined in this class and is therefore inherited.
434
WorkerApiCommands::startWorker($argv ?? []);
435