Passed
Push — develop ( 577003...8472ee )
by Портнов
10:56
created

BeanstalkClient::reqrequestuest()   A

Complexity

Conditions 4
Paths 8

Size

Total Lines 32
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 18
c 1
b 0
f 0
dl 0
loc 32
rs 9.6666
cc 4
nc 8
nop 3
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Throwable;
16
17
class BeanstalkClient extends Injectable
18
{
19
    public const INBOX_PREFIX='INBOX_';
20
    /** @var Pheanstalk */
21
    private Pheanstalk $queue;
22
    private bool $connected = false;
23
    private array $subscriptions = [];
24
    private string $tube;
25
    private int $reconnectsCount = 0;
26
    private $message;
27
    private $timeout_handler;
28
    private $error_handler;
29
30
    private string $port;
31
32
    /**
     * Builds the client and connects right away.
     *
     * Backslashes in the tube name are replaced with dashes before the name is stored.
     *
     * @param string $tube tube used for outgoing jobs
     * @param string $port optional port override; the configured port is used
     *                     when this value is empty or not numeric
     */
    public function __construct($tube = 'default', $port = '')
    {
        $normalizedTube = str_replace("\\", '-', $tube);

        $this->tube = $normalizedTube;
        $this->port = $port;

        $this->reconnect();
    }
44
45
    /**
     * Creates a new Pheanstalk client for the beanstalkd server.
     *
     * Host and port are read from the `beanstalk` section of the `config` service;
     * a non-empty numeric port given to the constructor takes precedence.
     * The client then selects its own tube for publishing and registers every
     * stored subscription again.
     */
    public function reconnect(): void
    {
        $settings       = $this->di->get('config')->beanstalk;
        $configuredPort = $settings->port;
        $useOverride    = ! empty($this->port) && is_numeric($this->port);
        $targetPort     = $useOverride ? $this->port : $configuredPort;

        $this->queue = Pheanstalk::create($settings->host, $targetPort);
        $this->queue->useTube($this->tube);

        // Rebuild the watch list on the new client.
        foreach ($this->subscriptions as $tubeName => $handler) {
            $this->subscribe($tubeName, $handler);
        }

        $this->connected = true;
    }
62
63
    /**
     * Reports the internal "connected" flag.
     *
     * The flag is set at the end of reconnect() and is not reset anywhere in this class.
     *
     * @return bool
     */
    public function isConnected(): bool
    {
        return $this->connected;
    }
72
73
    /**
     * Sends a request to a worker and waits for the answer.
     *
     * A unique inbox tube is added to the watch list, the payload is published
     * together with the inbox tube name, and the first job reserved within
     * $timeout seconds is taken as the response and deleted.
     *
     * NOTE(review): the method name looks like a garbled "request"; it is kept
     * unchanged here — verify against callers before renaming.
     *
     * @param mixed $job_data payload for the worker
     * @param int   $timeout  seconds to wait for the answer; also passed as the TTR of the request job
     * @param int   $priority priority of the request job
     *
     * @return bool|mixed raw response data, or false when nothing was received
     */
    public function reqrequestuest(
        $job_data,
        int $timeout = 10,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
    ) {
        $this->message = false;
        $inbox_tube    = uniqid(self::INBOX_PREFIX, true);
        $this->queue->watch($inbox_tube);

        try {
            // Send message to backend worker
            $requestMessage = [
                $job_data,
                'inbox_tube' => $inbox_tube,
            ];
            $this->publish($requestMessage, null, $priority, 0, $timeout);

            // Wait until a worker has processed the request.
            $job = null;
            try {
                $job = $this->queue->reserveWithTimeout($timeout);
                if ($job !== null) {
                    $this->message = $job->getData();
                    $this->queue->delete($job);
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
                if ($job !== null) {
                    $this->queue->bury($job);
                }
            }
        } finally {
            // Always drop the temporary inbox from the watch list, even when
            // publish() or bury() throws; otherwise the tube stays watched.
            $this->queue->ignore($inbox_tube);
        }

        return $this->message;
    }
116
117
    /**
     * Puts a job into a beanstalkd tube.
     *
     * The payload is serialized with serialize(). When $tube is given and differs
     * from the client's own tube, the job is put there and the client's own tube
     * is selected again afterwards, also when the put fails.
     *
     * @param mixed   $job_data data for the worker
     * @param ?string $tube     tube name; backslashes are replaced with dashes.
     *                          Empty or null means the client's own tube
     * @param int     $priority Jobs with smaller priority values will be scheduled
     *                          before jobs with larger priorities. The most urgent priority is 0;
     *                          the least urgent priority is 4294967295.
     * @param int     $delay    delay in seconds before the job becomes ready
     * @param int     $ttr      time to run: seconds a worker is given to process the job
     *
     * @return \Pheanstalk\Job
     */
    public function publish(
        $job_data,
        $tube = null,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
        int $ttr = PheanstalkInterface::DEFAULT_TTR
    ): Job {
        // Normalise only real names: passing null to str_replace() is deprecated since PHP 8.1.
        $tube = empty($tube) ? '' : str_replace("\\", '-', $tube);

        // Serialize before touching the connection so a failure here cannot
        // leave another tube selected.
        $payload = serialize($job_data);

        try {
            // Change tube
            if ($tube !== '' && $this->tube !== $tube) {
                $this->queue->useTube($tube);
            }

            // Send JOB to queue
            return $this->queue->put($payload, $priority, $delay, $ttr);
        } finally {
            // Return to the original tube even if put() throws.
            $this->queue->useTube($this->tube);
        }
    }
151
152
    /**
     * Drops orphaned jobs from INBOX tubes.
     *
     * For every tube whose name starts with INBOX_PREFIX this removes all buried
     * jobs and every ready job whose age exceeds twice its TTR. Errors for a single
     * tube are logged and do not stop the processing of the remaining tubes.
     * The client's own tube is selected again before returning, so a later
     * publish() without an explicit tube does not land in an INBOX tube.
     */
    public function cleanTubes()
    {
        $tubes          = $this->queue->listTubes();
        $deletedJobInfo = [];

        foreach ($tubes as $tube) {
            if (strpos($tube, self::INBOX_PREFIX) !== 0) {
                // Only INBOX tubes are cleaned:
                // they are the only ones that may be left without a worker.
                continue;
            }
            try {
                $this->queue->useTube($tube);
                // Per-tube counters bound the loops below (guard against endless peeking).
                $tubeStats = $this->queue->statsTube($tube)->getArrayCopy();

                // Delete buried jobs
                for ($left = (int)$tubeStats['current-jobs-buried']; $left > 0; $left--) {
                    $job = $this->queue->peekBuried();
                    if ($job === null) {
                        break;
                    }
                    $id = $job->getId();
                    Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube} with message {$job->getData()}", LOG_DEBUG);
                    $this->queue->delete($job);
                    $deletedJobInfo[] = "{$id} from {$tube}";
                }

                // Delete outdated jobs
                for ($left = (int)$tubeStats['current-jobs-ready']; $left > 0; $left--) {
                    $job = $this->queue->peekReady();
                    if ($job === null) {
                        break;
                    }
                    $id                    = $job->getId();
                    $jobStats              = $this->queue->statsJob($job)->getArrayCopy();
                    $age                   = (int)$jobStats['age'];
                    $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
                    if ($age <= $expectedTimeToExecute) {
                        // peekReady() keeps returning this same job until it is deleted,
                        // so further iterations could not make progress.
                        break;
                    }
                    Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube} with message {$job->getData()}", LOG_DEBUG);
                    $this->queue->delete($job);
                    $deletedJobInfo[] = "{$id} from {$tube}";
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
            }
        }

        // Select the client's own tube again; keep the method best-effort.
        try {
            $this->queue->useTube($this->tube);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
        }

        if (count($deletedJobInfo) > 0) {
            Util::sysLogMsg(__METHOD__, 'Delete outdated jobs: ' . implode(PHP_EOL, $deletedJobInfo), LOG_WARNING);
        }
    }
207
208
    /**
     * Subscribes a callback to new jobs in a tube.
     *
     * The tube is added to the watch list and the callback is stored under the
     * normalised tube name (backslashes replaced with dashes), so reconnect()
     * can register it again. The "default" tube is removed from the watch list
     * unless it is the tube being subscribed to.
     *
     * @param string           $tube     tube to listen on
     * @param array | callable $callback worker invoked by wait() with this client as argument
     */
    public function subscribe(string $tube, $callback): void
    {
        $tube = str_replace("\\", '-', $tube);
        $this->queue->watch($tube);
        if ($tube !== 'default') {
            // Do not ignore the very tube the caller has just asked to watch.
            $this->queue->ignore('default');
        }
        $this->subscriptions[$tube] = $callback;
    }
221
222
    /**
     * Processes at most one job; intended to be called from a worker loop.
     *
     * Reserves a job from the watched tubes, stores its decoded payload for
     * getBody()/reply() and passes this client to the callback subscribed to the
     * job's tube. The job is deleted after the callback returns and buried when
     * the callback throws or no callback is registered for the tube.
     *
     * When no job is reserved, the timeout handler (if callable) is invoked.
     * If the reserve call failed or returned earlier than the timeout, the
     * connection is assumed to be lost and is re-created.
     *
     * @param float $timeout seconds to wait for a job; truncated to whole seconds
     *                       for the reserve call
     */
    public function wait(float $timeout = 5): void
    {
        $this->message  = null;
        $reserveTimeout = (int)$timeout;
        $start          = microtime(true);
        $job            = null;
        $failed         = false;
        try {
            $job = $this->queue->reserveWithTimeout($reserveTimeout);
        } catch (Throwable $exception) {
            $failed = true;
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
        }

        if ($job === null) {
            $workTime = (microtime(true) - $start);
            // Compare with the timeout actually sent to the server: a fractional
            // $timeout is truncated, and comparing with the untruncated value
            // would force a reconnect on every empty poll.
            if ($failed || $workTime < $reserveTimeout) {
                usleep(100000);
                // No job was received although the timeout has not elapsed:
                // something is wrong, the connection to the queue server is probably lost.
                $this->reconnectsCount++;
                $this->reconnect();
            }
            // Accept any callable, not only the array form.
            if (is_callable($this->timeout_handler)) {
                call_user_func($this->timeout_handler);
            }

            return;
        }

        // Payloads that are valid JSON are kept as the raw string; everything
        // else is expected to be a serialize()d value.
        $rawData = $job->getData();
        if (json_decode($rawData, true) !== null) {
            $mData = $rawData;
        } else {
            // 'allowed_classes' => false prevents object instantiation; the former
            // options array [false] had no such key and was ignored by PHP.
            $mData = unserialize($rawData, ['allowed_classes' => false]);
        }
        $this->message = $mData;

        // Find the callback attached in subscribe() for the tube the job came from.
        $stats           = $this->queue->statsJob($job);
        $requestFormTube = $stats['tube'];
        $func            = $this->subscriptions[$requestFormTube] ?? null;

        if ($func === null) {
            // Action not found
            $this->queue->bury($job);

            return;
        }

        try {
            if (is_array($func)) {
                call_user_func($func, $this);
            } elseif (is_callable($func) === true) {
                $func($this);
            }
            // Removes the job from the queue when it has been successfully completed
            $this->queue->delete($job);
        } catch (Throwable $e) {
            // Marks the job as terminally failed and no workers will restart it.
            $this->queue->bury($job);
            Util::sysLogMsg(__METHOD__.'_EXCEPTION', $e->getMessage(), LOG_ERR);
        }
    }
285
286
    /**
     * Returns the body of the current message.
     *
     * A message that is a two-element array containing an 'inbox_tube' key is a
     * request awaiting a reply; its data was passed as the first array element,
     * and that element is returned. Any other message is returned as is.
     *
     * NOTE(review): the declared return type is string, so a payload that is
     * null or an array would raise a TypeError here — confirm what callers send.
     *
     * @return string
     */
    public function getBody(): string
    {
        $payload = $this->message;

        $isRequestEnvelope = is_array($payload)
            && isset($payload['inbox_tube'])
            && count($payload) === 2;

        return $isRequestEnvelope ? $payload[0] : $payload;
    }
304
305
    /**
     * Sends a response to the inbox tube named in the current message.
     *
     * Does nothing when the current message carries no 'inbox_tube' key.
     * The client's own tube is selected again afterwards, also when the put fails,
     * so later publish() calls are not redirected to the inbox tube.
     *
     * @param mixed $response data handed to the queue unchanged
     */
    public function reply($response): void
    {
        if ( ! isset($this->message['inbox_tube'])) {
            return;
        }

        try {
            $this->queue->useTube($this->message['inbox_tube']);
            $this->queue->put($response);
        } finally {
            $this->queue->useTube($this->tube);
        }
    }
318
319
    /**
     * Stores the error handler.
     *
     * NOTE(review): nothing in the visible code of this class invokes the stored
     * handler — confirm whether it is still needed.
     *
     * @param mixed $handler
     */
    public function setErrorHandler($handler): void
    {
        $this->error_handler = $handler;
    }
327
328
    /**
     * Stores the handler that wait() may invoke when it did not reserve a job.
     *
     * @param mixed $handler
     */
    public function setTimeoutHandler($handler): void
    {
        $this->timeout_handler = $handler;
    }
335
336
    /**
     * Returns the current value of the internal reconnect counter.
     *
     * @return int
     */
    public function reconnectsCount(): int
    {
        return $this->reconnectsCount;
    }
343
344
    /**
     * Reads all ready messages from a tube and removes them from it.
     *
     * Payloads that are valid JSON are returned as the raw string; everything
     * else is passed through unserialize() with object instantiation disabled.
     * The client's own tube is selected again before returning.
     *
     * @param string $tube tube to drain; backslashes are replaced with dashes as in
     *                     publish()/subscribe(). An empty string means the tube
     *                     currently in use
     *
     * @return array decoded messages in the order they were read
     */
    public function getMessagesFromTube(string $tube = ''): array
    {
        $arrayOfMessages = [];

        try {
            if ($tube !== '') {
                $this->queue->useTube(str_replace("\\", '-', $tube));
            }
            while (($job = $this->queue->peekReady()) !== null) {
                $rawData = $job->getData();
                if (json_decode($rawData, true) !== null) {
                    $mData = $rawData;
                } else {
                    // 'allowed_classes' => false prevents object instantiation; the former
                    // options array [false] had no such key and was ignored by PHP.
                    $mData = unserialize($rawData, ['allowed_classes' => false]);
                }
                $arrayOfMessages[] = $mData;
                $this->queue->delete($job);
            }
        } finally {
            // Do not leave the connection pointing at the drained tube.
            $this->queue->useTube($this->tube);
        }

        return $arrayOfMessages;
    }
369
}