Passed
Push — develop ( 4a19fe...3acb68 )
by Nikolay
04:11
created

BeanstalkClient::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 3
dl 0
loc 5
rs 10
c 1
b 0
f 0
cc 1
nc 1
nop 2
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Throwable;
16
17
class BeanstalkClient extends Injectable
18
{
19
    public const INBOX_PREFIX='INBOX_';
20
    /** @var Pheanstalk */
21
    private Pheanstalk $queue;
22
    private bool $connected = false;
23
    private array $subscriptions = [];
24
    private string $tube;
25
    private int $reconnectsCount = 0;
26
    private $message;
27
    private $timeout_handler;
28
    private $error_handler;
29
30
    private string $port;
31
32
    /**
33
     * BeanstalkClient constructor.
34
     *
35
     * @param string $tube
36
     * @param string $port
37
     */
38
    public function __construct($tube = 'default', $port = '')
39
    {
40
        $this->tube = str_replace("\\", '-', $tube);
41
        $this->port = $port;
42
        $this->reconnect();
43
    }
44
45
    /**
46
     * Recreates connection to the beanstalkd server
47
     */
48
    public function reconnect(): void
49
    {
50
        $config = $this->di->get('config')->beanstalk;
51
        $port   = $config->port;
52
        if ( ! empty($this->port) && is_numeric($this->port)) {
53
            $port = $this->port;
54
        }
55
        $this->queue = Pheanstalk::create($config->host, $port);
56
        $this->queue->useTube($this->tube);
57
        foreach ($this->subscriptions as $tube => $callback){
58
            $this->subscribe($tube, $callback);
59
        }
60
        $this->connected = true;
61
    }
62
63
    /**
64
     * Returns connection status
65
     *
66
     * @return bool
67
     */
68
    public function isConnected(): bool
69
    {
70
        return $this->connected;
71
    }
72
73
    /**
74
     * Sends request and wait for answer from processor
75
     *
76
     * @param      $job_data
77
     * @param int  $timeout
78
     * @param int  $priority
79
     *
80
     * @return bool|string
81
     *
82
     */
83
    public function request(
84
        $job_data,
85
        int $timeout = 10,
86
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
87
    ): bool|string {
0 ignored issues
show
Bug introduced by
A parse error occurred: Syntax error, unexpected '|', expecting '{' or ';' on line 87 at column 11
Loading history...
88
        $this->message = false;
89
        $inbox_tube    = uniqid(self::INBOX_PREFIX, true);
90
        $this->queue->watch($inbox_tube);
91
92
        // Send message to backend worker
93
        $requestMessage = [
94
            $job_data,
95
            'inbox_tube' => $inbox_tube,
96
        ];
97
        $this->publish($requestMessage, null, $priority, 0, $timeout);
98
99
        // We wait until a worker process request.
100
        try {
101
            $job = $this->queue->reserveWithTimeout($timeout);
102
            if ($job !== null) {
103
                $this->message = $job->getData();
104
                $this->queue->delete($job);
105
            }
106
        } catch (Throwable $exception) {
107
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
108
            if (isset($job)) {
109
                $this->queue->bury($job);
110
            }
111
        }
112
        $this->queue->ignore($inbox_tube);
113
114
        return $this->message;
115
    }
116
117
    /**
118
     * Puts a job in a beanstalkd server queue
119
     *
120
     * @param mixed   $job_data data to worker
121
     * @param ?string $tube     tube name
122
     * @param int     $priority Jobs with smaller priority values will be scheduled
123
     *                          before jobs with larger priorities. The most urgent priority is 0;
124
     *                          the least urgent priority is 4294967295.
125
     * @param int     $delay    delay before insert job into work query
126
     * @param int     $ttr      time to execute this job
127
     *
128
     * @return \Pheanstalk\Job
129
     */
130
    public function publish(
131
        $job_data,
132
        $tube = null,
133
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
134
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
135
        int $ttr = PheanstalkInterface::DEFAULT_TTR
136
    ): Job {
137
        $tube = str_replace("\\", '-', $tube);
138
        // Change tube
139
        if ( ! empty($tube) && $this->tube !== $tube) {
140
            $this->queue->useTube($tube);
141
        }
142
        $job_data = serialize($job_data);
143
        // Send JOB to queue
144
        $result = $this->queue->put($job_data, $priority, $delay, $ttr);
145
146
        // Return original tube
147
        $this->queue->useTube($this->tube);
148
149
        return $result;
150
    }
151
152
    /**
153
     * Drops orphaned tasks
154
     */
155
    public function cleanTubes()
156
    {
157
        $tubes = $this->queue->listTubes();
158
        $deletedJobInfo = [];
159
        foreach ($tubes as $tube) {
160
            try {
161
                $this->queue->useTube($tube);
162
                $queueStats = $this->queue->stats()->getArrayCopy();
163
164
                // Delete buried jobs
165
                $countBuried=$queueStats['current-jobs-buried'];
166
                while ($job = $this->queue->peekBuried()) {
167
                    $countBuried--;
168
                    if ($countBuried<0){
169
                        break;
170
                    }
171
                    $id = $job->getId();
172
                    Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube} with message {$job->getData()}", LOG_DEBUG);
173
                    $this->queue->delete($job);
174
                    $deletedJobInfo[]="{$id} from {$tube}";
175
                }
176
177
                // Delete outdated jobs
178
                $countReady=$queueStats['current-jobs-ready'];
179
                while ($job = $this->queue->peekReady()) {
180
                    $countReady--;
181
                    if ($countReady<0){
182
                        break;
183
                    }
184
                    $id = $job->getId();
185
                    $jobStats = $this->queue->statsJob($job)->getArrayCopy();
186
                    $age                   = (int)$jobStats['age'];
187
                    $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
188
                    if ($age > $expectedTimeToExecute) {
189
                        Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube} with message {$job->getData()}", LOG_DEBUG);
190
                        $this->queue->delete($job);
191
                        $deletedJobInfo[]="{$id} from {$tube}";
192
                    }
193
                }
194
            } catch (Throwable $exception) {
195
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
196
            }
197
        }
