Passed
Branch develop (309f3f)
by Nikolay
10:15
created

WorkerCdr::checkNoAnswerCall()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 19
c 1
b 0
f 0
dl 0
loc 27
rs 9.6333
cc 4
nc 4
nop 1
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright (C) 2017-2020 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\Workers;
21
22
require_once 'Globals.php';
23
24
use MikoPBX\Common\Models\{CallDetailRecordsTmp, Extensions, ModelsBase, Users};
25
use MikoPBX\Core\System\{BeanstalkClient, Processes, Util};
26
use Throwable;
27
28
/**
29
 * Class WorkerCdr
30
 * Обработка записей CDR. Заполение длительности звонков.
31
 */
32
class WorkerCdr extends WorkerBase
33
{
34
35
    public const SELECT_CDR_TUBE = 'select_cdr_tube';
36
    public const UPDATE_CDR_TUBE = 'update_cdr_tube';
37
38
    private BeanstalkClient $client_queue;
39
    private array $internal_numbers  = [];
40
    private array $no_answered_calls = [];
41
42
43
    /**
44
     * Entry point
45
     *
46
     * @param $argv
47
     *
48
     */
49
    public function start($argv): void
50
    {
51
        $this->client_queue = new BeanstalkClient(self::SELECT_CDR_TUBE);
52
        $this->client_queue->subscribe($this->makePingTubeName(self::class), [$this, 'pingCallBack']);
53
54
        $this->initSettings();
55
56
        while ($this->needRestart === false) {
57
            $result = $this->getTempCdr();
58
            if (!empty($result)) {
59
                $this->updateCdr($result);
60
            }
61
            $this->client_queue->wait();
62
        }
63
    }
64
65
    /**
66
     * Возвращает все завершенные временные CDR.
67
     * @return array
68
     */
69
    private function getTempCdr():array
70
    {
71
        $filter = [
72
            'work_completed<>1 AND endtime<>""',
73
            'columns'=> 'start,answer,src_num,dst_num,dst_chan,endtime,linkedid,recordingfile,dialstatus,UNIQUEID',
74
            'order' => 'answer',
75
            'miko_result_in_file' => true,
76
            'miko_tmp_db' => true,
77
        ];
78
        $client = new BeanstalkClient(WorkerCdr::SELECT_CDR_TUBE);
79
        try {
80
            $result   = $client->request(json_encode($filter), 2);
81
            $filename = json_decode($result, true, 512, JSON_THROW_ON_ERROR);
0 ignored issues
show
Bug introduced by
It seems like $result can also be of type false; however, parameter $json of json_decode() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

81
            $filename = json_decode(/** @scrutinizer ignore-type */ $result, true, 512, JSON_THROW_ON_ERROR);
Loading history...
82
        }catch (Throwable $e){
83
            $filename = '';
84
        }
85
        $result_data = [];
86
        if (file_exists($filename)) {
87
            try {
88
                $result_data = json_decode(file_get_contents($filename), true, 512, JSON_THROW_ON_ERROR);
89
            }catch (Throwable $e){
90
                Util::sysLogMsg('SELECT_CDR_TUBE', 'Error parse response.');
91
            }
92
            unlink($filename);
93
        }
94
95
        return $result_data;
96
    }
97
98
    /**
99
     * Fills settings
100
     */
101
    private function initSettings(): void
102
    {
103
        $this->internal_numbers  = [];
104
        $this->no_answered_calls = [];
105
106
        $usersClass = Users::class;
107
        $parameters = [
108
            'columns'=>[
109
                'email'=>'email',
110
                'language'=>'language',
111
                'number'=>'Extensions.number'
112
            ],
113
            'joins'      => [
114
                'Extensions' => [
115
                    0 => Extensions::class,
116
                    1 => "Extensions.userid={$usersClass}.id",
117
                    2 => 'Extensions',
118
                    3 => 'INNER',
119
                ],
120
            ],
121
            'cache' => [
122
                'key'=> ModelsBase::makeCacheKey(Users::class, 'Workers-WorkerCdr-initSettings'),
123
                'lifetime' => 300,
124
            ]
125
        ];
126
127
        $results   = Users::find($parameters);
128
        foreach ($results as $record) {
129
            if (empty($record->email)) {
130
                continue;
131
            }
132
            $this->internal_numbers[$record->number] = [
133
                'email'    => $record->email,
134
                'language' => $record->language,
135
            ];
136
        }
137
    }
138
139
    /**
140
     * Обработчик результата запроса.
141
     * @param $result
142
     */
143
    private function updateCdr($result): void
144
    {
145
        $this->initSettings();
146
        $arr_update_cdr = [];
147
        // Получаем идентификаторы активных каналов.
148
        $channels_id = $this->getActiveIdChannels();
149
        foreach ($result as $row) {
150
            if (array_key_exists($row['linkedid'], $channels_id)) {
151
                // Цепочка вызовов еще не завершена.
152
                continue;
153
            }
154
155
            $start      = strtotime($row['start']);
156
            $answer     = strtotime($row['answer']);
157
            $end        = strtotime($row['endtime']);
158
            $dialstatus = trim($row['dialstatus']);
159
160
            $duration = max(($end - $start), 0);
161
            $billsec  = ($end && $answer) ? ($end - $answer) : 0;
162
163
            [$disposition, $row] = $this->setDisposition($billsec, $dialstatus, $row);
164
            [$row, $billsec]     = $this->checkBillsecMakeRecFile($billsec, $row);
165
166
            $data = [
167
                'work_completed' => 1,
168
                'duration'       => $duration,
169
                'billsec'        => $billsec,
170
                'disposition'    => $disposition,
171
                'UNIQUEID'       => $row['UNIQUEID'],
172
                'recordingfile'  => ($disposition === 'ANSWERED') ? $row['recordingfile'] : '',
173
                'tmp_linked_id'  => $row['linkedid'],
174
            ];
175
176
            $arr_update_cdr[] = $data;
177
            $this->checkNoAnswerCall(array_merge($row, $data));
178
        }
179
180
        $this->setStatusAndPublish($arr_update_cdr);
181
        $this->notifyByEmail();
182
    }
183
184
    /**
185
     * Функция позволяет получить активные каналы.
186
     * Возвращает ассоциативный массив. Ключ - Linkedid, значение - массив каналов.
187
     *
188
     * @return array
189
     */
190
    private function getActiveIdChannels(): array
191
    {
192
        $am           = Util::getAstManager('off');
193
        return $am->GetChannels(true);
194
    }
195
196
    /**
197
     * Анализируем не отвеченные вызовы. Наполняем временный массив для дальнейшей обработки.
198
     *
199
     * @param $row
200
     */
201
    private function checkNoAnswerCall($row): void
202
    {
203
        if ($row['disposition'] === 'ANSWERED') {
204
            $this->no_answered_calls[$row['linkedid']]['ANSWERED'] = true;
205
            return;
206
        }
207
        if ( ! array_key_exists($row['dst_num'], $this->internal_numbers)) {
208
            // dst_num - не является номером сотрудника. Это исходящий.
209
            return;
210
        }
211
        $is_internal = false;
212
        if ((array_key_exists($row['src_num'], $this->internal_numbers))) {
213
            // Это внутренний вызов.
214
            $is_internal = true;
215
        }
216
217
        $this->no_answered_calls[$row['linkedid']][] = [
218
            'from_number' => $row['src_num'],
219
            'to_number'   => $row['dst_num'],
220
            'start'       => $row['start'],
221
            'answer'      => $row['answer'],
222
            'endtime'     => $row['endtime'],
223
            'email'       => $this->internal_numbers[$row['dst_num']]['email'],
224
            'language'    => $this->internal_numbers[$row['dst_num']]['language'],
225
            'is_internal' => $is_internal,
226
            'duration'    => $row['duration'],
227
            'NOANSWER'    => true
228
        ];
229
    }
230
231
232
    /**
233
     * Постановка задачи в очередь на оповещение по email.
234
     */
235
    private function notifyByEmail(): void
236
    {
237
        foreach ($this->no_answered_calls as $call) {
238
            $this->client_queue->publish(json_encode($call), WorkerNotifyByEmail::class);
239
        }
240
        $this->no_answered_calls = [];
241
    }
242
243
    /**
244
     * @param array $arr_update_cdr
245
     */
246
    private function setStatusAndPublish(array $arr_update_cdr): void{
247
        $idForDelete = [];
248
249
        foreach ($arr_update_cdr as $data) {
250
            $linkedId = $data['tmp_linked_id'];
251
            $data['GLOBAL_STATUS'] = $data['disposition'];
252
            if (isset($this->no_answered_calls[$linkedId]['ANSWERED'])) {
253
                $data['GLOBAL_STATUS'] = 'ANSWERED';
254
                // Это отвеченный вызов (на очередь). Удаляем из списка.
255
                $idForDelete[$linkedId]=true;
256
            }
257
            unset($data['tmp_linked_id']);
258
            $this->client_queue->publish(json_encode($data), self::UPDATE_CDR_TUBE);
259
        }
260
261
        // Чистим память.
262
        foreach ($idForDelete as $linkedId => $data){
263
            unset($this->no_answered_calls[$linkedId]);
264
        }
265
    }
266
267
    /**
268
     * @param int $billsec
269
     * @param     $row
270
     * @return array
271
     */
272
    private function checkBillsecMakeRecFile(int $billsec, $row): array{
273
        if ($billsec <= 0) {
274
            $row['answer'] = '';
275
            $billsec = 0;
276
277
            if (!empty($row['recordingfile'])) {
278
                if($row['dst_chan'] === "App:{$row['dst_num']}"){
279
                    // Для приложения не может быть записи разговора.
280
                    // Запись должна относится к конечному устройству.
281
                    $row['recordingfile'] = '';
282
                }else{
283
                    // Удаляем файлы
284
                    $p_info = pathinfo($row['recordingfile']);
285
                    $fileName = $p_info['dirname'] . '/' . $p_info['filename'];
286
                    $file_list = [$fileName . '.mp3', $fileName . '.wav', $fileName . '_in.wav', $fileName . '_out.wav',];
287
                    foreach ($file_list as $file) {
288
                        if (!file_exists($file) || is_dir($file)) {
289
                            continue;
290
                        }
291
                        Processes::mwExec("rm -rf '{$file}'");
292
                    }
293
                }
294
            }
295
        } elseif (trim($row['recordingfile']) !== '') {
296
            // Если каналов не существует с ID, то можно удалить временные файлы.
297
            $p_info = pathinfo($row['recordingfile']);
298
            // Запускаем процесс конвертации в mp3
299
            $wav2mp3Path = Util::which('wav2mp3.sh');
300
            $nicePath = Util::which('nice');
301
            Processes::mwExecBg("{$nicePath} -n -19 {$wav2mp3Path} '{$p_info['dirname']}/{$p_info['filename']}'");
302
            // В последствии конвертации (успешной) исходные файлы будут удалены.
303
        }
304
        return array($row, $billsec);
305
    }
306
307
    /**
308
     * @param int    $billsec
309
     * @param string $dialstatus
310
     * @param        $row
311
     * @return array
312
     */
313
    private function setDisposition(int $billsec, string $dialstatus, $row): array{
314
        $disposition = 'NOANSWER';
315
        if ($billsec > 0) {
316
            $disposition = 'ANSWERED';
317
        } elseif ('' !== $dialstatus) {
318
            $disposition = ($dialstatus === 'ANSWERED') ? $disposition : $dialstatus;
319
        }
320
321
        if ($disposition !== 'ANSWERED') {
322
            if (file_exists($row['recordingfile']) && !is_dir($row['recordingfile'])) {
323
                Processes::mwExec("rm -rf {$row['recordingfile']}");
324
            }
325
        } elseif (!empty($row['recordingfile']) &&
326
            !file_exists(Util::trimExtensionForFile($row['recordingfile']) . '.wav') &&
327
            !file_exists($row['recordingfile'])) {
328
            /** @var CallDetailRecordsTmp $rec_data */
329
            $rec_data = CallDetailRecordsTmp::findFirst("linkedid='{$row['linkedid']}' AND dst_chan='{$row['dst_chan']}'");
330
            if ($rec_data !== null) {
331
                $row['recordingfile'] = $rec_data->recordingfile;
332
            }
333
        }
334
        return array($disposition, $row);
335
    }
336
337
}
338
339
// Start worker process
340
WorkerCdr::startWorker($argv??null);