Passed
Pull Request — master (#23)
by Nikolay
09:42 queued 03:08
created

WorkerCdr::getCheckResult()   A

Complexity

Conditions 5
Paths 6

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 10
c 1
b 0
f 0
dl 0
loc 15
rs 9.6111
cc 5
nc 6
nop 0
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\Workers;
10
11
require_once 'Globals.php';
12
13
use MikoPBX\Common\Models\{CallDetailRecordsTmp, Extensions, Users};
14
use MikoPBX\Core\System\{BeanstalkClient, Processes, Util};
15
use Throwable;
16
17
/**
18
 * Class WorkerCdr
19
 * Обработка записей CDR. Заполение длительности звонков.
20
 */
21
class WorkerCdr extends WorkerBase
22
{
23
24
    public const SELECT_CDR_TUBE = 'select_cdr_tube';
25
    public const UPDATE_CDR_TUBE = 'update_cdr_tube';
26
27
    private BeanstalkClient $client_queue;
28
    private $internal_numbers = [];
29
    private $no_answered_calls = [];
30
31
32
    /**
33
     * Entry point
34
     *
35
     * @param $argv
36
     *
37
     */
38
    public function start($argv): void
39
    {
40
        $filter = [
41
            '(work_completed<>1 OR work_completed IS NULL) AND endtime IS NOT NULL',
42
            'miko_tmp_db'         => true,
43
            'columns'             => 'start,answer,src_num,dst_num,dst_chan,endtime,linkedid,recordingfile,dialstatus,UNIQUEID',
44
            'miko_result_in_file' => true,
45
        ];
46
47
48
        $this->client_queue = new BeanstalkClient(self::SELECT_CDR_TUBE);
49
        $this->client_queue->subscribe($this->makePingTubeName(self::class), [$this, 'pingCallBack']);
50
51
        $this->initSettings();
52
53
        while ($this->needRestart === false) {
54
            $result = $this->client_queue->request(json_encode($filter), 10);
55
56
            if ($result !== false) {
57
                $this->updateCdr();
58
            }
59
            $this->client_queue->wait(5); // instead of sleep
60
        }
61
    }
62
63
    /**
64
     * Fills settings
65
     */
66
    private function initSettings()
67
    {
68
        $this->internal_numbers  = [];
69
        $this->no_answered_calls = [];
70
71
        $usersClass = Users::class;
72
        $parameters = [
73
            'columns'=>[
74
                'email'=>'email',
75
                'language'=>'language',
76
                'number'=>'Extensions.number'
77
            ],
78
            'joins'      => [
79
                'Extensions' => [
80
                    0 => Extensions::class,
81
                    1 => "Extensions.userid={$usersClass}.id",
82
                    2 => 'Extensions',
83
                    3 => 'INNER',
84
                ],
85
            ],
86
            'cache' => [
87
                'key'=>'Users-WorkerCdr',
88
                'lifetime' => 3600,
89
            ]
90
        ];
91
92
        $results   = Users::find($parameters);
93
        foreach ($results as $record) {
94
            if (empty($record->email)) {
95
                continue;
96
            }
97
            $this->internal_numbers[$record->number] = [
98
                'email'    => $record->email,
99
                'language' => $record->language,
100
            ];
101
        }
102
    }
103
104
    /**
105
     * Обработчик результата запроса.
106
     *
107
     */
108
    private function updateCdr(): void
109
    {
110
        $this->initSettings();
111
        $result = $this->getCheckResult();
112
        if (count($result) < 1) {
113
            return;
114
        }
115
        $arr_update_cdr = [];
116
        // Получаем идентификаторы активных каналов.
117
        $channels_id = $this->getActiveIdChannels();
118
        foreach ($result as $row) {
119
            if (array_key_exists($row['linkedid'], $channels_id)) {
120
                // Цепочка вызовов еще не завершена.
121
                continue;
122
            }
123
124
            $start      = strtotime($row['start']);
125
            $answer     = strtotime($row['answer']);
126
            $end        = strtotime($row['endtime']);
127
            $dialstatus = trim($row['dialstatus']);
128
129
            $duration = max(($end - $start), 0);
130
            $billsec  = ($end && $answer) ? ($end - $answer) : 0;
131
132
            [$disposition, $row] = $this->setDisposition($billsec, $dialstatus, $row);
133
            [$row, $billsec]     = $this->checkBillsecMakeRecFile($billsec, $row);
134
135
            $data = [
136
                'work_completed' => 1,
137
                'duration'       => $duration,
138
                'billsec'        => $billsec,
139
                'disposition'    => $disposition,
140
                'UNIQUEID'       => $row['UNIQUEID'],
141
                'recordingfile'  => ($disposition === 'ANSWERED') ? $row['recordingfile'] : '',
142
                'tmp_linked_id'  => $row['linkedid'],
143
            ];
144
145
            $arr_update_cdr[] = $data;
146
            $this->checkNoAnswerCall(array_merge($row, $data));
147
        }
148
149
        $this->setStatusAndPublish($arr_update_cdr);
150
        $this->notifyByEmail();
151
    }
152
153
    /**
154
     * Функция позволяет получить активные каналы.
155
     * Возвращает ассоциативный массив. Ключ - Linkedid, значение - массив каналов.
156
     *
157
     * @return array
158
     */
159
    private function getActiveIdChannels(): array
160
    {
161
        $am           = Util::getAstManager('off');
162
        return $am->GetChannels(true);
163
    }
164
165
    /**
166
     * Анализируем не отвеченные вызовы. Наполняем временный массив для дальнейшей обработки.
167
     *
168
     * @param $row
169
     */
170
    private function checkNoAnswerCall($row): void
171
    {
172
        if ($row['disposition'] === 'ANSWERED') {
173
            $this->no_answered_calls[$row['linkedid']]['NOANSWER'] = false;
174
            return;
175
        }
176
        if ( ! array_key_exists($row['dst_num'], $this->internal_numbers)) {
177
            // dst_num - не является номером сотрудника. Это исходящий.
178
            return;
179
        }
180
        $is_internal = false;
181
        if ((array_key_exists($row['src_num'], $this->internal_numbers))) {
182
            // Это внутренний вызов.
183
            $is_internal = true;
184
        }
185
186
        $this->no_answered_calls[$row['linkedid']][] = [
187
            'from_number' => $row['src_num'],
188
            'to_number'   => $row['dst_num'],
189
            'start'       => $row['start'],
190
            'answer'      => $row['answer'],
191
            'endtime'     => $row['endtime'],
192
            'email'       => $this->internal_numbers[$row['dst_num']]['email'],
193
            'language'    => $this->internal_numbers[$row['dst_num']]['language'],
194
            'is_internal' => $is_internal,
195
            'duration'    => $row['duration'],
196
        ];
197
    }
198
199
200
    /**
201
     * Постановка задачи в очередь на оповещение по email.
202
     */
203
    private function notifyByEmail(): void
204
    {
205
        foreach ($this->no_answered_calls as $call) {
206
            $this->client_queue->publish(json_encode($call), WorkerNotifyByEmail::class);
207
        }
208
        $this->no_answered_calls = [];
209
    }
210
211
    /**
212
     * @param array $arr_update_cdr
213
     */
214
    private function setStatusAndPublish(array $arr_update_cdr): void{
215
        foreach ($arr_update_cdr as $data) {
216
            $linkedid = $data['tmp_linked_id'];
217
            $data['GLOBAL_STATUS'] = $data['disposition'];
218
            if (isset($this->no_answered_calls[$linkedid]['NOANSWER']) && $this->no_answered_calls[$linkedid]['NOANSWER'] === false) {
219
                $data['GLOBAL_STATUS'] = 'ANSWERED';
220
                // Это отвеченный вызов (на очередь). Удаляем из списка.
221
                unset($this->no_answered_calls[$linkedid]);
222
            }
223
            unset($data['tmp_linked_id']);
224
            $this->client_queue->publish(json_encode($data), self::UPDATE_CDR_TUBE);
225
        }
226
    }
227
228
    /**
229
     * @param int $billsec
230
     * @param     $row
231
     * @return array
232
     */
233
    private function checkBillsecMakeRecFile(int $billsec, $row): array{
234
        if ($billsec <= 0) {
235
            $row['answer'] = '';
236
            $billsec = 0;
237
238
            if (!empty($row['recordingfile'])) {
239
                // Удаляем файлы
240
                $p_info = pathinfo($row['recordingfile']);
241
                $fileName = $p_info['dirname'] . '/' . $p_info['filename'];
242
                $file_list = [$fileName . '.mp3', $fileName . '.wav', $fileName . '_in.wav', $fileName . '_out.wav',];
243
                foreach ($file_list as $file) {
244
                    if (!file_exists($file) || is_dir($file)) {
245
                        continue;
246
                    }
247
                    Processes::mwExec("rm -rf '{$file}'");
248
                }
249
            }
250
        } elseif (trim($row['recordingfile']) !== '') {
251
            // Если каналов не существует с ID, то можно удалить временные файлы.
252
            $p_info = pathinfo($row['recordingfile']);
253
            // Запускаем процесс конвертации в mp3
254
            $wav2mp3Path = Util::which('wav2mp3.sh');
255
            $nicePath = Util::which('nice');
256
            Processes::mwExecBg("{$nicePath} -n 19 {$wav2mp3Path} '{$p_info['dirname']}/{$p_info['filename']}'");
257
            // В последствии конвертации (успешной) исходные файлы будут удалены.
258
        }
259
        return array($row, $billsec);
260
    }
261
262
    /**
263
     */
264
    private function getCheckResult(){
265
        $result_data = $this->client_queue->getBody();
266
        // Получаем результат.
267
        $result = json_decode($result_data, true);
268
        if (file_exists($result)) {
269
            $file_data = json_decode(file_get_contents($result), true);
270
            if (!is_dir($result)) {
271
                Processes::mwExec("rm -rf {$result}");
272
            }
273
            $result = $file_data;
274
        }
275
        if ( ! is_array($result) && ! is_object($result)) {
276
            $result = [];
277
        }
278
        return $result;
279
    }
280
281
    /**
282
     * @param int    $billsec
283
     * @param string $dialstatus
284
     * @param        $row
285
     * @return array
286
     */
287
    private function setDisposition(int $billsec, string $dialstatus, $row): array{
288
        $disposition = 'NOANSWER';
289
        if ($billsec > 0) {
290
            $disposition = 'ANSWERED';
291
        } elseif ('' !== $dialstatus) {
292
            $disposition = ($dialstatus === 'ANSWERED') ? $disposition : $dialstatus;
293
        }
294
295
        if ($disposition !== 'ANSWERED') {
296
            if (file_exists($row['recordingfile']) && !is_dir($row['recordingfile'])) {
297
                Processes::mwExec("rm -rf {$row['recordingfile']}");
298
            }
299
        } elseif (!empty($row['recordingfile']) &&
300
            !file_exists(Util::trimExtensionForFile($row['recordingfile']) . '.wav') &&
301
            !file_exists($row['recordingfile'])) {
302
            /** @var CallDetailRecordsTmp $rec_data */
303
            $rec_data = CallDetailRecordsTmp::findFirst("linkedid='{$row['linkedid']}' AND dst_chan='{$row['dst_chan']}'");
304
            if ($rec_data !== null) {
305
                $row['recordingfile'] = $rec_data->recordingfile;
306
            }
307
        }
308
        return array($disposition, $row);
309
    }
310
311
}
312
313
// Start worker process
314
$workerClassname = WorkerCdr::class;
315
if (isset($argv) && count($argv) > 1 && $argv[1] === 'start') {
316
    cli_set_process_title($workerClassname);
317
    try {
318
        $worker = new $workerClassname();
319
        $worker->start($argv);
320
    } catch (Throwable $e) {
321
        global $errorLogger;
322
        $errorLogger->captureException($e);
323
        Util::sysLogMsg("{$workerClassname}_EXCEPTION", $e->getMessage());
324
    }
325
}