Passed
Push — develop ( 2a83e2...38cefb )
by Nikolay
05:24
created

WorkerBase   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 218
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 68
c 0
b 0
f 0
dl 0
loc 218
rs 10
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright © 2017-2023 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\Workers;
21
22
use MikoPBX\Common\Handlers\CriticalErrorsHandler;
23
use MikoPBX\Core\Asterisk\AsteriskManager;
24
use MikoPBX\Core\System\BeanstalkClient;
25
use MikoPBX\Core\System\Processes;
26
use MikoPBX\Core\System\SystemMessages;
27
use MikoPBX\Core\System\Util;
28
use Phalcon\Di\Injectable;
29
use MikoPBX\Common\Library\Text;
30
use RuntimeException;
31
use Throwable;
32
33
/**
34
 * Base class for workers.
35
 * Provides core functionality for process management, signal handling, and worker lifecycle.
36
 *
37
 * @package MikoPBX\Core\Workers
38
 */
39
abstract class WorkerBase extends Injectable implements WorkerInterface
40
{
41
    /**
42
     * Signals that should be handled by the worker
43
     */
44
    private const array MANAGED_SIGNALS = [
1 ignored issue
show
Bug introduced by
A parse error occurred: Syntax error, unexpected T_STRING, expecting '=' on line 44 at column 24
Loading history...
45
        SIGUSR1,
46
        SIGTERM,
47
        SIGINT
48
    ];
49
    /**
50
     * Worker state constants
51
     */
52
    protected const int STATE_STARTING = 1;
53
    protected const int STATE_RUNNING = 2;
54
    protected const int STATE_STOPPING = 3;
55
    protected const int STATE_RESTARTING = 4;
56
57
    /**
58
     * Worker state constants with descriptions
59
     */
60
    protected const array WORKER_STATES = [
61
        self::STATE_STARTING   => 'STARTING',
62
        self::STATE_RUNNING    => 'RUNNING',
63
        self::STATE_STOPPING   => 'STOPPING',
64
        self::STATE_RESTARTING => 'RESTARTING'
65
    ];
66
67
    /**
68
     * File system constants
69
     */
70
    private const string PID_FILE_DIR = '/var/run';
71
    private const string PID_FILE_SUFFIX = '.pid';
72
73
    /**
74
     * Resource limits
75
     */
76
    private const string MEMORY_LIMIT = '256M';
77
    private const int ERROR_REPORTING_LEVEL = E_ALL;
78
79
    /**
80
     * Log message format constants
81
     */
82
    private const string LOG_FORMAT_STATE = '[%s][%s->%s] %s (%.3fs, PID:%d)';
83
    private const string LOG_FORMAT_SIGNAL = '[%s][%s] %s from %s (%.3fs, PID:%d)';
84
    private const string LOG_FORMAT_NORMAL_SHUTDOWN = '[%s][RUNNING->SHUTDOWN] Clean exit from %s (%.3fs, PID:%d)';
85
    private const string LOG_FORMAT_NORMAL_EXIT = '[%s] Successfully executed (%.3fs, PID:%d)';
86
    private const string LOG_FORMAT_ERROR_SHUTDOWN = '[%s][SHUTDOWN-ERROR] %s from %s (%.3fs, PID:%d)';
87
    private const string LOG_FORMAT_PING = '[%s][PING] %s from %s (%.3fs, PID:%d)';
88
89
90
    private const string LOG_NAMESPACE_SEPARATOR = '\\';
91
92
    /**
93
     * Maximum number of processes that can be created
94
     *
95
     * @var int
96
     */
97
    public int $maxProc = 1;
98
99
    /**
100
     * Instance of the Asterisk Manager
101
     *
102
     * @var AsteriskManager
103
     */
104
    protected AsteriskManager $am;
105
106
    /**
107
     * Flag indicating whether the worker needs to be restarted
108
     *
109
     * @var bool
110
     */
111
    protected bool $needRestart = false;
112
113
    /**
114
     * Time the worker started
115
     *
116
     * @var float
117
     */
118
    protected float $workerStartTime;
119
120
    /**
121
     * Current state of the worker
122
     *
123
     * @var int
124
     */
125
    protected int $workerState = self::STATE_STARTING;
126
127
    /**
128
     * Constructs a WorkerBase instance.
129
     * Initializes signal handlers, sets up resource limits, and saves PID file.
130
     *
131
     * @throws RuntimeException If critical initialization fails
132
     */
133
    final public function __construct()
134
    {
135
        try {
136
            $this->setResourceLimits();
137
            $this->initializeSignalHandlers();
138
            register_shutdown_function([$this, 'shutdownHandler']);
139
140
            $this->workerStartTime = microtime(true);
141
            $this->setWorkerState(self::STATE_STARTING);
142
            $this->savePidFile();
143
144
        } catch (Throwable $e) {
145
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
146
            throw $e;
147
        }
148
    }
149
150
    /**
151
     * Sets resource limits for the worker process
152
     */
153
    protected function setResourceLimits(): void
154
    {
155
        ini_set('memory_limit', self::MEMORY_LIMIT);
156
        error_reporting(self::ERROR_REPORTING_LEVEL);
157
        ini_set('display_errors', '1');
158
        set_time_limit(0);
159
    }
160
161
162
    /**
163
     * Initializes signal handlers for the worker
164
     */
165
    private function initializeSignalHandlers(): void
166
    {
167
        pcntl_async_signals(true);
168
        foreach (self::MANAGED_SIGNALS as $signal) {
169
            pcntl_signal($signal, [$this, 'signalHandler'], true);
170
        }
171
    }
172
173
    /**
174
     * Updates worker state and logs the change
175
     */
176
    protected function setWorkerState(int $state): void
177
    {
178
        $oldState = $this->workerState ?? 'UNDEFINED';
179
        $this->workerState = $state;
180
181
        $workerName = basename(str_replace(self::LOG_NAMESPACE_SEPARATOR, '/', static::class));
182
        $timeElapsed = round(microtime(true) - $this->workerStartTime, 3);
183
        $namespacePath = implode('.', array_slice(explode(self::LOG_NAMESPACE_SEPARATOR, static::class), 0, -1));
184
185
        SystemMessages::sysLogMsg(
186
            static::class,
187
            sprintf(
188
                self::LOG_FORMAT_STATE,
189
                $workerName,
190
                self::WORKER_STATES[$oldState] ?? 'UNDEFINED',
191
                self::WORKER_STATES[$state] ?? 'UNKNOWN',
192
                $namespacePath,
193
                $timeElapsed,
194
                getmypid()
195
            ),
196
            LOG_DEBUG
197
        );
198
    }
199
200
201
    /**
202
     * Save PID to file(s) with error handling
203
     *
204
     * @throws RuntimeException If unable to write PID file
205
     */
206
    private function savePidFile(): void
207
    {
208
        try {
209
            $activeProcesses = Processes::getPidOfProcess(static::class);
210
            $processes = array_filter(explode(' ', $activeProcesses));
211
212
            if (count($processes) === 1) {
213
                $this->saveSinglePidFile($activeProcesses);
214
            } else {
215
                $this->saveMultiplePidFiles($processes);
216
            }
217
        } catch (Throwable $e) {
218
            SystemMessages::sysLogMsg(
219
                static::class,
220
                "Failed to save PID file: " . $e->getMessage(),
221
                LOG_WARNING
222
            );
223
            throw new RuntimeException('PID file operation failed', 0, $e);
224
        }
225
    }
226
227
    /**
228
     * Saves a single PID to file
229
     *
230
     * @param string $pid Process ID to save
231
     * @throws RuntimeException If write fails
232
     */
233
    private function saveSinglePidFile(string $pid): void
234
    {
235
        if (!file_put_contents($this->getPidFile(), $pid)) {
236
            throw new RuntimeException('Could not write to PID file');
237
        }
238
    }
239
240
    /**
241
     * Saves multiple PIDs to separate files
242
     *
243
     * @param array $processes Array of process IDs
244
     * @throws RuntimeException If write fails
245
     */
246
    private function saveMultiplePidFiles(array $processes): void
247
    {
248
        $pidFilesDir = dirname($this->getPidFile());
249
        $baseName = (string)pathinfo($this->getPidFile(), PATHINFO_BASENAME);
250
        $pidFile = $pidFilesDir . '/' . $baseName;
251
252
        // Delete old PID files
253
        $rm = Util::which('rm');
254
        Processes::mwExec("$rm -rf $pidFile*");
255
256
        foreach ($processes as $index => $process) {
257
            $pidFilePath = sprintf("%s-%d%s", $pidFile, $index + 1, self::PID_FILE_SUFFIX);
258
            if (!file_put_contents($pidFilePath, $process)) {
259
                throw new RuntimeException("Could not write to PID file: $pidFilePath");
260
            }
261
        }
262
    }
263
264
    /**
265
     * Generate the PID file path for the worker
266
     *
267
     * @return string The path to the PID file
268
     */
269
    public function getPidFile(): string
270
    {
271
        $name = str_replace("\\", '-', static::class);
272
        return self::PID_FILE_DIR . "/$name" . self::PID_FILE_SUFFIX;
273
    }
274
275
    /**
276
     * Starts the worker process
277
     *
278
     * @param array $argv Command-line arguments
279
     * @param bool $setProcName Whether to set process name
280
     * @return void
281
     */
282
    public static function startWorker(array $argv, bool $setProcName = true): void
283
    {
284
        $action = $argv[1] ?? '';
285
        if ($action === 'start') {
286
            $workerClassname = static::class;
287
288
            if ($setProcName) {
289
                cli_set_process_title($workerClassname);
290
            }
291
292
            try {
293
                $worker = new $workerClassname();
294
                $worker->setWorkerState(self::STATE_RUNNING);
295
                $worker->start($argv);
296
                $worker->logNormalExit();
297
            } catch (Throwable $e) {
298
                CriticalErrorsHandler::handleExceptionWithSyslog($e);
299
                sleep(1);
300
            }
301
        }
302
    }
303
304
    /**
305
     * Handles various signals received by the worker
306
     *
307
     * @param int $signal Signal number
308
     * @return void
309
     */
310
    public function signalHandler(int $signal): void
311
    {
312
        $workerName = basename(str_replace(self::LOG_NAMESPACE_SEPARATOR, '/', static::class));
313
        $timeElapsed = round(microtime(true) - $this->workerStartTime, 3);
314
        $namespacePath = implode('.', array_slice(explode(self::LOG_NAMESPACE_SEPARATOR, static::class), 0, -1));
315
316
        $signalNames = [
317
            SIGUSR1 => 'SIGUSR1',
318
            SIGTERM => 'SIGTERM',
319
            SIGINT  => 'SIGINT'
320
        ];
321
322
        SystemMessages::sysLogMsg(
323
            static::class,
324
            sprintf(
325
                self::LOG_FORMAT_SIGNAL,
326
                $workerName,
327
                $signalNames[$signal] ?? "SIG_$signal",
328
                'received',
329
                $namespacePath,
330
                $timeElapsed,
331
                getmypid()
332
            ),
333
            LOG_DEBUG
334
        );
335
336
        switch ($signal) {
337
            case SIGUSR1:
338
                $this->setWorkerState(self::STATE_RESTARTING);
339
                $this->needRestart = true;
340
                break;
341
            case SIGTERM:
342
            case SIGINT:
343
                $this->setWorkerState(self::STATE_STOPPING);
344
                exit(0);
345
            default:
346
                // Log unhandled signal
347
                SystemMessages::sysLogMsg(
348
                    $workerName,
349
                    sprintf("Unhandled signal received: %d", $signal),
350
                    LOG_WARNING
351
                );
352
        }
353
    }
354
355
356
    /**
357
     * Handles the shutdown event of the worker
358
     *
359
     * @return void
360
     */
361
    public function shutdownHandler(): void
362
    {
363
        $timeElapsedSecs = round(microtime(true) - $this->workerStartTime, 2);
364
        $processTitle = cli_get_process_title();
365
366
        $error = error_get_last();
367
        if ($error === null) {
368
            $this->logNormalShutdown($processTitle, $timeElapsedSecs);
369
        } else {
370
            $this->logErrorShutdown($processTitle, $timeElapsedSecs, $error);
371
        }
372
373
        $this->cleanupPidFile();
374
    }
375
376
    /**
377
     * Logs normal shutdown event
378
     *
379
     * @param string $processTitle Process title
380
     * @param float $timeElapsedSecs Time elapsed since start
381
     */
382
    private function logNormalShutdown(string $processTitle, float $timeElapsedSecs): void
383
    {
384
        $workerName = basename(str_replace(self::LOG_NAMESPACE_SEPARATOR, '/', static::class));
385
        $namespacePath = implode('.', array_slice(explode(self::LOG_NAMESPACE_SEPARATOR, static::class), 0, -1));
386
387
        SystemMessages::sysLogMsg(
388
            $processTitle,
389
            sprintf(
390
                self::LOG_FORMAT_NORMAL_SHUTDOWN,
391
                $workerName,
392
                $namespacePath,
393
                $timeElapsedSecs,
394
                getmypid()
395
            ),
396
            LOG_DEBUG
397
        );
398
    }
399
400
    /**
401
     * Logs error shutdown event
402
     *
403
     * @param string $processTitle Process title
404
     * @param float $timeElapsedSecs Time elapsed since start
405
     * @param array $error Error details
406
     */
407
    private function logErrorShutdown(string $processTitle, float $timeElapsedSecs, array $error): void
408
    {
409
        $workerName = basename(str_replace(self::LOG_NAMESPACE_SEPARATOR, '/', static::class));
410
        $namespacePath = implode('.', array_slice(explode(self::LOG_NAMESPACE_SEPARATOR, static::class), 0, -1));
411
        $errorMessage = $error['message'] ?? 'Unknown error';
412
413
        SystemMessages::sysLogMsg(
414
            $processTitle,
415
            sprintf(
416
                self::LOG_FORMAT_ERROR_SHUTDOWN,
417
                $workerName,
418
                $errorMessage,
419
                $namespacePath,
420
                $timeElapsedSecs,
421
                getmypid()
422
            ),
423
            LOG_ERR
424
        );
425
    }
426
427
    /**
428
     * Cleans up PID file during shutdown
429
     */
430
    private function cleanupPidFile(): void
431
    {
432
        try {
433
            $pidFile = $this->getPidFile();
434
            if (file_exists($pidFile)) {
435
                unlink($pidFile);
436
            }
437
        } catch (Throwable $e) {
438
            SystemMessages::sysLogMsg(
439
                static::class,
440
                "Failed to cleanup PID file: " . $e->getMessage(),
441
                LOG_WARNING
442
            );
443
        }
444
    }
445
446
    /**
447
     * Handles ping callback to keep connection alive
448
     *
449
     * @param BeanstalkClient $message Received message
450
     * @return void
451
     */
452
    public function pingCallBack(BeanstalkClient $message): void
453
    {
454
        try {
455
            $workerName = basename(str_replace(self::LOG_NAMESPACE_SEPARATOR, '/', static::class));
456
            $namespacePath = implode('.', array_slice(explode(self::LOG_NAMESPACE_SEPARATOR, static::class), 0, -1));
457
            $timeElapsed = round(microtime(true) - $this->workerStartTime, 3);
458
459
            SystemMessages::sysLogMsg(
460
                cli_get_process_title(),
461
                sprintf(
462
                    self::LOG_FORMAT_PING,
463
                    $workerName,
464
                    substr(json_encode($message->getBody()), 0, 50),  // Truncate long messages
465
                    $namespacePath,
466
                    $timeElapsed,
467
                    getmypid()
468
                ),
469
                LOG_DEBUG
470
            );
471
472
            $message->reply(json_encode($message->getBody() . ':pong', JSON_THROW_ON_ERROR));
473
        } catch (Throwable $e) {
474
            SystemMessages::sysLogMsg(
475
                static::class,
476
                sprintf(
477
                    '[%s][PING-ERROR] %s from %s (%.3fs, PID:%d)',
478
                    $workerName,
479
                    $e->getMessage(),
480
                    $namespacePath,
481
                    $timeElapsed,
482
                    getmypid()
483
                ),
484
                LOG_WARNING
485
            );
486
        }
487
    }
488
489
    /**
490
     * Logs normal exit after operation
491
     */
492
    private function logNormalExit(): void
493
    {
494
        $workerName = basename(str_replace(self::LOG_NAMESPACE_SEPARATOR, '/', static::class));
495
        $timeElapsed = round(microtime(true) - $this->workerStartTime, 3);
496
497
        SystemMessages::sysLogMsg(
498
            static::class,
499
            sprintf(
500
                self::LOG_FORMAT_NORMAL_EXIT,
501
                $workerName,
502
                $timeElapsed,
503
                getmypid()
504
            ),
505
            LOG_DEBUG
506
        );
507
    }
508
509
    /**
510
     * Replies to a ping request from the worker
511
     *
512
     * @param array $parameters Request parameters
513
     * @return bool True if ping request was processed
514
     */
515
    public function replyOnPingRequest(array $parameters): bool
516
    {
517
        try {
518
            $pingTube = $this->makePingTubeName(static::class);
519
            if ($pingTube === $parameters['UserEvent']) {
520
                $this->am->UserEvent("{$pingTube}Pong", []);
521
                return true;
522
            }
523
        } catch (Throwable $e) {
524
            SystemMessages::sysLogMsg(
525
                static::class,
526
                "Ping reply failed: " . $e->getMessage(),
527
                LOG_WARNING
528
            );
529
        }
530
        return false;
531
    }
532
533
    /**
534
     * Generates the ping tube name for a worker class
535
     *
536
     * @param string $workerClassName Worker class name
537
     * @return string Generated ping tube name
538
     */
539
    public function makePingTubeName(string $workerClassName): string
540
    {
541
        return Text::camelize("ping_$workerClassName", '\\');
542
    }
543
544
    /**
545
     * Destructor - ensures PID file is saved on object destruction
546
     */
547
    public function __destruct()
548
    {
549
        try {
550
            if ($this->workerState !== self::STATE_STOPPING) {
551
                $this->savePidFile();
552
            }
553
        } catch (Throwable $e) {
554
            SystemMessages::sysLogMsg(
555
                static::class,
556
                "Destructor failed: " . $e->getMessage(),
557
                LOG_WARNING
558
            );
559
        }
560
    }
561
}