Passed
Push — develop ( a4bdef...471b02 )
by Портнов
12:12 queued 14s
created

WorkerCallEvents::cleanTimeOutChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 5
dl 0
loc 7
rs 10
c 1
b 0
f 0
cc 1
nc 1
nop 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A WorkerCallEvents::errorHandler() 0 3 1
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright (C) 2017-2020 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\Workers;
21
require_once 'Globals.php';
22
23
use MikoPBX\Core\System\{BeanstalkClient, Storage, Util};
24
use MikoPBX\Common\Models\Extensions;
25
use MikoPBX\Common\Models\PbxSettings;
26
use MikoPBX\Common\Models\Sip;
27
use MikoPBX\Core\Asterisk\Configs\CelConf;
28
use MikoPBX\Core\Workers\Libs\WorkerCallEvents\ActionCelAnswer;
29
use MikoPBX\Core\Workers\Libs\WorkerCallEvents\SelectCDR;
30
use MikoPBX\Core\Workers\Libs\WorkerCallEvents\UpdateDataInDB;
31
use Phalcon\Text;
32
33
class WorkerCallEvents extends WorkerBase
34
{
35
    public    array $mixMonitorChannels = [];
36
    protected bool  $record_calls       = true;
37
    protected bool  $split_audio_thread = false;
38
    public    array $checkChanHangupTransfer = [];
39
    private   array $activeChannels = [];
40
41
    private array $innerNumbers       = [];
42
    private array $exceptionsNumbers  = [];
43
    private bool  $notRecInner        = false;
44
    public const REC_DISABLE          = 'Conversation recording is disabled';
45
46
    /**
47
     * Наполняем кэш реальных каналов.
48
     * @param string $channel
49
     * @return void
50
     */
51
    public function addActiveChan(string $channel):void
52
    {
53
        if(stripos($channel, 'local') === 0){
54
            return;
55
        }
56
        $this->activeChannels[$channel] = true;
57
    }
58
59
    /**
60
     * Очищаем кэш реальных каналов.
61
     * @param string $channel
62
     * @return void
63
     */
64
    public function removeActiveChan(string $channel):void
65
    {
66
        unset($this->activeChannels[$channel]);
67
    }
68
69
    /**
70
     * Проверяет существует ли канал в кэш.
71
     * @param string $channel
72
     * @return void
73
     */
74
    public function existsActiveChan(string $channel):bool
75
    {
76
        return isset($this->activeChannels[$channel]);
0 ignored issues
show
Bug Best Practice introduced by
The expression return IssetNode returns the type boolean which is incompatible with the documented return type void.
Loading history...
77
    }
78
79
    /**
80
     * @param string $src
81
     * @param string $dst
82
     * @param string $error
83
     * @return bool
84
     */
85
    public function enableMonitor(string $src, string $dst):bool
86
    {
87
        $src = substr($src,-9);
88
        $dst = substr($dst,-9);
89
        $enable = true;
90
        $isInner = in_array($src, $this->innerNumbers,true) && in_array($dst, $this->innerNumbers,true);
91
        if(($this->notRecInner && $isInner) ||
92
            in_array($src, $this->exceptionsNumbers,true) || in_array($dst, $this->exceptionsNumbers,true)){
93
            $enable = false;
94
        }
95
        return $enable;
96
    }
97
98
    /**
99
     * Инициирует запись разговора на канале.
100
     *
101
     * @param string    $channel
102
     * @param ?string   $file_name
103
     * @param ?string   $sub_dir
104
     * @param ?string   $full_name
105
     *
106
     * @return string
107
     */
108
    public function MixMonitor($channel, $file_name = null, $sub_dir = null, $full_name = null, string $actionID=''): string
109
    {
110
        $resFile = $this->mixMonitorChannels[$channel]??'';
111
        if($resFile !== ''){
112
            return $resFile;
113
        }
114
        $resFile           = '';
115
        $file_name = str_replace('/', '_', $file_name);
116
        if ($this->record_calls) {
117
            [$f, $options] = $this->setMonitorFilenameOptions($full_name, $sub_dir, $file_name);
118
            $arr = $this->am->GetChannels(false);
119
            if(!in_array($channel, $arr, true)){
120
                return '';
121
            }
122
            $srcFile = "{$f}.wav";
123
            $resFile = "{$f}.mp3";
124
            $this->am->MixMonitor($channel, $srcFile, $options, '', $actionID);
125
            $this->mixMonitorChannels[$channel] = $resFile;
126
            $this->am->UserEvent('StartRecording', ['recordingfile' => $resFile, 'recchan' => $channel]);
127
        }
128
        return $resFile;
129
    }
130
131
    /**
132
     * @param string|null $full_name
133
     * @param string|null $sub_dir
134
     * @param string|null $file_name
135
     * @return array
136
     */
137
    public function setMonitorFilenameOptions(?string $full_name, ?string $sub_dir, ?string $file_name): array{
138
        if (!file_exists((string)$full_name)) {
139
            $monitor_dir = Storage::getMonitorDir();
140
            if ($sub_dir === null) {
141
                $sub_dir = date('Y/m/d/H/');
142
            }
143
            $f = "{$monitor_dir}/{$sub_dir}{$file_name}";
144
        } else {
145
            $f = Util::trimExtensionForFile($full_name);
146
        }
147
        if ($this->split_audio_thread) {
148
            $options = "abSr({$f}_in.wav)t({$f}_out.wav)";
149
        } else {
150
            $options = 'ab';
151
        }
152
        return array($f, $options);
153
    }
154
155
    /**
156
     * Останавливает запись разговора на канале.
157
     * @param string $channel
158
     * @param string $actionID
159
     */
160
    public function StopMixMonitor($channel, string $actionID=''): void
161
    {
162
        if(isset($this->mixMonitorChannels[$channel])){
163
            unset($this->mixMonitorChannels[$channel]);
164
        }else{
165
            return;
166
        }
167
        if ($this->record_calls) {
168
            $this->am->StopMixMonitor($channel, $actionID);
169
        }
170
    }
171
172
    /**
173
     *
174
     * @param $params
175
     */
176
    public function start($params): void
177
    {
178
        $this->updateRecordingOptions();
179
        $this->mixMonitorChannels       = [];
180
        $this->checkChanHangupTransfer  = [];
181
        $this->am                 = Util::getAstManager('off');
182
183
        // PID сохраняем при начале работы Worker.
184
        $client = new BeanstalkClient(self::class);
185
        $client->subscribe(CelConf::BEANSTALK_TUBE,    [$this, 'callEventsWorker']);
186
        $client->subscribe(self::class,                [$this, 'otherEvents']);
187
        $client->subscribe(WorkerCdr::SELECT_CDR_TUBE, [$this, 'selectCDRWorker']);
188
        $client->subscribe(WorkerCdr::UPDATE_CDR_TUBE, [$this, 'updateCDRWorker']);
189
        $client->subscribe($this->makePingTubeName(self::class), [$this, 'pingCallBack']);
190
        $client->setErrorHandler([$this, 'errorHandler']);
191
192
        while ($this->needRestart === false) {
193
            $client->wait();
194
        }
195
    }
196
197
    /**
198
     * @return void
199
     */
200
    private function updateRecordingOptions():void
201
    {
202
        $usersNumbers = [];
203
        $users = [];
204
        $filter = [
205
            'conditions' => 'userid <> "" and userid>0 ',
206
            'columns' => 'userid,number,type',
207
            'order' => 'type DESC'
208
        ];
209
        $extensionsData = Extensions::find($filter);
210
        /** @var Extensions $extension */
211
        foreach ($extensionsData as $extension){
212
            if($extension->type === "SIP"){
213
                $usersNumbers[$extension->number][] = $extension->number;
214
                $users[$extension->userid] = $extension->number;
215
            }else{
216
                $internalNumber = $users[$extension->userid]??'';
217
                if($internalNumber !==''){
218
                    $usersNumbers[$internalNumber][] = $extension->number;
219
                }
220
            }
221
        }
222
        unset($users, $extensionsData);
223
        $filter = [
224
            'conditions' => 'type="peer"',
225
            'columns'    => 'extension,enableRecording',
226
        ];
227
        $peers = Sip::find($filter);
228
        foreach ($peers as $peer) {
229
            $numbers = $usersNumbers[$peer->extension]??[];
230
            foreach ($numbers as $num){
231
                $num = substr($num,-9);
232
                $this->innerNumbers[] = $num;
233
                if($peer->enableRecording === '0'){
234
                    $this->exceptionsNumbers[] = $num;
235
                }
236
            }
237
        }
238
        $this->notRecInner        = PbxSettings::getValueByKey('PBXRecordCallsInner') === '0';
239
        $this->record_calls       = PbxSettings::getValueByKey('PBXRecordCalls') === '1';
240
        $this->split_audio_thread = PbxSettings::getValueByKey('PBXSplitAudioThread') === '1';
241
    }
242
243
    /**
244
     * Ping callback for keep alive check
245
     *
246
     * @param BeanstalkClient $message
247
     */
248
    public function pingCallBack(BeanstalkClient $message): void
249
    {
250
        parent::pingCallBack($message);
251
        $this->updateRecordingOptions();
252
    }
253
254
    /**
255
     * @param $tube
256
     * @param $data
257
     * @return void
258
     */
259
    public function otherEvents($tube, array $data=[]): void
260
    {
261
        if(empty($data)){
262
            $data = json_decode($tube->getBody(), true);
263
        }
264
        $funcName = "Action_".$data['action']??'';
265
        if ( method_exists($this, $funcName) ) {
266
            $this->$funcName($data);
267
        }
268
        $className = __NAMESPACE__.'\Libs\WorkerCallEvents\\'.Text::camelize($funcName, '_');
269
        if( method_exists($className, 'execute') ){
270
            $className::execute($this, $data);
271
        }
272
    }
273
274
    /**
275
     * Обработчик событий изменения состояния звонка.
276
     *
277
     * @param array | BeanstalkClient $tube
278
     */
279
    public function callEventsWorker($tube): void
280
    {
281
        $data  = json_decode($tube->getBody(), true);
282
        $event = $data['EventName']??'';
283
        if('ANSWER' === $event){
284
            ActionCelAnswer::execute($this, $data);
285
            return;
286
        }elseif('USER_DEFINED' !== $event){
287
            return;
288
        }
289
        try {
290
            $data = json_decode(
291
                base64_decode($data['AppData']??''),
292
                true,
293
                512,
294
                JSON_THROW_ON_ERROR
295
            );
296
        }catch (\Throwable $e){
297
            $data = [];
298
        }
299
        $this->otherEvents($tube, $data);
300
    }
301
302
    /**
303
     * Получения CDR к обработке.
304
     *
305
     * @param array | BeanstalkClient $tube
306
     */
307
    public function updateCDRWorker($tube): void
308
    {
309
        $task    = $tube->getBody();
310
        $data = json_decode($task, true);
311
        UpdateDataInDB::execute($data);
312
        $tube->reply(json_encode(true));
313
    }
314
315
    /**
316
     * @param array | BeanstalkClient $tube
317
     */
318
    public function selectCDRWorker($tube): void
319
    {
320
        $filter   = json_decode($tube->getBody(), true);
321
        $res_data = SelectCDR::execute($filter);
322
        $tube->reply($res_data);
323
    }
324
325
    public function errorHandler($m): void
326
    {
327
        Util::sysLogMsg(self::class . '_ERROR', $m, LOG_ERR);
328
    }
329
}
330
331
332
// Start worker process
333
WorkerCallEvents::startWorker($argv??null);