Passed
Push — develop ( 0cacf0...905a46 )
by Портнов
05:08
created

WorkerCdr::initSettings()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 34
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 24
c 1
b 0
f 0
dl 0
loc 34
rs 9.536
cc 3
nc 3
nop 0
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright (C) 2017-2020 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\Workers;
21
22
require_once 'Globals.php';
23
24
use MikoPBX\Common\Models\{Extensions, ModelsBase, Users};
25
use MikoPBX\Core\System\{BeanstalkClient, Processes, Util};
26
use MikoPBX\Common\Providers\CDRDatabaseProvider;
27
use Throwable;
28
29
/**
30
 * Class WorkerCdr
31
 * Обработка записей CDR. Заполение длительности звонков.
32
 */
33
class WorkerCdr extends WorkerBase
34
{
35
36
    public const SELECT_CDR_TUBE = 'select_cdr_tube';
37
    public const UPDATE_CDR_TUBE = 'update_cdr_tube';
38
39
    private BeanstalkClient $client_queue;
40
    private array $internal_numbers  = [];
41
    private array $no_answered_calls = [];
42
43
44
    /**
45
     * Entry point
46
     *
47
     * @param $argv
48
     *
49
     */
50
    public function start($argv): void
51
    {
52
        $this->client_queue = new BeanstalkClient(self::SELECT_CDR_TUBE);
53
        $this->client_queue->subscribe($this->makePingTubeName(self::class), [$this, 'pingCallBack']);
54
55
        $this->initSettings();
56
57
        while ($this->needRestart === false) {
58
            $result = CDRDatabaseProvider::getTempCdr();
59
            if (!empty($result)) {
60
                $this->updateCdr($result);
61
            }
62
            $this->client_queue->wait();
63
        }
64
    }
65
66
    /**
67
     * Fills settings
68
     */
69
    private function initSettings(): void
70
    {
71
        $this->internal_numbers  = [];
72
        $this->no_answered_calls = [];
73
74
        $usersClass = Users::class;
75
        $parameters = [
76
            'columns'=>[
77
                'email'=>'email',
78
                'language'=>'language',
79
                'number'=>'Extensions.number'
80
            ],
81
            'joins'      => [
82
                'Extensions' => [
83
                    0 => Extensions::class,
84
                    1 => "Extensions.userid={$usersClass}.id",
85
                    2 => 'Extensions',
86
                    3 => 'INNER',
87
                ],
88
            ],
89
            'cache' => [
90
                'key'=> ModelsBase::makeCacheKey(Users::class, 'Workers-WorkerCdr-initSettings'),
91
                'lifetime' => 300,
92
            ]
93
        ];
94
95
        $results   = Users::find($parameters);
96
        foreach ($results as $record) {
97
            if (empty($record->email)) {
98
                continue;
99
            }
100
            $this->internal_numbers[$record->number] = [
101
                'email'    => $record->email,
102
                'language' => $record->language,
103
            ];
104
        }
105
    }
106
107
    /**
108
     * Обработчик результата запроса.
109
     * @param $result
110
     */
111
    private function updateCdr($result): void
112
    {
113
        $this->initSettings();
114
        $arr_update_cdr = [];
115
        // Получаем идентификаторы активных каналов.
116
        $channels_id = $this->getActiveIdChannels();
117
        foreach ($result as $row) {
118
            if (array_key_exists($row['linkedid'], $channels_id)) {
119
                // Цепочка вызовов еще не завершена.
120
                continue;
121
            }
122
123
            $start      = strtotime($row['start']);
124
            $answer     = strtotime($row['answer']);
125
            $end        = strtotime($row['endtime']);
126
            $dialstatus = trim($row['dialstatus']);
127
128
            $duration = max(($end - $start), 0);
129
            $billsec  = ($end && $answer) ? ($end - $answer) : 0;
130
131
            [$disposition, $row] = $this->setDisposition($billsec, $dialstatus, $row);
132
            [$row, $billsec]     = $this->checkBillsecMakeRecFile($billsec, $row);
133
134
            $data = [
135
                'work_completed' => 1,
136
                'duration'       => $duration,
137
                'billsec'        => $billsec,
138
                'disposition'    => $disposition,
139
                'UNIQUEID'       => $row['UNIQUEID'],
140
                'recordingfile'  => ($disposition === 'ANSWERED') ? $row['recordingfile'] : '',
141
                'tmp_linked_id'  => $row['linkedid'],
142
            ];
143
144
            $arr_update_cdr[] = $data;
145
            $this->checkNoAnswerCall(array_merge($row, $data));
146
        }
147
148
        $this->setStatusAndPublish($arr_update_cdr);
149
        $this->notifyByEmail();
150
    }
151
152
    /**
153
     * Функция позволяет получить активные каналы.
154
     * Возвращает ассоциативный массив. Ключ - Linkedid, значение - массив каналов.
155
     *
156
     * @return array
157
     */
158
    private function getActiveIdChannels(): array
159
    {
160
        $am           = Util::getAstManager('off');
161
        return $am->GetChannels(true);
162
    }
163
164
    /**
165
     * Анализируем не отвеченные вызовы. Наполняем временный массив для дальнейшей обработки.
166
     *
167
     * @param $row
168
     */
169
    private function checkNoAnswerCall($row): void
170
    {
171
        if ($row['disposition'] === 'ANSWERED') {
172
            $this->no_answered_calls[$row['linkedid']]['ANSWERED'] = true;
173
            return;
174
        }
175
        if ( ! array_key_exists($row['dst_num'], $this->internal_numbers)) {
176
            // dst_num - не является номером сотрудника. Это исходящий.
177
            return;
178
        }
179
        $is_internal = false;
180
        if ((array_key_exists($row['src_num'], $this->internal_numbers))) {
181
            // Это внутренний вызов.
182
            $is_internal = true;
183
        }
184
185
        $this->no_answered_calls[$row['linkedid']][] = [
186
            'from_number' => $row['src_num'],
187
            'to_number'   => $row['dst_num'],
188
            'start'       => $row['start'],
189
            'answer'      => $row['answer'],
190
            'endtime'     => $row['endtime'],
191
            'email'       => $this->internal_numbers[$row['dst_num']]['email'],
192
            'language'    => $this->internal_numbers[$row['dst_num']]['language'],
193
            'is_internal' => $is_internal,
194
            'duration'    => $row['duration'],
195
            'NOANSWER'    => true
196
        ];
197
    }
198
199
200
    /**
201
     * Постановка задачи в очередь на оповещение по email.
202
     */
203
    private function notifyByEmail(): void
204
    {
205
        foreach ($this->no_answered_calls as $call) {
206
            $this->client_queue->publish(json_encode($call), WorkerNotifyByEmail::class);
207
        }
208
        $this->no_answered_calls = [];
209
    }
210
211
    /**
212
     * @param array $arr_update_cdr
213
     */
214
    private function setStatusAndPublish(array $arr_update_cdr): void{
215
        $idForDelete = [];
216
217
        foreach ($arr_update_cdr as $data) {
218
            $linkedId = $data['tmp_linked_id'];
219
            $data['GLOBAL_STATUS'] = $data['disposition'];
220
            if (isset($this->no_answered_calls[$linkedId]['ANSWERED'])) {
221
                $data['GLOBAL_STATUS'] = 'ANSWERED';
222
                // Это отвеченный вызов (на очередь). Удаляем из списка.
223
                $idForDelete[$linkedId]=true;
224
            }
225
            unset($data['tmp_linked_id']);
226
            $this->client_queue->publish(json_encode($data), self::UPDATE_CDR_TUBE);
227
        }
228
229
        // Чистим память.
230
        foreach ($idForDelete as $linkedId => $data){
231
            unset($this->no_answered_calls[$linkedId]);
232
        }
233
    }
234
235
    /**
236
     * @param int $billsec
237
     * @param     $row
238
     * @return array
239
     */
240
    private function checkBillsecMakeRecFile(int $billsec, $row): array{
241
        if ($billsec <= 0) {
242
            $row['answer'] = '';
243
            $billsec = 0;
244
245
            if (!empty($row['recordingfile'])) {
246
                if($row['dst_chan'] === "App:{$row['dst_num']}"){
247
                    // Для приложения не может быть записи разговора.
248
                    // Запись должна относится к конечному устройству.
249
                    $row['recordingfile'] = '';
250
                }else{
251
                    // Удаляем файлы
252
                    $p_info = pathinfo($row['recordingfile']);
253
                    $fileName = $p_info['dirname'] . '/' . $p_info['filename'];
254
                    $file_list = [$fileName . '.mp3', $fileName . '.wav', $fileName . '_in.wav', $fileName . '_out.wav',];
255
                    foreach ($file_list as $file) {
256
                        if (!file_exists($file) || is_dir($file)) {
257
                            continue;
258
                        }
259
                        Processes::mwExec("rm -rf '{$file}'");
260
                    }
261
                }
262
            }
263
        } elseif (trim($row['recordingfile']) !== '') {
264
            // Если каналов не существует с ID, то можно удалить временные файлы.
265
            $p_info = pathinfo($row['recordingfile']);
266
            // Запускаем процесс конвертации в mp3
267
            $wav2mp3Path = Util::which('wav2mp3.sh');
268
            $lostWav2mp3Path = Util::which('convert-lost-wav2mp3.sh');
269
            $nicePath = Util::which('nice');
270
            Processes::mwExecBg("{$nicePath} -n -19 {$wav2mp3Path} '{$p_info['dirname']}/{$p_info['filename']}'");
271
272
            // Получим каталог с записями за текущемий месяц.
273
            $dir = dirname($p_info['dirname'], 2);
274
            Processes::mwExecBg("{$nicePath} -n -19 {$lostWav2mp3Path} '$dir'");
275
            // В последствии конвертации (успешной) исходные файлы будут удалены.
276
        }
277
        return array($row, $billsec);
278
    }
279
280
    /**
281
     * @param int    $billsec
282
     * @param string $dialstatus
283
     * @param        $row
284
     * @return array
285
     */
286
    private function setDisposition(int $billsec, string $dialstatus, $row): array{
287
        $disposition = 'NOANSWER';
288
        if ($billsec > 0) {
289
            $disposition = 'ANSWERED';
290
        } elseif ('' !== $dialstatus) {
291
            $disposition = ($dialstatus === 'ANSWERED') ? $disposition : $dialstatus;
292
        }
293
294
        if ($disposition !== 'ANSWERED') {
295
            if (file_exists($row['recordingfile']) && !is_dir($row['recordingfile'])) {
296
                Processes::mwExec("rm -rf {$row['recordingfile']}");
297
            }
298
        } elseif (!empty($row['recordingfile']) &&
299
            !file_exists($row['recordingfile']) &&
300
            !file_exists(Util::trimExtensionForFile($row['recordingfile']) . '.wav') ) {
301
            $filter = [
302
                "linkedid='{$row['linkedid']}' AND dst_chan='{$row['dst_chan']}'",
303
                'limit' => 1,
304
            ];
305
            $data = CDRDatabaseProvider::getTempCdr($filter);
306
            $recordingfile = $data[0]['recordingfile']??'';
307
            if (!empty($recordingfile)) {
308
                $row['recordingfile'] = $recordingfile;
309
            }
310
        }
311
        return array($disposition, $row);
312
    }
313
314
}
315
316
// Start worker process
317
WorkerCdr::startWorker($argv??null);