Passed
Push — develop ( 42ae39...8db92a )
by Портнов
07:32 queued 10s
created

WorkerCdr::initSettings()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 14
rs 9.9332
c 0
b 0
f 0
cc 4
nc 3
nop 0
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\Workers;
10
11
require_once 'Globals.php';
12
13
use MikoPBX\Common\Models\{CallDetailRecordsTmp, Users};
14
use MikoPBX\Core\System\{BeanstalkClient, Util};
15
use Pheanstalk\Contract\PheanstalkInterface;
16
17
/**
18
 * Class WorkerCdr
19
 * Обработка записей CDR. Заполение длительности звонков.
20
 */
21
class WorkerCdr extends WorkerBase
22
{
23
24
    public const SELECT_CDR_TUBE = 'select_cdr_tube';
25
    public const UPDATE_CDR_TUBE = 'update_cdr_tube';
26
    protected int $maxProc=1;
27
28
29
    private BeanstalkClient $client_queue;
30
    private $internal_numbers = [];
31
    private $no_answered_calls = [];
32
33
34
    /**
35
     * Entry point
36
     *
37
     * @param $argv
38
     *
39
     */
40
    public function start($argv): void
41
    {
42
        $filter = [
43
            '(work_completed<>1 OR work_completed IS NULL) AND endtime IS NOT NULL',
44
            'miko_tmp_db'         => true,
45
            'columns'             => 'start,answer,src_num,dst_num,dst_chan,endtime,linkedid,recordingfile,dialstatus,UNIQUEID',
46
            'miko_result_in_file' => true,
47
        ];
48
49
50
        $this->client_queue = new BeanstalkClient(self::SELECT_CDR_TUBE);
51
        $this->client_queue->subscribe($this->makePingTubeName(self::class), [$this, 'pingCallBack']);
52
53
        $this->initSettings();
54
55
        while (true) {
56
            $result = $this->client_queue->request(json_encode($filter), 10);
57
58
            if ($result !== false) {
59
                $this->updateCdr();
60
            }
61
            $this->client_queue->wait(5); // instead of sleep
62
        }
63
    }
64
65
    private function initSettings()
66
    {
67
        $this->internal_numbers  = [];
68
        $this->no_answered_calls = [];
69
        $users                   = Users::find();
70
        foreach ($users as $user) {
71
            if (empty($user->email)) {
72
                continue;
73
            }
74
75
            foreach ($user->Extensions as $exten) {
76
                $this->internal_numbers[$exten->number] = [
77
                    'email'    => $user->email,
78
                    'language' => $user->language,
79
                ];
80
            }
81
        }
82
    }
83
84
    /**
85
     * Обработчик результата запроса.
86
     *
87
     */
88
    private function updateCdr(): void
89
    {
90
        $this->initSettings();
91
        $result = $this->getCheckResult();
92
        if (count($result) < 1) {
93
            return;
94
        }
95
        $arr_update_cdr = [];
96
        // Получаем идентификаторы активных каналов.
97
        $channels_id = $this->getActiveIdChannels();
98
        foreach ($result as $row) {
99
            if (array_key_exists($row['linkedid'], $channels_id)) {
100
                // Цепочка вызовов еще не завершена.
101
                continue;
102
            }
103
104
            $start      = strtotime($row['start']);
105
            $answer     = strtotime($row['answer']);
106
            $end        = strtotime($row['endtime']);
107
            $dialstatus = trim($row['dialstatus']);
108
109
            $duration = max(($end - $start), 0);
110
            $billsec  = ($end !== 0 && $answer !== 0) ? ($end - $answer) : 0;
111
112
            [$disposition, $row] = $this->setDisposition($billsec, $dialstatus, $row);
113
            [$row, $billsec]     = $this->checkBillsecMakeRecFile($billsec, $row);
114
115
            $data = [
116
                'work_completed' => 1,
117
                'duration'       => $duration,
118
                'billsec'        => $billsec,
119
                'disposition'    => $disposition,
120
                'UNIQUEID'       => $row['UNIQUEID'],
121
                'recordingfile'  => ($disposition === 'ANSWERED') ? $row['recordingfile'] : '',
122
                'tmp_linked_id'  => $row['linkedid'],
123
            ];
124
125
            $arr_update_cdr[] = $data;
126
            $this->checkNoAnswerCall(array_merge($row, $data));
127
        }
128
129
        $this->setStatusAndPublish($arr_update_cdr);
130
        $this->notifyByEmail();
131
    }
132
133
    /**
134
     * Функция позволяет получить активные каналы.
135
     * Возвращает ассоциативный массив. Ключ - Linkedid, значение - массив каналов.
136
     *
137
     * @return array
138
     */
139
    private function getActiveIdChannels(): array
140
    {
141
        $am           = Util::getAstManager('off');
142
        return $am->GetChannels(true);
143
    }
144
145
    /**
146
     * Анализируем не отвеченные вызовы. Наполняем временный массив для дальнейшей обработки.
147
     *
148
     * @param $row
149
     */
150
    private function checkNoAnswerCall($row): void
151
    {
152
        if ($row['disposition'] === 'ANSWERED') {
153
            $this->no_answered_calls[$row['linkedid']]['NOANSWER'] = false;
154
            return;
155
        }
156
        if ( ! array_key_exists($row['dst_num'], $this->internal_numbers)) {
157
            // dst_num - не является номером сотрудника. Это исходящий.
158
            return;
159
        }
160
        $is_internal = false;
161
        if ((array_key_exists($row['src_num'], $this->internal_numbers))) {
162
            // Это внутренний вызов.
163
            $is_internal = true;
164
        }
165
166
        $this->no_answered_calls[$row['linkedid']][] = [
167
            'from_number' => $row['src_num'],
168
            'to_number'   => $row['dst_num'],
169
            'start'       => $row['start'],
170
            'answer'      => $row['answer'],
171
            'endtime'     => $row['endtime'],
172
            'email'       => $this->internal_numbers[$row['dst_num']]['email'],
173
            'language'    => $this->internal_numbers[$row['dst_num']]['language'],
174
            'is_internal' => $is_internal,
175
            'duration'    => $row['duration'],
176
        ];
177
    }
178
179
180
    /**
181
     * Постановка задачи в очередь на оповещение по email.
182
     */
183
    private function notifyByEmail(): void
184
    {
185
        foreach ($this->no_answered_calls as $call) {
186
            $this->client_queue->publish(json_encode($call), WorkerNotifyByEmail::class);
187
        }
188
        $this->no_answered_calls = [];
189
    }
190
191
    /**
192
     * @param array $arr_update_cdr
193
     */
194
    private function setStatusAndPublish(array $arr_update_cdr): void{
195
        foreach ($arr_update_cdr as $data) {
196
            $linkedid = $data['tmp_linked_id'];
197
            $data['GLOBAL_STATUS'] = $data['disposition'];
198
            if (isset($this->no_answered_calls[$linkedid]['NOANSWER']) && $this->no_answered_calls[$linkedid]['NOANSWER'] === false) {
199
                $data['GLOBAL_STATUS'] = 'ANSWERED';
200
                // Это отвеченный вызов (на очередь). Удаляем из списка.
201
                unset($this->no_answered_calls[$linkedid]);
202
            }
203
            unset($data['tmp_linked_id']);
204
            $this->client_queue->publish(json_encode($data), self::UPDATE_CDR_TUBE);
205
        }
206
    }
207
208
    /**
209
     * @param int $billsec
210
     * @param     $row
211
     * @return array
212
     */
213
    private function checkBillsecMakeRecFile(int $billsec, $row): array{
214
        if ($billsec <= 0) {
215
            $row['answer'] = '';
216
            $billsec = 0;
217
218
            if (!empty($row['recordingfile'])) {
219
                // Удаляем файлы
220
                $p_info = pathinfo($row['recordingfile']);
221
                $fileName = $p_info['dirname'] . '/' . $p_info['filename'];
222
                $file_list = [$fileName . '.mp3', $fileName . '.wav', $fileName . '_in.wav', $fileName . '_out.wav',];
223
                foreach ($file_list as $file) {
224
                    if (!file_exists($file) || is_dir($file)) {
225
                        continue;
226
                    }
227
                    Util::mwExec("rm -rf '{$file}'");
228
                }
229
            }
230
        } elseif (trim($row['recordingfile']) !== '') {
231
            // Если каналов не существует с ID, то можно удалить временные файлы.
232
            $p_info = pathinfo($row['recordingfile']);
233
            // Запускаем процесс конвертации в mp3
234
            $wav2mp3Path = Util::which('wav2mp3.sh');
235
            $nicePath = Util::which('nice');
236
            Util::mwExecBg("{$nicePath} -n 19 {$wav2mp3Path} '{$p_info['dirname']}/{$p_info['filename']}'");
237
            // В последствии конвертации (успешной) исходные файлы будут удалены.
238
        }
239
        return array($row, $billsec);
240
    }
241
242
    /**
243
     */
244
    private function getCheckResult(){
245
        $result_data = $this->client_queue->getBody();
246
        // Получаем результат.
247
        $result = json_decode($result_data, true);
248
        if (file_exists($result)) {
249
            $file_data = json_decode(file_get_contents($result), true);
250
            if (!is_dir($result)) {
251
                Util::mwExec("rm -rf {$result}");
252
            }
253
            $result = $file_data;
254
        }
255
        if ( ! is_array($result) && ! is_object($result)) {
256
            $result = [];
257
        }
258
        return $result;
259
    }
260
261
    /**
262
     * @param int    $billsec
263
     * @param string $dialstatus
264
     * @param        $row
265
     * @return array
266
     */
267
    private function setDisposition(int $billsec, string $dialstatus, $row): array{
268
        $disposition = 'NOANSWER';
269
        if ($billsec > 0) {
270
            $disposition = 'ANSWERED';
271
        } elseif ('' !== $dialstatus) {
272
            $disposition = ($dialstatus === 'ANSWERED') ? $disposition : $dialstatus;
273
        }
274
275
        if ($disposition !== 'ANSWERED') {
276
            if (file_exists($row['recordingfile']) && !is_dir($row['recordingfile'])) {
277
                Util::mwExec("rm -rf {$row['recordingfile']}");
278
            }
279
        } elseif (!empty($row['recordingfile']) && !file_exists(Util::trimExtensionForFile($row['recordingfile']) . 'wav') && !file_exists($row['recordingfile'])) {
280
            /** @var CallDetailRecordsTmp $rec_data */
281
            $rec_data = CallDetailRecordsTmp::findFirst("linkedid='{$row['linkedid']}' AND dst_chan='{$row['dst_chan']}'");
282
            if ($rec_data !== null) {
283
                $row['recordingfile'] = $rec_data->recordingfile;
284
            }
285
        }
286
        return array($disposition, $row);
287
    }
288
289
}
290
291
// Start worker process
292
$workerClassname = WorkerCdr::class;
293
if (isset($argv) && count($argv) > 1 && $argv[1] === 'start') {
294
    cli_set_process_title($workerClassname);
295
    try {
296
        $worker = new $workerClassname();
297
        $worker->start($argv);
298
    } catch (\Error $e) {
299
        global $errorLogger;
300
        $errorLogger->captureException($e);
301
        Util::sysLogMsg("{$workerClassname}_EXCEPTION", $e->getMessage());
302
    }
303
}