Passed
Push — develop ( b7c00c...da1f69 )
by Nikolay
08:07 queued 01:43
created

WorkerApiCommands   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 139
Duplicated Lines 0 %

Importance

Changes 14
Bugs 0 Features 0
Metric Value
wmc 18
eloc 63
c 14
b 0
f 0
dl 0
loc 139
rs 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
B checkNeedReload() 0 36 10
A prepareAnswer() 0 21 4
A start() 0 10 2
A registerProcessors() 0 14 1
A getNeedRestartActions() 0 9 1
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 10 2020
7
 */
8
9
namespace MikoPBX\PBXCoreREST\Workers;
10
11
use MikoPBX\Core\System\{BeanstalkClient, System, Util, Processes};
12
use MikoPBX\Core\Workers\WorkerBase;
13
use MikoPBX\PBXCoreREST\Lib\AdvicesProcessor;
14
use MikoPBX\PBXCoreREST\Lib\CdrDBProcessor;
15
use MikoPBX\PBXCoreREST\Lib\IAXStackProcessor;
16
use MikoPBX\PBXCoreREST\Lib\LicenseManagementProcessor;
17
use MikoPBX\PBXCoreREST\Lib\PbxExtensionsProcessor;
18
use MikoPBX\PBXCoreREST\Lib\SysLogsManagementProcessor;
19
use MikoPBX\PBXCoreREST\Lib\PBXApiResult;
20
use MikoPBX\PBXCoreREST\Lib\SIPStackProcessor;
21
use MikoPBX\PBXCoreREST\Lib\StorageManagementProcessor;
22
use MikoPBX\PBXCoreREST\Lib\SysinfoManagementProcessor;
23
use MikoPBX\PBXCoreREST\Lib\SystemManagementProcessor;
24
use MikoPBX\PBXCoreREST\Lib\FilesManagementProcessor;
25
use Throwable;
26
27
require_once 'Globals.php';
28
29
class WorkerApiCommands extends WorkerBase
{
    /**
     * The maximum parallel worker processes
     *
     * @var int
     */
    protected int $maxProc = 1;

    /**
     * Available REST API processors: short processor name => processor class name.
     *
     * Defaults to an empty table so that a lookup made before
     * registerProcessors() has run reports "unknown processor" instead of
     * failing on an uninitialised typed property.
     *
     * @var array<string, string>
     */
    private array $processors = [];

    /**
     * Subscribes to the ping tube and to the API request tube, then serves
     * requests until the needRestart flag is raised.
     *
     * @param mixed $argv command line arguments of the worker script (not used here)
     */
    public function start($argv): void
    {
        // Fill the dispatch table first, so the request callback can never run against an empty table.
        $this->registerProcessors();

        $beanstalk = $this->di->getShared('beanstalkConnectionWorkerAPI');
        $beanstalk->subscribe($this->makePingTubeName(self::class), [$this, 'pingCallBack']);
        $beanstalk->subscribe(self::class, [$this, 'prepareAnswer']);

        while ($this->needRestart === false) {
            $beanstalk->wait();
        }
    }

    /**
     * Prepares list of available processors
     */
    private function registerProcessors(): void
    {
        $this->processors = [
            'advices' => AdvicesProcessor::class,
            'cdr'     => CdrDBProcessor::class,
            'iax'     => IAXStackProcessor::class,
            'license' => LicenseManagementProcessor::class,
            'sip'     => SIPStackProcessor::class,
            'storage' => StorageManagementProcessor::class,
            'system'  => SystemManagementProcessor::class,
            'syslog'  => SysLogsManagementProcessor::class,
            'sysinfo' => SysinfoManagementProcessor::class,
            'files'   => FilesManagementProcessor::class,
            'modules' => PbxExtensionsProcessor::class,
        ];
    }

    /**
     * Process API request from frontend
     *
     * Decodes the JSON body of the message, dispatches it to the processor
     * named in its 'processor' field and always replies with the JSON encoded
     * result, including when decoding or processing throws.
     *
     * @param BeanstalkClient $message message that carries the request and receives the reply
     */
    public function prepareAnswer(BeanstalkClient $message): void
    {
        $res            = new PBXApiResult();
        $res->processor = __METHOD__;
        try {
            $request = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
            // Valid JSON is not necessarily an object: a scalar or null body has no processor.
            $processor = is_array($request) ? ($request['processor'] ?? '') : '';

            if (is_string($processor) && array_key_exists($processor, $this->processors)) {
                $res = $this->processors[$processor]::callback($request);
                if ($res->success) {
                    $this->checkNeedReload($request);
                }
            } else {
                // Never interpolate an array/object into the message.
                $processorName   = is_scalar($processor) ? (string)$processor : gettype($processor);
                $res->success    = false;
                $res->messages[] = "Unknown processor - {$processorName} in prepareAnswer";
            }
        } catch (Throwable $exception) {
            $res->messages[] = 'Exception on WorkerApiCommands - ' . $exception->getMessage();
        } finally {
            $message->reply(json_encode($res->getResult()));
        }
    }

    /**
     * Checks if the module or worker needs to be reloaded.
     *
     * When the request is one of the actions listed by getNeedRestartActions()
     * (or the needRestart flag is already raised), the other processes of this
     * worker class receive SIGTERM and all workers are restarted.
     *
     * @param array $request decoded API request; 'processor' and 'action' are read
     */
    private function checkNeedReload(array $request): void
    {
        $processor = $request['processor'] ?? null;
        $action    = $request['action'] ?? null;

        // Prevent loop
        if ($processor === 'workers' && $action === 'needRestartAPIWorkers') {
            $this->needRestart = true;

            return;
        }

        // Check if new code added from modules
        $restartActions = $this->getNeedRestartActions();
        if (is_string($processor) && in_array($action, $restartActions[$processor] ?? [], true)) {
            $this->needRestart = true;
        }

        if (!$this->needRestart) {
            return;
        }

        // Send soft restart to the other processes of this worker class
        $activeAnotherProcesses = Processes::getPidOfProcess(self::class, getmypid());
        foreach (explode(' ', (string)$activeAnotherProcesses) as $rawPid) {
            $pid = (int)trim($rawPid);
            // Empty or non-numeric tokens become 0; PID 0 (and negative values) address
            // process groups instead of a single process, so only positive PIDs are signalled.
            if ($pid > 0) {
                posix_kill($pid, SIGTERM);
            }
        }

        // Restart all other workers. This belongs to the restart branch only:
        // it must not run after every successful API request.
        Processes::restartAllWorkers();
    }

    /**
     * Prepares the map of processor => actions after which this kind of
     * worker has to be restarted.
     *
     * @return array<string, string[]>
     */
    private function getNeedRestartActions(): array
    {
        return [
            'system' => [
                'enableModule',
                'disableModule',
                'uninstallModule',
                'installNewModule',
                'restoreDefaultSettings',
            ],
        ];
    }
}
173
174
// Worker launcher: runs only when the script is invoked with "start" as its first argument.
$workerClassname = WorkerApiCommands::class;
if (isset($argv[1]) && $argv[1] === 'start') {
    cli_set_process_title($workerClassname);
    // Keep serving forever: whenever start() returns or throws, a fresh worker instance is created.
    for (;;) {
        try {
            $worker = new $workerClassname();
            $worker->start($argv);
        } catch (Throwable $e) {
            // Report the failure to the error logger and to syslog, then loop again.
            global $errorLogger;
            $errorLogger->captureException($e);
            Util::sysLogMsg("{$workerClassname}_EXCEPTION", $e->getMessage());
        }
    }
}
189