Passed
Push — develop ( ee3b43...8dc9c9 )
by Nikolay
05:38
created

BeanstalkClient::setTimeoutHandler()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright (C) 2017-2020 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\System;
21
22
use Phalcon\Di\Injectable;
23
use Pheanstalk\Contract\PheanstalkInterface;
24
use Pheanstalk\Job;
25
use Pheanstalk\Pheanstalk;
26
use Throwable;
27
28
class BeanstalkClient extends Injectable
29
{
30
    public const INBOX_PREFIX = 'INBOX_';
31
32
    /** @var Pheanstalk */
33
    private Pheanstalk $queue;
34
    private bool $connected = false;
35
    private array $subscriptions = [];
36
    private string $tube;
37
    private int $reconnectsCount = 0;
38
    private $message;
39
    private $timeout_handler;
40
    private $error_handler;
41
42
    private string $port;
43
44
    /**
45
     * BeanstalkClient constructor.
46
     *
47
     * @param string $tube
48
     * @param string $port
49
     */
50
    public function __construct($tube = 'default', $port = '')
51
    {
52
        $this->tube = str_replace("\\", '-', $tube);
53
        $this->port = $port;
54
        $this->reconnect();
55
    }
56
57
    /**
58
     * Recreates connection to the beanstalkd server
59
     */
60
    public function reconnect(): void
61
    {
62
        $config = $this->di->get('config')->beanstalk;
63
        $port   = $config->port;
64
        if ( ! empty($this->port) && is_numeric($this->port)) {
65
            $port = $this->port;
66
        }
67
        $this->queue = Pheanstalk::create($config->host, $port);
68
        $this->queue->useTube($this->tube);
69
        foreach ($this->subscriptions as $tube => $callback) {
70
            $this->subscribe($tube, $callback);
71
        }
72
        $this->connected = true;
73
    }
74
75
    /**
76
     * Subscribe on new message in tube
77
     *
78
     * @param string           $tube     - listening tube
79
     * @param array | callable $callback - worker
80
     */
81
    public function subscribe(string $tube, $callback): void
82
    {
83
        $tube = str_replace("\\", '-', $tube);
84
        $this->queue->watch($tube);
85
        $this->queue->ignore('default');
86
        $this->subscriptions[$tube] = $callback;
87
    }
88
89
    /**
90
     * Returns connection status
91
     *
92
     * @return bool
93
     */
94
    public function isConnected(): bool
95
    {
96
        return $this->connected;
97
    }
98
99
    /**
100
     * Sends request and wait for answer from processor
101
     *
102
     * @param      $job_data
103
     * @param int  $timeout
104
     * @param int  $priority
105
     *
106
     * @return bool|string
107
     */
108
    public function request(
109
        $job_data,
110
        int $timeout = 10,
111
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
112
    ) {
113
        $this->message = false;
114
        $inbox_tube    = uniqid(self::INBOX_PREFIX, true);
115
        $this->queue->watch($inbox_tube);
116
117
        // Send message to backend worker
118
        $requestMessage = [
119
            $job_data,
120
            'inbox_tube' => $inbox_tube,
121
        ];
122
        $this->publish($requestMessage, null, $priority, 0, $timeout);
123
124
        // We wait until a worker process request.
125
        try {
126
            $job = $this->queue->reserveWithTimeout($timeout);
127
            if ($job !== null) {
128
                $this->message = $job->getData();
129
                $this->queue->delete($job);
130
            }
131
        } catch (Throwable $exception) {
132
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
133
            if (isset($job)) {
134
                $this->queue->bury($job);
135
            }
136
        }
137
        $this->queue->ignore($inbox_tube);
138
139
        return $this->message;
140
    }
141
142
    /**
143
     * Puts a job in a beanstalkd server queue
144
     *
145
     * @param mixed   $job_data data to worker
146
     * @param ?string $tube     tube name
147
     * @param int     $priority Jobs with smaller priority values will be scheduled
148
     *                          before jobs with larger priorities. The most urgent priority is 0;
149
     *                          the least urgent priority is 4294967295.
150
     * @param int     $delay    delay before insert job into work query
151
     * @param int     $ttr      time to execute this job
152
     *
153
     * @return \Pheanstalk\Job
154
     */
155
    public function publish(
156
        $job_data,
157
        $tube = null,
158
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
159
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
160
        int $ttr = PheanstalkInterface::DEFAULT_TTR
161
    ): Job {
162
        $tube = str_replace("\\", '-', $tube);
163
        // Change tube
164
        if ( ! empty($tube) && $this->tube !== $tube) {
165
            $this->queue->useTube($tube);
166
        }
167
        $job_data = serialize($job_data);
168
        // Send JOB to queue
169
        $result = $this->queue->put($job_data, $priority, $delay, $ttr);
170
171
        // Return original tube
172
        $this->queue->useTube($this->tube);
173
174
        return $result;
175
    }
176
177
    /**
178
     * Drops orphaned tasks
179
     */
180
    public function cleanTubes()
181
    {
182
        $tubes          = $this->queue->listTubes();
183
        $deletedJobInfo = [];
184
        foreach ($tubes as $tube) {
185
            try {
186
                $this->queue->useTube($tube);
187
                $queueStats = $this->queue->stats()->getArrayCopy();
188
189
                // Delete buried jobs
190
                $countBuried = $queueStats['current-jobs-buried'];
191
                while ($job = $this->queue->peekBuried()) {
192
                    $countBuried--;
193
                    if ($countBuried < 0) {
194
                        break;
195
                    }
196
                    $id = $job->getId();
197
                    Util::sysLogMsg(
198
                        __METHOD__,
199
                        "Deleted buried job with ID {$id} from {$tube} with message {$job->getData()}",
200
                        LOG_DEBUG
201
                    );
202
                    $this->queue->delete($job);
203
                    $deletedJobInfo[] = "{$id} from {$tube}";
204
                }
205
206
                // Delete outdated jobs
207
                $countReady = $queueStats['current-jobs-ready'];
208
                while ($job = $this->queue->peekReady()) {
209
                    $countReady--;
210
                    if ($countReady < 0) {
211
                        break;
212
                    }
213
                    $id                    = $job->getId();
214
                    $jobStats              = $this->queue->statsJob($job)->getArrayCopy();
215
                    $age                   = (int)$jobStats['age'];
216
                    $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
217
                    if ($age > $expectedTimeToExecute) {
218
                        Util::sysLogMsg(
219
                            __METHOD__,
220
                            "Deleted outdated job with ID {$id} from {$tube} with message {$job->getData()}",
221
                            LOG_DEBUG
222
                        );
223
                        $this->queue->delete($job);
224
                        $deletedJobInfo[] = "{$id} from {$tube}";
225
                    }
226
                }
227
            } catch (Throwable $exception) {
228
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
229
            }
230
        }
231
        if (count($deletedJobInfo) > 0) {
232
            Util::sysLogMsg(__METHOD__, "Delete outdated jobs" . implode(PHP_EOL, $deletedJobInfo), LOG_WARNING);
233
        }
234
    }
235
236
    /**
237
     * Job worker for loop cycles
238
     *
239
     * @param float $timeout
240
     *
241
     */
242
    public function wait(float $timeout = 5): void
243
    {
244
        $this->message = null;
245
        $start         = microtime(true);
246
        try {
247
            $job = $this->queue->reserveWithTimeout((int)$timeout);
248
        } catch (Throwable $exception) {
249
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
250
        }
251
252
        if ( ! isset($job)) {
253
            $workTime = (microtime(true) - $start);
254
            if ($workTime < $timeout) {
255
                usleep(100000);
256
                // Если время ожидания $worktime меньше значения таймаута $timeout
257
                // И задача не получена $job === null
258
                // Что то не то, вероятно потеряна связь с сервером очередей
259
                $this->reconnect();
260
            }
261
            if (is_array($this->timeout_handler)) {
262
                call_user_func($this->timeout_handler);
263
            }
264
265
            return;
266
        }
267
268
        // Processing job over callable function attached in $this->subscribe
269
        if (json_decode($job->getData(), true) !== null) {
270
            $mData = $job->getData();
271
        } else {
272
            $mData = unserialize($job->getData(), [false]);
273
        }
274
        $this->message = $mData;
275
276
        $stats           = $this->queue->statsJob($job);
277
        $requestFormTube = $stats['tube'];
278
        $func            = $this->subscriptions[$requestFormTube] ?? null;
279
280
        if ($func === null) {
281
            // Action not found
282
            $this->queue->bury($job);
283
        } else {
284
            try {
285
                if (is_array($func)) {
286
                    call_user_func($func, $this);
287
                } elseif (is_callable($func) === true) {
288
                    $func($this);
289
                }
290
                // Removes the job from the queue when it has been successfully completed
291
                $this->queue->delete($job);
292
            } catch (Throwable $e) {
293
                // Marks the job as terminally failed and no workers will restart it.
294
                $this->queue->bury($job);
295
                Util::sysLogMsg(__METHOD__ . '_EXCEPTION', $e->getMessage(), LOG_ERR);
296
            }
297
        }
298
    }
299
300
    /**
301
     * Gets request body
302
     *
303
     * @return string
304
     */
305
    public function getBody(): string
306
    {
307
        if (is_array($this->message)
308
            && isset($this->message['inbox_tube'])
309
            && count($this->message) === 2) {
310
            // Это поступил request, треует ответа. Данные были переданы первым параметром массива.
311
            $message_data = $this->message[0];
312
        } else {
313
            $message_data = $this->message;
314
        }
315
316
        return $message_data;
317
    }
318
319
    /**
320
     * Sends response to queue
321
     *
322
     * @param $response
323
     */
324
    public function reply($response): void
325
    {
326
        if (isset($this->message['inbox_tube'])) {
327
            $this->queue->useTube($this->message['inbox_tube']);
328
            $this->queue->put($response);
329
            $this->queue->useTube($this->tube);
330
        }
331
    }
332
333
    /**
334
     *
335
     * @param $handler
336
     */
337
    public function setErrorHandler($handler): void
338
    {
339
        $this->error_handler = $handler;
340
    }
341
342
    /**
343
     * @param $handler
344
     */
345
    public function setTimeoutHandler($handler): void
346
    {
347
        $this->timeout_handler = $handler;
348
    }
349
350
    /**
351
     * @return int
352
     */
353
    public function reconnectsCount(): int
354
    {
355
        return $this->reconnectsCount;
356
    }
357
358
    /**
359
     * Gets all messages from tube and clean it
360
     *
361
     * @param string $tube
362
     *
363
     * @return array
364
     */
365
    public function getMessagesFromTube(string $tube = ''): array
366
    {
367
        if ($tube !== '') {
368
            $this->queue->useTube($tube);
369
        }
370
        $arrayOfMessages = [];
371
        while ($job = $this->queue->peekReady()) {
372
            if (json_decode($job->getData(), true) !== null) {
373
                $mData = $job->getData();
374
            } else {
375
                $mData = unserialize($job->getData(), [false]);
376
            }
377
            $arrayOfMessages[] = $mData;
378
            $this->queue->delete($job);
379
        }
380
381
        return $arrayOfMessages;
382
    }
383
}