WorkerSafeScriptsCore::checkPidNotAlert()   A
last analyzed

Complexity

Conditions 5
Paths 7

Size

Total Lines 36
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 36
rs 9.2888
c 0
b 0
f 0
cc 5
nc 7
nop 1
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright © 2017-2025 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\Workers\Cron;
21
22
require_once 'Globals.php';
23
24
use Fiber;
25
use MikoPBX\Common\Handlers\CriticalErrorsHandler;
26
use MikoPBX\Common\Providers\PBXConfModulesProvider;
27
use MikoPBX\Core\System\System;
28
use MikoPBX\Core\System\{BeanstalkClient, PBX, Processes, SystemMessages, Util};
29
use MikoPBX\Core\Workers\WorkerBase;
30
use MikoPBX\Core\Workers\WorkerBeanstalkdTidyUp;
31
use MikoPBX\Core\Workers\WorkerCallEvents;
32
use MikoPBX\Core\Workers\WorkerCdr;
33
use MikoPBX\Core\Workers\WorkerCheckFail2BanAlive;
34
use MikoPBX\Core\Workers\WorkerLogRotate;
35
use MikoPBX\Core\Workers\WorkerMarketplaceChecker;
36
use MikoPBX\Core\Workers\WorkerModelsEvents;
37
use MikoPBX\Core\Workers\WorkerNotifyAdministrator;
38
use MikoPBX\Core\Workers\WorkerNotifyByEmail;
39
use MikoPBX\Core\Workers\WorkerPrepareAdvice;
40
use MikoPBX\Core\Workers\WorkerRedisBase;
41
use MikoPBX\Core\Workers\WorkerRemoveOldRecords;
42
use MikoPBX\Modules\Config\SystemConfigInterface;
43
use MikoPBX\PBXCoreREST\Workers\WorkerApiCommands;
44
use RuntimeException;
45
use Throwable;
46
47
/**
48
 * Class WorkerSafeScriptsCore
49
 *
50
 * Represents the core worker for safe scripts.
51
 * Implements Singleton pattern for continuous process monitoring.
52
 *
53
 * @package MikoPBX\Core\Workers\Cron
54
 */