198
        if (count($deletedJobInfo)>0){
199
            Util::sysLogMsg(__METHOD__, "Delete outdated jobs".implode(PHP_EOL, $deletedJobInfo), LOG_WARNING);
200
        }
201
    }
202
203
    /**
204
     * Subscribe on new message in tube
205
     *
206
     * @param string           $tube     - listening tube
207
     * @param array | callable $callback - worker
208
     */
209
    public function subscribe(string $tube, $callback): void
210
    {
211
        $tube = str_replace("\\", '-', $tube);
212
        $this->queue->watch($tube);
213
        $this->queue->ignore('default');
214
        $this->subscriptions[$tube] = $callback;
215
    }
216
217
    /**
218
     * Job worker for loop cycles
219
     *
220
     * @param float $timeout
221
     *
222
     */
223
    public function wait(float $timeout = 5): void
224
    {
225
        $this->message = null;
226
        $start         = microtime(true);
227
        try {
228
            $job = $this->queue->reserveWithTimeout((int)$timeout);
229
        } catch (Throwable $exception) {
230
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
231
        }
232
233
        if (!isset($job)) {
234
            $workTime = (microtime(true) - $start);
235
            if ($workTime < $timeout) {
236
                usleep(100000);
237
                // Если время ожидания $worktime меньше значения таймаута $timeout
238
                // И задача не получена $job === null
239
                // Что то не то, вероятно потеряна связь с сервером очередей
240
                $this->reconnect();
241
            }
242
            if (is_array($this->timeout_handler)) {
243
                call_user_func($this->timeout_handler);
244
            }
245
246
            return;
247
        }
248
249
        // Processing job over callable function attached in $this->subscribe
250
        if (json_decode($job->getData(), true) !== null) {
251
            $mData = $job->getData();
252
        } else {
253
            $mData = unserialize($job->getData(), [false]);
254
        }
255
        $this->message = $mData;
256
257
        $stats           = $this->queue->statsJob($job);
258
        $requestFormTube = $stats['tube'];
259
        $func            = $this->subscriptions[$requestFormTube] ?? null;
260
261
        if ($func === null) {
262
            // Action not found
263
            $this->queue->bury($job);
264
        } else {
265
            try {
266
                if (is_array($func)) {
267
                    call_user_func($func, $this);
268
                } elseif (is_callable($func) === true) {
269
                    $func($this);
270
                }
271
                // Removes the job from the queue when it has been successfully completed
272
                $this->queue->delete($job);
273
            } catch (Throwable $e) {
274
                // Marks the job as terminally failed and no workers will restart it.
275
                $this->queue->bury($job);
276
                Util::sysLogMsg(__METHOD__.'_EXCEPTION', $e->getMessage(), LOG_ERR);
277
            }
278
        }
279
    }
280
281
    /**
282
     * Gets request body
283
     *
284
     * @return string
285
     */
286
    public function getBody(): string
287
    {
288
        if (is_array($this->message)
289
            && isset($this->message['inbox_tube'])
290
            && count($this->message) === 2) {
291
            // Это поступил request, треует ответа. Данные были переданы первым параметром массива.
292
            $message_data = $this->message[0];
293
        } else {
294
            $message_data = $this->message;
295
        }
296
297
        return $message_data;
298
    }
299
300
    /**
301
     * Sends response to queue
302
     *
303
     * @param $response
304
     */
305
    public function reply($response): void
306
    {
307
        if (isset($this->message['inbox_tube'])) {
308
            $this->queue->useTube($this->message['inbox_tube']);
309
            $this->queue->put($response);
310
            $this->queue->useTube($this->tube);
311
        }
312
    }
313
314
    /**
315
     *
316
     * @param $handler
317
     */
318
    public function setErrorHandler($handler): void
319
    {
320
        $this->error_handler = $handler;
321
    }
322
323
    /**
324
     * @param $handler
325
     */
326
    public function setTimeoutHandler($handler): void
327
    {
328
        $this->timeout_handler = $handler;
329
    }
330
331
    /**
332
     * @return int
333
     */
334
    public function reconnectsCount(): int
335
    {
336
        return $this->reconnectsCount;
337
    }
338
339
    /**
340
     * Gets all messages from tube and clean it
341
     *
342
     * @param string $tube
343
     *
344
     * @return array
345
     */
346
    public function getMessagesFromTube(string $tube = ''): array
347
    {
348
        if ($tube !== '') {
349
            $this->queue->useTube($tube);
350
        }
351
        $arrayOfMessages = [];
352
        while ($job = $this->queue->peekReady()) {
353
            if (json_decode($job->getData(), true) !== null) {
354
                $mData = $job->getData();
355
            } else {
356
                $mData = unserialize($job->getData(), [false]);
357
            }
358
            $arrayOfMessages[] = $mData;
359
            $this->queue->delete($job);
360
        }
361
362
        return $arrayOfMessages;
363
    }
364
}