Passed
Push — develop ( da1f69...28c097 )
by Nikolay
04:52
created

WorkerBase::signalHandler()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 1
c 0
b 0
f 0
dl 0
loc 3
rs 10
cc 1
nc 1
nop 0
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\Workers;
10
11
use MikoPBX\Core\Asterisk\AsteriskManager;
12
use MikoPBX\Core\System\BeanstalkClient;
13
use MikoPBX\Core\System\Util;
14
use MikoPBX\Core\System\Processes;
15
use Phalcon\Di;
16
use Phalcon\Text;
17
18
/**
 * Shared base for worker classes.
 *
 * Constructing a worker installs a SIGUSR1 handler, reconciles the number of
 * running processes of the concrete worker class with $maxProc, and records
 * the PID file(s) derived from getPidFile().
 */
abstract class WorkerBase extends Di\Injectable implements WorkerInterface
{
    // NOTE(review): not read or written in this class; presumably set by subclasses or the launcher.
    public int $currentProcId = 1;

    // Used by replyOnPingRequest(); never assigned here, so subclasses must initialise it before use.
    protected AsteriskManager $am;

    // Desired number of simultaneously running processes of the concrete worker class.
    protected int $maxProc = 1;

    // Set to true by signalHandler() when SIGUSR1 is received.
    protected bool $needRestart = false;

    /**
     * Workers shared constructor.
     *
     * Enables asynchronous signal delivery, registers signalHandler() for SIGUSR1,
     * then balances the process count and writes the PID file(s).
     */
    public function __construct()
    {
        pcntl_async_signals(true);
        pcntl_signal(
            SIGUSR1,
            [$this, 'signalHandler']
        );

        $this->checkCountProcesses();
        $this->savePidFile();
    }

    /**
     * Calculates how many processes have started already and signals excess or
     * old processes, or starts missing ones.
     *
     * - maxProc === 1 and other instances exist: every other instance gets SIGUSR1.
     * - fewer instances than maxProc: additional workers are started, one every 3 seconds.
     * - more instances than maxProc: exactly the surplus number of instances get SIGUSR1.
     */
    private function checkCountProcesses(): void
    {
        // NOTE(review): the second argument presumably excludes the current PID from the result — confirm.
        $activeAnotherProcesses = Processes::getPidOfProcess(static::class, getmypid());

        // Split on any whitespace and drop empty tokens, so an empty result yields an empty list.
        $processes = preg_split('/\s+/', trim($activeAnotherProcesses), -1, PREG_SPLIT_NO_EMPTY) ?: [];

        // Other instances plus the current process.
        $countProc = count($processes) + 1;
        $killApp   = Util::which('kill');

        if ($this->maxProc === 1 && $countProc > 1) {
            // Ask old processes to stop softly. The signal must be passed as an option
            // ("-USR1"); without the leading dash kill treats the name as a PID operand
            // and sends the default SIGTERM instead.
            $pids = implode(' ', $processes);
            Processes::mwExec("{$killApp} -USR1 {$pids}");
        } elseif ($this->maxProc > $countProc) {
            // Start additional processes
            while ($countProc < $this->maxProc) {
                sleep(3);
                Processes::processPHPWorker(static::class, 'start', 'multiStart');
                $countProc++;
            }
        } elseif ($this->maxProc < $countProc) {
            // Number of surplus processes.
            $countProc4Kill = $countProc - $this->maxProc;
            // Signal exactly that many instances, so that maxProc of them keep running.
            foreach (array_slice($processes, 0, $countProc4Kill) as $pid) {
                Processes::mwExec("{$killApp} -USR1 {$pid}");
            }
        }
    }

    /**
     * Saves pid to pidfile.
     *
     * When the lookup returns a single token it is written to getPidFile() as is.
     * Otherwise all files matching "<pidfile>*" are removed and one numbered file
     * "<pidfile>-<i>.pid" (i starting at 1) is written per PID.
     */
    private function savePidFile(): void
    {
        $activeProcesses = Processes::getPidOfProcess(static::class);
        $processes       = explode(' ', $activeProcesses);
        if (count($processes) === 1) {
            file_put_contents($this->getPidFile(), $activeProcesses);
        } else {
            $pidFilesDir = dirname($this->getPidFile());
            // The basename keeps its ".pid" extension, so numbered files look like "Name.pid-1.pid".
            $pidFile     = $pidFilesDir . '/' . pathinfo($this->getPidFile(), PATHINFO_BASENAME);
            // Delete old PID files
            $rm = Util::which('rm');
            Processes::mwExec("{$rm} -rf {$pidFile}*");
            $i = 1;
            foreach ($processes as $process) {
                file_put_contents("{$pidFile}-{$i}.pid", $process);
                $i++;
            }
        }
    }

    /**
     * Builds the PID file path for the concrete worker class.
     *
     * Namespace separators in the class name are replaced with dashes.
     *
     * @return string Absolute path under /var/run.
     */
    public function getPidFile(): string
    {
        $name = str_replace("\\", '-', static::class);

        return "/var/run/{$name}.pid";
    }

    /**
     * Process async system signal.
     *
     * Registered for SIGUSR1 in the constructor; only raises the $needRestart flag.
     */
    public function signalHandler(): void
    {
        $this->needRestart = true;
    }

    /**
     * Ping callback for keep alive check.
     *
     * Replies with the JSON-encoded request body suffixed with ":pong".
     *
     * @param BeanstalkClient $message
     */
    public function pingCallBack(BeanstalkClient $message): void
    {
        $message->reply(json_encode($message->getBody() . ':pong'));
    }

    /**
     * If it was Ping request to check worker, we answer Pong and return True.
     *
     * @param $parameters Event data; the 'UserEvent' entry is compared with this worker's ping tube name.
     *
     * @return bool True when the event was a ping for this worker and a pong was sent.
     */
    public function replyOnPingRequest($parameters): bool
    {
        $pingTube = $this->makePingTubeName(static::class);
        // A missing 'UserEvent' entry simply means "not a ping" and must not raise a notice.
        if ($pingTube === ($parameters['UserEvent'] ?? null)) {
            $this->am->UserEvent("{$pingTube}Pong", []);

            return true;
        }

        return false;
    }

    /**
     * Makes ping tube from classname and ping word.
     *
     * @param string $workerClassName
     *
     * @return string Camelized form of "ping_<class name>", using the backslash as delimiter.
     */
    public function makePingTubeName(string $workerClassName): string
    {
        return Text::camelize("ping_{$workerClassName}", '\\');
    }

    /**
     * Deletes old PID files.
     *
     * Removes every file whose path starts with this worker's PID file path.
     */
    public function __destruct()
    {
        $pidFilesDir = dirname($this->getPidFile());
        $pidFile     = $pidFilesDir . '/' . pathinfo($this->getPidFile(), PATHINFO_BASENAME);
        // Delete old PID files
        $rm = Util::which('rm');
        Processes::mwExec("{$rm} -rf {$pidFile}*");
    }
}