55
class WorkerSafeScriptsCore extends WorkerBase
56
{
57
    // Constants to denote the methods of checking workers' statuses.
58
    public const CHECK_BY_BEANSTALK = 'checkWorkerBeanstalk';
59
    public const CHECK_BY_AMI = 'checkWorkerAMI';
60
    public const CHECK_BY_PID_NOT_ALERT = 'checkPidNotAlert';
61
    public const CHECK_BY_REDIS = 'checkWorkerRedis';
62
63
    /**
64
     * Singleton instance
65
     */
66
    private static ?self $instance = null;
67
68
    /**
69
     * Last check timestamps for each worker
70
     * @var array<string, int>
71
     */
72
    private array $lastCheckTimes = [];
73
74
    /**
75
     * Redis connection instance
76
     * @var \Redis
77
     */
78
    protected $redis;
79
80
    /**
     * Post-construction hook that getInstance() calls on the freshly created singleton.
     *
     * It is intentionally empty at the moment and only serves as the place
     * where future one-time set-up code should be added.
     */
    private function initialize(): void
    {
        // Nothing to set up yet.
    }
88
89
    /**
     * Returns the shared instance, creating and initializing it on first use.
     *
     * The new object is kept in a local variable until it is fully initialized.
     * That way neither the initialize() call nor the returned value goes through
     * the nullable static property, so the declared return type always holds.
     *
     * @return self The singleton instance.
     */
    public static function getInstance(): self
    {
        $instance = self::$instance;
        if ($instance === null) {
            $instance = new self();
            $instance->initialize();
            self::$instance = $instance;
        }

        return $instance;
    }
100
101
    /**
     * Resolves how often (in seconds) the given worker should be checked.
     *
     * A worker class that exists and exposes a static getCheckInterval() method
     * supplies its own interval; every other worker falls back to
     * KEEP_ALLIVE_CHECK_INTERVAL (declared outside this section of the file).
     *
     * @param string $workerClass Fully qualified worker class name.
     * @return int The check interval.
     */
    private function getWorkerInterval(string $workerClass): int
    {
        $hasOwnInterval = class_exists($workerClass)
            && method_exists($workerClass, 'getCheckInterval');

        return $hasOwnInterval
            ? $workerClass::getCheckInterval()
            : self::KEEP_ALLIVE_CHECK_INTERVAL;
    }
111
112
    /**
     * Tells whether the worker's check interval has elapsed since its last check.
     *
     * A worker that has never been checked has a last-check time of 0, so it is
     * always due on the first pass.
     *
     * @param string $workerClass Fully qualified worker class name.
     * @return bool True when the worker is due for a check.
     */
    private function shouldCheckWorker(string $workerClass): bool
    {
        $secondsSinceLastCheck = time() - ($this->lastCheckTimes[$workerClass] ?? 0);

        return $secondsSinceLastCheck >= $this->getWorkerInterval($workerClass);
    }
123
124
    /**
     * Records the current time as the moment the worker was last checked.
     *
     * @param string $workerClass Fully qualified worker class name.
     */
    private function updateLastCheckTime(string $workerClass): void
    {
        $now = time();
        $this->lastCheckTimes[$workerClass] = $now;
    }
131
132
    /**
     * Runs every task inside its own Fiber and drives all fibers to completion.
     *
     * Each fiber is started immediately, so a task runs synchronously up to its
     * first Fiber::suspend() (or to its end) before the next task is started.
     * Afterwards the suspended fibers are resumed round-robin until all of them
     * have terminated. An exception thrown by a task propagates to the caller.
     *
     * @param array<callable> $tasks Callables to execute.
     * @return void
     */
    private function executeParallel(array $tasks): void
    {
        $pending = [];

        // Kick off every task; each one runs until it suspends or finishes.
        foreach ($tasks as $callable) {
            $started = new Fiber($callable);
            $pending[] = $started;
            $started->start();
        }

        // Keep resuming whatever is still suspended until nothing is left.
        while ($pending !== []) {
            foreach ($pending as $key => $current) {
                if ($current->isTerminated()) {
                    unset($pending[$key]);
                } elseif ($current->isSuspended()) {
                    $current->resume();
                }
            }
        }
    }
163
164
    /**
     * Restarts all registered workers using a start-new-then-stop-old pipeline.
     *
     * Steps:
     *  0. Collect the PIDs of the currently running workers (single workers and pools).
     *  1. Start new instances; WorkerApiCommands instances are started synchronously
     *     and first, the rest are started through executeParallel().
     *  2. Send SIGUSR1 to the previously collected PIDs (used here as the
     *     graceful-shutdown request).
     *  3. Wait up to 30 seconds for those processes to exit.
     *  4. Send SIGTERM to any survivors and, two seconds later, SIGKILL.
     *
     * Only PIDs that parse to a positive integer are recorded. posix_kill() with
     * a PID of 0 addresses the caller's whole process group, so a blank or
     * non-numeric PID string must never reach the signalling steps.
     *
     * @throws Throwable
     */
    public function restart(): void
    {
        // Get all workers that need to be restarted.
        $arrWorkers = $this->prepareWorkersList();

        // Old processes, captured before anything new is started.
        $runningWorkers = [];
        $workerPools = [];

        // Guards every later "(int)$pid" cast: anything that is not a positive PID is dropped here.
        $isValidPid = static fn($pid): bool => (int)$pid > 0;

        // STEP 0: Collect PIDs of the running workers.
        $pidCollectionTasks = [];
        foreach ($arrWorkers as $workersWithCurrentType) {
            foreach ($workersWithCurrentType as $worker) {
                $pidCollectionTasks[] = function () use ($worker, $isValidPid, &$runningWorkers, &$workerPools) {
                    $maxProc = $this->getWorkerInstanceCount($worker);

                    if ($maxProc > 1) {
                        // Pool worker: the PID string is split on spaces, one entry per instance.
                        $poolPids = Processes::getPidOfProcess($worker);
                        if (!empty($poolPids)) {
                            $workerPools[$worker] = [
                                'maxProc' => $maxProc,
                                'pids' => array_filter(explode(' ', $poolPids), $isValidPid),
                            ];
                        }
                    } else {
                        // Regular single worker.
                        $mainPid = Processes::getPidOfProcess($worker);
                        if (!empty($mainPid) && $isValidPid($mainPid)) {
                            $runningWorkers[$worker] = $mainPid;
                        }
                    }
                    Fiber::suspend();
                };
            }
        }
        $this->executeParallel($pidCollectionTasks);

        // Count the total number of processes that will be replaced.
        $totalWorkers = count($runningWorkers) + array_sum(array_map(
            static fn(array $pool): int => count($pool['pids']),
            $workerPools
        ));

        SystemMessages::sysLogMsg(
            static::class,
            sprintf(
                "Starting restart process for %d worker instances. Regular workers: %d, Pool workers: %d",
                $totalWorkers,
                count($runningWorkers),
                count($workerPools)
            ),
            LOG_NOTICE
        );

        // Log every detected worker pool in detail.
        foreach ($workerPools as $worker => $poolInfo) {
            SystemMessages::sysLogMsg(
                static::class,
                sprintf(
                    "Detected worker pool: %s, maxProc: %d, running PIDs: %s",
                    $worker,
                    $poolInfo['maxProc'],
                    implode(',', $poolInfo['pids'])
                ),
                LOG_NOTICE
            );
        }

        // STEP 1: Start new worker instances first.
        // This ensures new workers are ready to take jobs before old ones exit.
        $startTasks = [];

        foreach ($arrWorkers as $workersWithCurrentType) {
            // WorkerApiCommands is the most critical for the UI, so it is moved to the front.
            $sortedWorkers = $workersWithCurrentType;
            usort(
                $sortedWorkers,
                static fn($a, $b): int =>
                    str_contains($b, 'WorkerApiCommands') <=> str_contains($a, 'WorkerApiCommands')
            );

            foreach ($sortedWorkers as $worker) {
                $maxProc = $this->getWorkerInstanceCount($worker);

                if (str_contains($worker, 'WorkerApiCommands')) {
                    // API workers are started right away (not deferred to the task list)
                    // and their PID files are verified before moving on.
                    SystemMessages::sysLogMsg(
                        static::class,
                        "Starting ALL instances of critical worker {$worker} synchronously",
                        LOG_NOTICE
                    );

                    for ($i = 1; $i <= $maxProc; $i++) {
                        SystemMessages::sysLogMsg(
                            static::class,
                            "Starting critical worker instance {$i}/{$maxProc}: {$worker}",
                            LOG_NOTICE
                        );

                        // The standard start mechanism is used for each instance.
                        Processes::processPHPWorker($worker, 'start');

                        // Give the process a moment, then look for its PID file.
                        sleep(1);
                        $pidFile = Processes::getPidFilePath($worker, $i);
                        if (file_exists($pidFile)) {
                            $pid = trim((string)file_get_contents($pidFile));
                            SystemMessages::sysLogMsg(
                                static::class,
                                sprintf("Worker %s instance %d started with PID %s", $worker, $i, $pid),
                                LOG_NOTICE
                            );
                        } else {
                            SystemMessages::sysLogMsg(
                                static::class,
                                sprintf("WARNING: PID file not found for %s instance %d", $worker, $i),
                                LOG_WARNING
                            );
                        }
                    }

                    // Extra time for the workers to finish initializing.
                    sleep(3);

                    // Second pass: confirm that every instance has a PID file.
                    $allWorkersRunning = true;
                    for ($i = 1; $i <= $maxProc; $i++) {
                        $pidFile = Processes::getPidFilePath($worker, $i);
                        if (!file_exists($pidFile)) {
                            $allWorkersRunning = false;
                            SystemMessages::sysLogMsg(
                                static::class,
                                "WARNING: PID file not found for {$worker} instance {$i}",
                                LOG_WARNING
                            );
                        }
                    }

                    if (!$allWorkersRunning) {
                        SystemMessages::sysLogMsg(
                            static::class,
                            "ERROR: Not all instances of {$worker} started correctly!",
                            LOG_ERR
                        );
                    } else {
                        SystemMessages::sysLogMsg(
                            static::class,
                            "All {$maxProc} instances of {$worker} started successfully",
                            LOG_NOTICE
                        );
                    }
                } elseif ($maxProc > 1) {
                    // A worker pool that is not the API worker: queue one start task per instance.
                    for ($i = 1; $i <= $maxProc; $i++) {
                        $startTasks[] = function () use ($worker, $i) {
                            SystemMessages::sysLogMsg(
                                static::class,
                                "Starting new instance {$i} of pool worker {$worker}",
                                LOG_NOTICE
                            );
                            Processes::processPHPWorker($worker, 'start');
                            Fiber::suspend();
                        };
                    }
                } else {
                    // Regular single worker.
                    $startTasks[] = function () use ($worker) {
                        SystemMessages::sysLogMsg(
                            static::class,
                            "Starting new instance of {$worker}",
                            LOG_NOTICE
                        );
                        Processes::processPHPWorker($worker, 'start');
                        Fiber::suspend();
                    };
                }
            }
        }

        // Start the remaining new instances.
        if (!empty($startTasks)) {
            $this->executeParallel($startTasks);
        }

        // Wait for new instances to initialize and become ready to process jobs.
        SystemMessages::sysLogMsg(
            static::class,
            "Started new worker instances, waiting for them to initialize",
            LOG_NOTICE
        );
        sleep(5);

        // STEP 2: Signal old workers to shut down gracefully.
        $shutdownTasks = [];

        foreach ($runningWorkers as $worker => $pid) {
            $shutdownTasks[] = function () use ($pid, $worker) {
                if (posix_kill((int)$pid, SIGUSR1)) {
                    SystemMessages::sysLogMsg(
                        static::class,
                        "Sent SIGUSR1 to {$worker} (PID: {$pid})",
                        LOG_DEBUG
                    );
                }
                Fiber::suspend();
            };
        }

        foreach ($workerPools as $worker => $poolInfo) {
            foreach ($poolInfo['pids'] as $pid) {
                $shutdownTasks[] = function () use ($pid, $worker) {
                    if (posix_kill((int)$pid, SIGUSR1)) {
                        SystemMessages::sysLogMsg(
                            static::class,
                            "Sent SIGUSR1 to {$worker} pool instance (PID: {$pid})",
                            LOG_DEBUG
                        );
                    }
                    Fiber::suspend();
                };
            }
        }

        $this->executeParallel($shutdownTasks);

        // STEP 3: Wait for graceful shutdown with a bounded timeout.
        SystemMessages::sysLogMsg(
            static::class,
            "Waiting for workers to complete active tasks (graceful shutdown)",
            LOG_NOTICE
        );

        $gracefulShutdownStart = time();
        $gracefulShutdownTimeout = 30;

        while (time() - $gracefulShutdownStart < $gracefulShutdownTimeout) {
            $allShutdown = true;

            // Signal 0 only probes for existence; it delivers nothing to the process.
            foreach ($runningWorkers as $pid) {
                if (posix_kill((int)$pid, 0)) {
                    $allShutdown = false;
                    break;
                }
            }

            foreach ($workerPools as $poolInfo) {
                foreach ($poolInfo['pids'] as $pid) {
                    if (posix_kill((int)$pid, 0)) {
                        $allShutdown = false;
                        break 2;
                    }
                }
            }

            if ($allShutdown) {
                SystemMessages::sysLogMsg(
                    static::class,
                    "All workers have gracefully shutdown",
                    LOG_NOTICE
                );
                break;
            }

            sleep(1);
        }

        // STEP 4: Force-terminate any remaining old processes.
        $terminateTasks = [];

        foreach ($runningWorkers as $worker => $pid) {
            if (posix_kill((int)$pid, 0)) {
                SystemMessages::sysLogMsg(
                    static::class,
                    "Worker {$worker} (PID: {$pid}) didn't shut down gracefully, sending SIGTERM",
                    LOG_WARNING
                );

                $terminateTasks[] = function () use ($pid) {
                    posix_kill((int)$pid, SIGTERM);
                    Fiber::suspend();
                };
            }
        }

        foreach ($workerPools as $worker => $poolInfo) {
            foreach ($poolInfo['pids'] as $pid) {
                if (posix_kill((int)$pid, 0)) {
                    SystemMessages::sysLogMsg(
                        static::class,
                        "Worker {$worker} pool instance (PID: {$pid}) didn't shut down gracefully, sending SIGTERM",
                        LOG_WARNING
                    );

                    $terminateTasks[] = function () use ($pid) {
                        posix_kill((int)$pid, SIGTERM);
                        Fiber::suspend();
                    };
                }
            }
        }

        if (!empty($terminateTasks)) {
            $this->executeParallel($terminateTasks);
            // Give processes time to terminate.
            sleep(2);

            // Final check: SIGKILL whatever ignored SIGTERM.
            foreach ($runningWorkers as $worker => $pid) {
                if (posix_kill((int)$pid, 0)) {
                    SystemMessages::sysLogMsg(
                        static::class,
                        "Worker {$worker} (PID: {$pid}) still alive, sending SIGKILL",
                        LOG_WARNING
                    );
                    posix_kill((int)$pid, SIGKILL);
                }
            }

            foreach ($workerPools as $worker => $poolInfo) {
                foreach ($poolInfo['pids'] as $pid) {
                    if (posix_kill((int)$pid, 0)) {
                        SystemMessages::sysLogMsg(
                            static::class,
                            "Worker {$worker} pool instance (PID: {$pid}) still alive, sending SIGKILL",
                            LOG_WARNING
                        );
                        posix_kill((int)$pid, SIGKILL);
                    }
                }
            }
        }

        SystemMessages::sysLogMsg(
            static::class,
            "Worker restart completed - new instances are running with updated code",
            LOG_NOTICE
        );
    }
540
541
    /**
     * Builds the map of workers to supervise, grouped by the check method used for them.
     *
     * The map starts with the core workers and is then extended with the workers
     * reported by modules through the GET_MODULE_WORKERS hook. Each module entry
     * is read as ['type' => <check method>, 'worker' => <class name>].
     *
     * @return array<string, array<string>> Check method name => worker class names.
     */
    private function prepareWorkersList(): array
    {
        $workersByCheck = [
            self::CHECK_BY_REDIS => [
                WorkerApiCommands::class,
                WorkerPrepareAdvice::class,
            ],
            self::CHECK_BY_AMI => [],
            self::CHECK_BY_BEANSTALK => [
                WorkerCdr::class,
                WorkerCallEvents::class,
                WorkerModelsEvents::class,
                WorkerNotifyByEmail::class,
            ],
            self::CHECK_BY_PID_NOT_ALERT => [
                WorkerMarketplaceChecker::class,
                WorkerBeanstalkdTidyUp::class,
                WorkerCheckFail2BanAlive::class,
                WorkerLogRotate::class,
                WorkerRemoveOldRecords::class,
                WorkerNotifyAdministrator::class,
            ],
        ];

        // array_values() drops the keys of the hook result so the spread below
        // passes purely positional arguments to array_merge().
        $perModule = PBXConfModulesProvider::hookModulesMethod(SystemConfigInterface::GET_MODULE_WORKERS);
        $moduleWorkers = array_merge(...array_values($perModule));

        // Append every module worker to the group named by its check type.
        foreach ($moduleWorkers as $entry) {
            $workersByCheck[$entry['type']][] = $entry['worker'];
        }

        return $workersByCheck;
    }
593
594
    /**
     * Supervises all workers in an endless loop.
     *
     * After the PBX reports it is fully booted, every pass rebuilds the workers
     * list, queues a check for each worker whose interval has elapsed, runs the
     * queued checks through executeParallel() and then sleeps for five seconds.
     * While the system is booting the pass is skipped. Workers listed under an
     * unknown check type get no check, but their last-check time is still updated.
     *
     * @param array $argv The command-line arguments passed to the worker (not used here).
     * @throws Throwable
     */
    public function start(array $argv): void
    {
        // Wait for the system to fully boot.
        PBX::waitFullyBooted();

        while (true) {
            // Do not touch the workers while the system is still booting.
            if (System::isBooting()) {
                sleep(5);
                continue;
            }

            $checks = [];
            foreach ($this->prepareWorkersList() as $checkMethod => $workerClasses) {
                foreach ($workerClasses as $workerClass) {
                    if (!$this->shouldCheckWorker($workerClass)) {
                        continue;
                    }

                    $check = match ($checkMethod) {
                        self::CHECK_BY_BEANSTALK => fn() => $this->checkWorkerBeanstalk($workerClass),
                        self::CHECK_BY_PID_NOT_ALERT => fn() => $this->checkPidNotAlert($workerClass),
                        self::CHECK_BY_AMI => fn() => $this->checkWorkerAMI($workerClass),
                        self::CHECK_BY_REDIS => fn() => $this->checkWorkerRedis($workerClass),
                        default => null,
                    };
                    if ($check !== null) {
                        $checks[] = $check;
                    }
                    $this->updateLastCheckTime($workerClass);
                }
            }

            if ($checks !== []) {
                $this->executeParallel($checks);
            }

            // Short pause before the next pass.
            sleep(5);
        }
    }
643
644
    /**
     * Health-checks a worker over its Beanstalk ping tube and restarts it when needed.
     *
     * Workers configured with more than one instance are delegated to
     * checkWorkerPool(), and the method then returns without suspending the fiber.
     * Otherwise the worker is pinged only if a PID is found for it; a missing PID
     * or a ping result of false leads to Processes::processPHPWorker() being
     * called for the worker. A check that takes longer than 10 seconds is logged
     * as a warning. Any Throwable is passed to CriticalErrorsHandler instead of
     * being rethrown.
     *
     * @param string $workerClassName The class name of the worker.
     */
    public function checkWorkerBeanstalk(string $workerClassName): void
    {
        try {
            $instances = $this->getWorkerInstanceCount($workerClassName);
            if ($instances > 1) {
                // Pools have their own supervision routine.
                $this->checkWorkerPool($workerClassName, $instances);
                return;
            }

            $startedAt = microtime(true);
            $pid = Processes::getPidOfProcess($workerClassName);

            // Stays false when there is no process to ping at all.
            $pingResult = false;
            if ($pid !== '') {
                $client = new BeanstalkClient($this->makePingTubeName($workerClassName));
                [$pingResult] = $client->sendRequest('ping', 5, 1);
            }

            if ($pingResult === false) {
                Processes::processPHPWorker($workerClassName);
                SystemMessages::sysLogMsg(__METHOD__, "Service {$workerClassName} started.", LOG_NOTICE);
            }

            $timeElapsedSecs = round(microtime(true) - $startedAt, 2);
            if ($timeElapsedSecs > 10) {
                SystemMessages::sysLogMsg(
                    __METHOD__,
                    "WARNING: Service {$workerClassName} processed more than {$timeElapsedSecs} seconds",
                    LOG_WARNING
                );
            }
        } catch (Throwable $e) {
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
        }
        Fiber::suspend();
    }
689
690
    /**
     * Checks a worker by PID only and starts it again when no process is found.
     *
     * Workers configured with more than one instance are delegated to
     * checkWorkerPool(), and the method then returns without suspending the fiber.
     * For a single worker, an empty PID string from Processes::getPidOfProcess()
     * means it is not running, so Processes::processPHPWorker() is called for it.
     * There is nothing to kill in that case, because no PID is known.
     * A check that takes longer than 10 seconds is logged as a warning.
     *
     * Like the Beanstalk and AMI checks, any Throwable is passed to
     * CriticalErrorsHandler rather than rethrown, so one failing worker does not
     * abort the caller's monitoring pass.
     *
     * @param string $workerClassName The class name of the worker.
     */
    public function checkPidNotAlert(string $workerClassName): void
    {
        try {
            // Get the number of instances to maintain.
            $maxProc = $this->getWorkerInstanceCount($workerClassName);

            // Pools have their own supervision routine.
            if ($maxProc > 1) {
                $this->checkWorkerPool($workerClassName, $maxProc);
                return;
            }

            $start = microtime(true);
            $WorkerPID = Processes::getPidOfProcess($workerClassName);
            if ($WorkerPID === '') {
                // No process found: start the worker.
                Processes::processPHPWorker($workerClassName);
            }

            $timeElapsedSecs = round(microtime(true) - $start, 2);
            if ($timeElapsedSecs > 10) {
                SystemMessages::sysLogMsg(
                    __CLASS__,
                    "WARNING: Service {$workerClassName} processed more than {$timeElapsedSecs} seconds",
                    LOG_WARNING
                );
            }
        } catch (Throwable $e) {
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
        }
        Fiber::suspend();
    }
732
733
    /**
     * Verifies a worker through an AMI ping and relaunches it when the ping fails.
     *
     * Workers configured with more than one instance are handed over to the pool check.
     * Otherwise the worker is pinged only if a PID was found for it. When there is no PID
     * or the ping returns false, the worker is started, the method sleeps for one second
     * and then calls itself again; this is repeated only while $level is below 10.
     * Any Throwable raised along the way is passed to CriticalErrorsHandler.
     *
     * @param string $workerClassName The class name of the worker.
     * @param int $level The recursion level.
     */
    public function checkWorkerAMI(string $workerClassName, int $level = 0): void
    {
        try {
            // Pool-managed workers take a separate path (and skip the Fiber suspension below).
            $instanceCount = $this->getWorkerInstanceCount($workerClassName);
            if ($instanceCount > 1) {
                $this->checkWorkerPool($workerClassName, $instanceCount);
                return;
            }

            $startedAt = microtime(true);
            $pingResult = false;
            $pid = Processes::getPidOfProcess($workerClassName);
            if ($pid !== '') {
                // A process exists, so ask it to answer over AMI.
                $pingResult = Util::getAstManager()->pingAMIListener(
                    $this->makePingTubeName($workerClassName)
                );
                if ($pingResult === false) {
                    SystemMessages::sysLogMsg(__METHOD__, 'Restart...', LOG_ERR);
                }
            }

            $mayRetry = $level < 10;
            if ($pingResult === false && $mayRetry) {
                Processes::processPHPWorker($workerClassName);
                SystemMessages::sysLogMsg(__METHOD__, "Service {$workerClassName} started.", LOG_NOTICE);

                // Give the fresh process a second to start listening, then verify it.
                sleep(1);
                $this->checkWorkerAMI($workerClassName, $level + 1);
            }

            $timeElapsedSecs = round(microtime(true) - $startedAt, 2);
            if ($timeElapsedSecs > 10) {
                SystemMessages::sysLogMsg(
                    __METHOD__,
                    "WARNING: Service {$workerClassName} processed more than {$timeElapsedSecs} seconds",
                    LOG_WARNING
                );
            }
        } catch (Throwable $e) {
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
        }

        // Hand control back to whoever resumed this Fiber.
        Fiber::suspend();
    }
786
787
    /**
     * Checks a Redis-based worker through its heartbeat and status keys and restarts it when stale.
     *
     * Workers configured with more than one instance are handed over to the pool check.
     * For a single worker the heartbeat key is consulted first; the status key is read only
     * when the heartbeat is missing or at least 10 seconds old. If neither value is fresh,
     * the worker is started again.
     *
     * Every Throwable raised inside the check is caught here and written to syslog as a warning.
     *
     * @param string $workerClassName The class name of the worker to check
     */
    protected function checkWorkerRedis(string $workerClassName): void
    {
        try {
            // Lazily obtain the Redis service from the DI container.
            if (!(isset($this->redis) && $this->redis)) {
                $this->redis = $this->di->get('redis');
            }

            // Pool-managed workers take a separate path.
            $instanceCount = $this->getWorkerInstanceCount($workerClassName);
            if ($instanceCount > 1) {
                $this->checkWorkerPool($workerClassName, $instanceCount);
                return;
            }

            // First signal: a heartbeat timestamp younger than 10 seconds.
            $heartbeat = $this->redis->get(WorkerRedisBase::REDIS_HEARTBEAT_KEY_PREFIX . $workerClassName);
            $heartbeatIsFresh = $heartbeat !== false && (time() - (int)$heartbeat) < 10;
            if ($heartbeatIsFresh) {
                return;
            }

            // Second signal: the JSON status record with a recent "updated_at" value.
            $rawStatus = $this->redis->get(WorkerRedisBase::REDIS_STATUS_KEY_PREFIX . $workerClassName);
            if ($rawStatus !== false) {
                $decodedStatus = json_decode($rawStatus, true);
                $updatedAt = $decodedStatus['updated_at'] ?? null;
                if ($updatedAt !== null && (microtime(true) - $updatedAt) < 10) {
                    return;
                }
            }

            // Neither signal is fresh: report it and start the worker again.
            SystemMessages::sysLogMsg(
                static::class,
                "Worker status not found or stale - restarting $workerClassName",
                LOG_WARNING
            );
            Processes::processPHPWorker($workerClassName);
        } catch (Throwable $e) {
            SystemMessages::sysLogMsg(
                static::class,
                "Error checking worker: " . $e->getMessage(),
                LOG_WARNING
            );
        }
    }
847
848
    /**
     * Get the number of worker instances that should be maintained.
     *
     * The value is taken from the declared default of the worker class's `maxProc`
     * property, read through reflection so that the worker itself is never instantiated.
     *
     * @param string $workerClassName The worker class name
     * @return int The default `maxProc` value cast to int; 1 when the class does not exist,
     *             has no `maxProc` property, its default is unset/null, or reflection fails
     */
    private function getWorkerInstanceCount(string $workerClassName): int
    {
        if (class_exists($workerClassName)) {
            try {
                $reflectionClass = new \ReflectionClass($workerClassName);

                // Check whether the class has a maxProc property
                if ($reflectionClass->hasProperty('maxProc')) {
                    // Take the value from the default properties of the class
                    $defaultProperties = $reflectionClass->getDefaultProperties();
                    if (isset($defaultProperties['maxProc'])) {
                        $maxProc = (int)$defaultProperties['maxProc'];

                        // Log the detected value to help with debugging
                        SystemMessages::sysLogMsg(
                            static::class,
                            sprintf("Worker %s has maxProc=%d", $workerClassName, $maxProc),
                            LOG_DEBUG
                        );

                        return $maxProc;
                    }
                }
            } catch (Throwable $e) {
                SystemMessages::sysLogMsg(
                    static::class,
                    "Error getting maxProc for $workerClassName: " . $e->getMessage(),
                    LOG_WARNING
                );
            }
        }

        // Default to single instance if maxProc can't be determined
        return 1;
    }
891
    
892
    /**
893
     * Check and maintain a pool of worker instances
894
     * 
895
     * @param string $workerClassName The worker class name
896
     * @param int $targetCount Number of instances to maintain
897
     */
898
    private function checkWorkerPool(string $workerClassName, int $targetCount): void
899
    {
900
        try {
901
            // Инициализируем Redis соединение при необходимости
902
            if (!isset($this->redis) || !$this->redis) {
903
                $this->redis = $this->di->get('redis');
904
            }
905
            
906
            $runningInstances = [];
907
            
908
            // Get all instances of this worker
909
            for ($i = 1; $i <= $targetCount; $i++) {
910
                // Check if worker with this instance ID is running
911
                $pidFile = Processes::getPidFilePath($workerClassName, $i);
912
                if (file_exists($pidFile)) {
913
                    $pid = trim(file_get_contents($pidFile));
914
                    if (!empty($pid) && posix_kill((int)$pid, 0)) {
915
                        $runningInstances[$i] = (int)$pid;
916
                    }
917
                }
918
            }
919
            
920
            // Count currently running instances
921
            $runningCount = count($runningInstances);
922
            
923
            SystemMessages::sysLogMsg(
924
                static::class,
925
                "Checking worker pool: $workerClassName - $runningCount/$targetCount instances running",
926
                LOG_DEBUG
927
            );
928
            
929
            // Start missing instances with their specific instance-id
930
            for ($i = 1; $i <= $targetCount; $i++) {
931
                if (!isset($runningInstances[$i])) {
932
                    SystemMessages::sysLogMsg(
933
                        static::class,
934
                        "Starting worker instance $i for $workerClassName",
935
                        LOG_NOTICE
936
                    );
937
                    
938
                    $workerPath = Util::getFilePathByClassName($workerClassName);
939
                    $php = Util::which('php');
940
                    $command = "$php -f $workerPath start --instance-id=$i > /dev/null 2>&1 &";
941
                    shell_exec($command);
942
                }
943
            }
944
            
945
            // Check if any instances are stale
946
            foreach ($runningInstances as $instanceId => $pid) {
947
                // We can detect stale workers by checking their heartbeat in Redis
948
                $heartbeatKey = WorkerRedisBase::REDIS_HEARTBEAT_KEY_PREFIX . $workerClassName;
949
                $statusKey = WorkerRedisBase::REDIS_STATUS_KEY_PREFIX . $workerClassName;
950
                
951
                // Most workers update general heartbeat and status keys
952
                if (class_exists($workerClassName) && is_subclass_of($workerClassName, WorkerRedisBase::class)) {
953
                    // For Redis-based workers, we can check their heartbeat
954
                    $lastHeartbeat = $this->redis->get($heartbeatKey);
955
                    $status = $this->redis->get($statusKey);
956
                    
957
                    $isStale = false;
958
                    
959
                    if ($lastHeartbeat === false) {
960
                        // No heartbeat found, check status
961
                        if ($status === false) {
962
                            $isStale = true;
963
                        } else {
964
                            $status = json_decode($status, true);
965
                            if (!isset($status['updated_at']) || (microtime(true) - $status['updated_at']) > 30) {
966
                                $isStale = true;
967
                            }
968
                        }
969
                    } else if ((time() - (int)$lastHeartbeat) > 30) {
970
                        // Heartbeat too old
971
                        $isStale = true;
972
                    }
973
                    
974
                    if ($isStale) {
975
                        SystemMessages::sysLogMsg(
976
                            static::class,
977
                            "Worker instance $workerClassName #$instanceId (PID: $pid) appears stale, terminating",
978
                            LOG_WARNING
979
                        );
980
                        
981
                        // Try graceful shutdown first
982
                        posix_kill((int)$pid, SIGUSR1);
983
                        
984
                        // Wait a bit and check if it's still running
985
                        sleep(1);
986
                        
987
                        // If still running, force terminate
988
                        if (posix_kill((int)$pid, 0)) {
989
                            posix_kill((int)$pid, SIGTERM);
990
                        }
991
                    }
992
                }
993
            }
994
            
995
        } catch (Throwable $e) {
996
            SystemMessages::sysLogMsg(
997
                static::class,
998
                "Error checking worker pool for $workerClassName: " . $e->getMessage(),
999
                LOG_WARNING
1000
            );
1001
        }
1002
    }
1003
}
1004
1005
// Script entry point: launches the supervisor when an action is passed on the command line.
$workerClassname = WorkerSafeScriptsCore::class;
try {
    // Without a command-line action there is nothing to do.
    if (isset($argv) && count($argv) > 1) {
        // Label this process with the class name and action, then look for a process
        // already carrying the same label (the current PID is passed to the lookup,
        // presumably so that this process is not matched against itself).
        cli_set_process_title("{$workerClassname} {$argv[1]}");
        $activeProcesses = Processes::getPidOfProcess("{$workerClassname} {$argv[1]}", posix_getpid());
        if (!empty($activeProcesses)) {
            return;
        }
        $worker = WorkerSafeScriptsCore::getInstance();

        // Dispatch on the requested action; unknown actions are ignored silently.
        switch ($argv[1]) {
            case 'start':
                $worker->start($argv);
                SystemMessages::sysLogMsg($workerClassname, "Normal exit after start ended", LOG_DEBUG);
                break;
            case 'restart':
            case 'reload':
                $worker->restart();
                SystemMessages::sysLogMsg($workerClassname, "Normal exit after restart ended", LOG_DEBUG);
                break;
        }
    }
} catch (Throwable $e) {
    // Report any failure through the critical error handler.
    CriticalErrorsHandler::handleExceptionWithSyslog($e);
}