Passed
Push — develop ( 70394b...5b6493 )
by Nikolay
07:58 queued 03:06
created

BeanstalkClient::sendRequest()   A

Complexity

Conditions 4
Paths 8

Size

Total Lines 34
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
c 0
b 0
f 0
dl 0
loc 34
rs 9.552
cc 4
nc 8
nop 3
1
<?php
2
/*
3
 * MikoPBX - free phone system for small business
4
 * Copyright © 2017-2023 Alexey Portnov and Nikolay Beketov
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License along with this program.
17
 * If not, see <https://www.gnu.org/licenses/>.
18
 */
19
20
namespace MikoPBX\Core\System;
21
22
use MikoPBX\Common\Handlers\CriticalErrorsHandler;
23
use Phalcon\Di\Injectable;
24
use Pheanstalk\Contract\PheanstalkInterface;
25
use Pheanstalk\Job;
26
use Pheanstalk\Pheanstalk;
27
use Throwable;
28
29
/**
30
 * Class BeanstalkClient
31
 *
32
 * Represents a client for interacting with Beanstalkd server.
33
 *
34
 * @package MikoPBX\Core\System
35
 */
36
class BeanstalkClient extends Injectable
37
{
38
    public const INBOX_PREFIX = 'INBOX_';
39
40
    public const QUEUE_ERROR = 'queue_error';
41
42
    /** @var Pheanstalk */
43
    private Pheanstalk $queue;
44
    private bool $connected = false;
45
    private array $subscriptions = [];
46
    private string $tube;
47
    private int $reconnectsCount = 0;
48
    private $message;
49
    private $timeout_handler;
50
    private $error_handler;
51
52
    private string $port;
53
54
    /**
55
     * BeanstalkClient constructor.
56
     *
57
     * @param string $tube The name of the tube.
58
     * @param string $port The port number for the Beanstalkd server.
59
     */
60
    public function __construct(string $tube = 'default', string $port = '')
61
    {
62
        $this->tube = str_replace("\\", '-', $tube);
63
        $this->port = $port;
64
        $this->reconnect();
65
    }
66
67
    /**
68
     * Recreates connection to the Beanstalkd server.
69
     */
70
    public function reconnect(): void
71
    {
72
        $config = $this->di->get('config')->beanstalk;
73
        $tmpPort   = $config->port;
74
        if ( ! empty($this->port) && is_numeric($this->port)) {
75
            $tmpPort = $this->port;
76
        }
77
        $this->queue = Pheanstalk::create($config->host, $tmpPort);
78
        try {
79
            $this->queue->useTube($this->tube);
80
        }catch (Throwable $e){
81
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
82
            $this->connected = false;
83
            return;
84
        }
85
        foreach ($this->subscriptions as $tube => $callback) {
86
            $this->subscribe($tube, $callback);
87
        }
88
        $this->connected = true;
89
    }
90
91
    /**
92
     * Subscribe on new message in tube
93
     *
94
     * @param string           $tube     - listening tube
95
     * @param array | callable $callback - worker
96
     */
97
    public function subscribe(string $tube, $callback): void
98
    {
99
        $tube = str_replace("\\", '-', $tube);
100
        $this->queue->watch($tube);
101
        $this->queue->ignore('default');
102
        $this->subscriptions[$tube] = $callback;
103
    }
104
105
    /**
106
     * Returns connection status
107
     *
108
     * @return bool
109
     */
110
    public function isConnected(): bool
111
    {
112
        return $this->connected;
113
    }
114
115
    /**
116
     * Sends request and wait for answer from processor
117
     *
118
     * @deprecated Use sendRequest instead with array result: list($result, $message) = $client->sendRequest(...)
119
     *
120
     * @param      $job_data
121
     * @param int  $timeout
122
     * @param int  $priority
123
     *
124
     * @return bool|string
125
     */
126
    public function request($job_data, int $timeout = 10, int $priority = PheanstalkInterface::DEFAULT_PRIORITY) {
127
        $this->message = false;
128
        $inbox_tube    = uniqid(self::INBOX_PREFIX, true);
129
        $this->queue->watch($inbox_tube);
130
131
        // Send message to backend worker
132
        $requestMessage = [
133
            $job_data,
134
            'inbox_tube' => $inbox_tube,
135
        ];
136
        $this->publish($requestMessage, null, $priority, 0, $timeout);
137
138
        // We wait until a worker process request.
139
        try {
140
            $job = $this->queue->reserveWithTimeout($timeout);
141
            if ($job !== null) {
142
                $this->message = $job->getData();
143
                $this->queue->delete($job);
144
            }
145
        } catch (Throwable $exception) {
146
            Util::sysLogMsg(__METHOD__, $exception->getMessage(), LOG_ERR);
147
            if(isset($job)){
148
                $this->buryJob($job);
149
            }
150
        }
151
        $this->queue->ignore($inbox_tube);
152
153
        return $this->message;
154
    }
155
156
157
    /**
158
     * Sends request and wait for answer from processor
159
     *
160
     * @param      $job_data
161
     * @param int  $timeout
162
     * @param int  $priority
163
     *
164
     * @return array
165
     */
166
    public function sendRequest($job_data, int $timeout = 10, int $priority = PheanstalkInterface::DEFAULT_PRIORITY):array
167
    {
168
        $result = true;
169
        $inbox_tube    = uniqid(self::INBOX_PREFIX, true);
170
        $this->queue->watch($inbox_tube);
171
172
        // Send message to backend worker
173
        $requestMessage = [
174
            $job_data,
175
            'inbox_tube' => $inbox_tube,
176
        ];
177
        $this->publish($requestMessage, null, $priority, 0, $timeout);
178
179
        // We wait until a worker process request.
180
        try {
181
            $job = $this->queue->reserveWithTimeout($timeout);
182
            if ($job !== null) {
183
                $this->message = $job->getData();
184
                $this->queue->delete($job);
185
            } else {
186
                $this->message = '{"'.self::QUEUE_ERROR.'":"Worker did not answer within timeout '.$timeout.' sec"}';
187
                $result = false;
188
            }
189
        } catch (Throwable $e) {
190
            if(isset($job)){
191
                $this->buryJob($job);
192
            }
193
            $prettyMessage = CriticalErrorsHandler::handleExceptionWithSyslog($e);
194
            $this->message = '{"'.self::QUEUE_ERROR.'":"Exception on '.__METHOD__.' with message: '.$prettyMessage.'"}';
195
            $result = false;
196
        }
197
        $this->queue->ignore($inbox_tube);
198
199
        return [$result, $this->message];
200
    }
201
202
    /**
203
     * Puts a job in a beanstalkd server queue
204
     *
205
     * @param mixed   $job_data data to worker
206
     * @param ?string $tube     tube name
207
     * @param int     $priority Jobs with smaller priority values will be scheduled
208
     *                          before jobs with larger priorities. The most urgent priority is 0;
209
     *                          the least urgent priority is 4294967295.
210
     * @param int     $delay    delay before insert job into work query
211
     * @param int     $ttr      time to execute this job
212
     *
213
     * @return \Pheanstalk\Job
214
     */
215
    public function publish(
216
        $job_data,
217
        $tube = null,
218
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
219
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
220
        int $ttr = PheanstalkInterface::DEFAULT_TTR
221
    ): Job {
222
        $tube = str_replace("\\", '-', $tube);
223
        // Change tube
224
        if ( ! empty($tube) && $this->tube !== $tube) {
225
            $this->queue->useTube($tube);
226
        }
227
        $job_data = serialize($job_data);
228
        // Send JOB to queue
229
        $result = $this->queue->put($job_data, $priority, $delay, $ttr);
230
231
        // Return original tube
232
        $this->queue->useTube($this->tube);
233
234
        return $result;
235
    }
236
237
    /**
238
     * Drops orphaned tasks
239
     */
240
    public function cleanTubes()
241
    {
242
        $tubes          = $this->queue->listTubes();
243
        $deletedJobInfo = [];
244
        foreach ($tubes as $tube) {
245
            try {
246
                $this->queue->useTube($tube);
247
                $queueStats = $this->queue->stats()->getArrayCopy();
248
249
                // Delete buried jobs
250
                $countBuried = $queueStats['current-jobs-buried'];
251
                while ($job = $this->queue->peekBuried()) {
252
                    $countBuried--;
253
                    if ($countBuried < 0) {
254
                        break;
255
                    }
256
                    $id = $job->getId();
257
                    Util::sysLogMsg(
258
                        __METHOD__,
259
                        "Deleted buried job with ID {$id} from {$tube} with message {$job->getData()}",
260
                        LOG_DEBUG
261
                    );
262
                    $this->queue->delete($job);
263
                    $deletedJobInfo[] = "{$id} from {$tube}";
264
                }
265
266
                // Delete outdated jobs
267
                $countReady = $queueStats['current-jobs-ready'];
268
                while ($job = $this->queue->peekReady()) {
269
                    $countReady--;
270
                    if ($countReady < 0) {
271
                        break;
272
                    }
273
                    $id                    = $job->getId();
274
                    $jobStats              = $this->queue->statsJob($job)->getArrayCopy();
275
                    $age                   = (int)$jobStats['age'];
276
                    $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
277
                    if ($age > $expectedTimeToExecute) {
278
                        Util::sysLogMsg(
279
                            __METHOD__,
280
                            "Deleted outdated job with ID {$id} from {$tube} with message {$job->getData()}",
281
                            LOG_DEBUG
282
                        );
283
                        $this->queue->delete($job);
284
                        $deletedJobInfo[] = "{$id} from {$tube}";
285
                    }
286
                }
287
            } catch (Throwable $e) {
288
                CriticalErrorsHandler::handleExceptionWithSyslog($e);
289
            }
290
        }
291
        if (count($deletedJobInfo) > 0) {
292
            Util::sysLogMsg(__METHOD__, "Delete outdated jobs" . implode(PHP_EOL, $deletedJobInfo), LOG_WARNING);
293
        }
294
    }
295
296
    /**
297
     * Waits for a job from the Beanstalkd server.
298
     *
299
     * @param float $timeout The timeout value in seconds.
300
     */
301
    public function wait(float $timeout = 5): void
302
    {
303
        $this->message = null;
304
        $start         = microtime(true);
305
        try {
306
            $job = $this->queue->reserveWithTimeout((int)$timeout);
307
        } catch (Throwable $e) {
308
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
309
        }
310
311
        if ( ! isset($job)) {
312
            $workTime = (microtime(true) - $start);
313
            if ($workTime < $timeout) {
314
                usleep(100000);
315
                // If the work time $workTime is less than the timeout value $timeout
316
                // and no job is received $job === null
317
                // something is wrong, probably lost connection with the queue server
318
                $this->reconnect();
319
            }
320
            if (is_array($this->timeout_handler)) {
321
                call_user_func($this->timeout_handler);
322
            }
323
324
            return;
325
        }
326
327
        // Processing job over callable function attached in $this->subscribe
328
        if (json_decode($job->getData(), true) !== null) {
329
            $mData = $job->getData();
330
        } else {
331
            $mData = unserialize($job->getData(), [false]);
332
        }
333
        $this->message = $mData;
334
335
        $stats           = $this->queue->statsJob($job);
336
        $requestFormTube = $stats['tube'];
337
        $func            = $this->subscriptions[$requestFormTube] ?? null;
338
339
        if ($func === null) {
340
            // Action not found
341
            $this->buryJob($job);
342
        } else {
343
            try {
344
                if (is_array($func)) {
345
                    call_user_func($func, $this);
346
                } elseif (is_callable($func) === true) {
347
                    $func($this);
348
                }
349
                // Removes the job from the queue when it has been successfully completed
350
                $this->queue->delete($job);
351
            } catch (Throwable $e) {
352
                // Marks the job as terminally failed and no workers will restart it.
353
                $this->buryJob($job);
354
                CriticalErrorsHandler::handleExceptionWithSyslog($e);
355
            }
356
        }
357
    }
358
359
    /**
360
     * Buries a job in the Beanstalkd server.
361
     *
362
     * @param mixed $job The job to be buried.
363
     */
364
    private function buryJob($job):void
365
    {
366
        if(!isset($job)){
367
            return;
368
        }
369
        try {
370
            $this->queue->bury($job);
371
        } catch (Throwable $e) {
372
            CriticalErrorsHandler::handleExceptionWithSyslog($e);
373
        }
374
    }
375
376
    /**
377
     * Returns the body of the message.
378
     *
379
     * @return string The body of the message.
380
     */
381
    public function getBody(): string
382
    {
383
        if (is_array($this->message)
384
            && isset($this->message['inbox_tube'])
385
            && count($this->message) === 2) {
386
            // If it's a request that requires a response, the data was passed as the first element of the array.
387
            $message_data = $this->message[0];
388
        } else {
389
            $message_data = $this->message;
390
        }
391
392
        return $message_data;
393
    }
394
395
    /**
396
     * Sends a reply message.
397
     *
398
     * @param mixed $response The response message.
399
     * @return void
400
     */
401
    public function reply($response): void
402
    {
403
        if (isset($this->message['inbox_tube'])) {
404
            $this->queue->useTube($this->message['inbox_tube']);
405
            $this->queue->put($response);
406
            $this->queue->useTube($this->tube);
407
        }
408
    }
409
410
    /**
411
     * Sets the error handler for the Beanstalk client.
412
     *
413
     * @param mixed $handler The error handler.
414
     * @return void
415
     */
416
    public function setErrorHandler($handler): void
417
    {
418
        $this->error_handler = $handler;
419
    }
420
421
    /**
422
     * Sets the timeout handler for the Beanstalk client.
423
     *
424
     * @param mixed $handler The timeout handler.
425
     * @return void
426
     */
427
    public function setTimeoutHandler($handler): void
428
    {
429
        $this->timeout_handler = $handler;
430
    }
431
432
    /**
433
     * Returns the number of times the Beanstalk client has reconnected.
434
     *
435
     * @return int The number of reconnects.
436
     */
437
    public function reconnectsCount(): int
438
    {
439
        return $this->reconnectsCount;
440
    }
441
442
    /**
443
     * Retrieves messages from a tube.
444
     *
445
     * @param string $tube The name of the tube. If empty, uses the default tube.
446
     * @return array An array of messages retrieved from the tube.
447
     */
448
    public function getMessagesFromTube(string $tube = ''): array
449
    {
450
        if ($tube !== '') {
451
            $this->queue->useTube($tube);
452
        }
453
        $arrayOfMessages = [];
454
        while ($job = $this->queue->peekReady()) {
455
            if (json_decode($job->getData(), true) !== null) {
456
                $mData = $job->getData();
457
            } else {
458
                $mData = unserialize($job->getData(), [false]);
459
            }
460
            $arrayOfMessages[] = $mData;
461
            $this->queue->delete($job);
462
        }
463
464
        return $arrayOfMessages;
465
    }
466
}