Passed
Pull Request — master (#23)
by Nikolay
09:42 queued 03:08
created

BeanstalkClient::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 3
dl 0
loc 5
rs 10
c 1
b 0
f 0
cc 1
nc 1
nop 2
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Throwable;
16
17
/**
 * Thin wrapper around a Pheanstalk connection used for publishing jobs,
 * request/response exchanges over temporary "inbox" tubes and for running
 * subscribed worker callbacks.
 *
 * Tube names are normalised by replacing backslashes with dashes, so a class
 * name can be passed directly as a tube name.
 */
class BeanstalkClient extends Injectable
{
    /** @var Pheanstalk Current connection, recreated by reconnect(). */
    private Pheanstalk $queue;

    /** @var bool Set to true at the end of reconnect(). */
    private bool $connected = false;

    /** @var array Map "tube name => worker callback" filled by subscribe(). */
    private array $subscriptions = [];

    /** @var string Default tube used for publishing (already normalised). */
    private string $tube;

    /**
     * @var int Value returned by reconnectsCount().
     *          NOTE(review): nothing in this class increments it - confirm whether it should.
     */
    private int $reconnectsCount = 0;

    /** @var mixed Payload of the last received job / response. */
    private $message;

    /** @var array|callable|null Invoked by wait() when no job was received. */
    private $timeout_handler;

    /** @var mixed Stored by setErrorHandler(); not invoked anywhere in this class. */
    private $error_handler;

    /** @var string Optional port override; used only when non-empty and numeric. */
    private string $port;

    /**
     * BeanstalkClient constructor.
     *
     * @param string $tube default tube name (backslashes are replaced with dashes)
     * @param string $port optional port override; the configured port is used when empty
     */
    public function __construct($tube = 'default', $port = '')
    {
        $this->tube = str_replace("\\", '-', $tube);
        $this->port = $port;
        $this->reconnect();
    }

    /**
     * Recreates connection to the beanstalkd server
     *
     * The host and default port come from the "config" DI service. After the
     * new connection is created the default tube is selected and every tube
     * registered through subscribe() is watched again, because a fresh
     * connection does not inherit the watch list of the previous one.
     */
    public function reconnect(): void
    {
        $config = $this->di->get('config')->beanstalk;
        $port   = $config->port;
        if ( ! empty($this->port) && is_numeric($this->port)) {
            $port = $this->port;
        }

        $this->queue = Pheanstalk::create($config->host, $port);
        $this->queue->useTube($this->tube);

        // Restore the watch list, otherwise a worker silently stops receiving jobs.
        foreach ($this->subscriptions as $tube => $callback) {
            $this->subscribe((string)$tube, $callback);
        }
        $this->connected = true;
    }

    /**
     * Returns connection status
     *
     * @return bool
     */
    public function isConnected(): bool
    {
        return $this->connected;
    }

    /**
     * Sends request and wait for answer from processor
     *
     * The request is published to the default tube together with the name of
     * a unique temporary inbox tube; the raw data of the first job reserved
     * within $timeout seconds is returned.
     *
     * @param mixed $job_data data for the processor
     * @param int   $timeout  seconds to wait for the answer, also used as the job TTR
     * @param int   $priority job priority
     *
     * @return bool|mixed false when no answer was received, raw job data otherwise
     */
    public function request(
        $job_data,
        int $timeout = 10,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
    ) {
        $this->message = false;
        $inbox_tube    = uniqid('INBOX_', true);
        $this->queue->watch($inbox_tube);

        // Send the data for processing.
        $requestMessage = [
            $job_data,
            'inbox_tube' => $inbox_tube,
        ];
        $this->publish($requestMessage, null, $priority, 0, $timeout);

        // Receive the answer from the server.
        // $job must be initialised here: the catch block reads it.
        $job = null;
        try {
            $job = $this->queue->reserveWithTimeout($timeout);
            if ($job !== null) {
                $this->message = $job->getData();
                $this->queue->delete($job);
            }
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
            if ($job !== null) {
                $this->queue->bury($job);
            }
        }
        $this->queue->ignore($inbox_tube);

        return $this->message;
    }

    /**
     * Puts a job in a beanstalkd server queue
     *
     * @param mixed   $job_data data to worker, it is serialized before sending
     * @param ?string $tube     tube name, the default tube is used when empty
     * @param int     $priority Jobs with smaller priority values will be scheduled
     *                          before jobs with larger priorities. The most urgent priority is 0;
     *                          the least urgent priority is 4294967295.
     * @param int     $delay    delay before insert job into work query
     * @param int     $ttr      time to execute this job
     *
     * @return \Pheanstalk\Job
     */
    public function publish(
        $job_data,
        $tube = null,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
        int $ttr = PheanstalkInterface::DEFAULT_TTR
    ): Job {
        // Passing null to str_replace() is deprecated since PHP 8.1.
        $tube = ($tube === null) ? '' : str_replace("\\", '-', $tube);

        // Change tube
        if ( ! empty($tube) && $this->tube !== $tube) {
            $this->queue->useTube($tube);
        }

        try {
            // Send JOB to queue
            return $this->queue->put(serialize($job_data), $priority, $delay, $ttr);
        } finally {
            // Return original tube, even when put() failed
            $this->queue->useTube($this->tube);
        }
    }

    /**
     * Drops orphaned tasks
     *
     * For every tube on the server: deletes all buried jobs, then deletes
     * ready jobs whose age is more than twice their TTR. Errors are logged
     * per tube and do not interrupt the processing of the other tubes.
     */
    public function cleanTubes()
    {
        $tubes = $this->queue->listTubes();
        foreach ($tubes as $tube) {
            try {
                $this->queue->useTube($tube);
                // Per-tube counters; they only bound the loops below.
                $tubeStats = $this->queue->statsTube($tube)->getArrayCopy();

                // Delete buried jobs
                for ($left = (int)$tubeStats['current-jobs-buried']; $left > 0; $left--) {
                    $job = $this->queue->peekBuried();
                    if ($job === null) {
                        break;
                    }
                    $id = $job->getId();
                    $this->queue->delete($job);
                    Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube}");
                }

                // Delete outdated jobs
                for ($left = (int)$tubeStats['current-jobs-ready']; $left > 0; $left--) {
                    $job = $this->queue->peekReady();
                    if ($job === null) {
                        break;
                    }
                    $id                    = $job->getId();
                    $jobStats              = $this->queue->statsJob($job)->getArrayCopy();
                    $age                   = (int)$jobStats['age'];
                    $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
                    if ($age <= $expectedTimeToExecute) {
                        // The head job stays in the tube, so peeking again
                        // would only return the very same job.
                        break;
                    }
                    $this->queue->delete($job);
                    Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube}");
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
            }
        }

        // Switch back to the default tube, otherwise the next publish()
        // without an explicit tube would go to the last cleaned tube.
        try {
            $this->queue->useTube($this->tube);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }
    }

    /**
     * Subscribe on new message in tube
     *
     * @param string           $tube     - listening tube
     * @param array | callable $callback - worker
     */
    public function subscribe(string $tube, $callback): void
    {
        $tube = str_replace("\\", '-', $tube);
        $this->queue->watch($tube);
        // Do not drop the tube that has just been subscribed.
        if ($tube !== 'default') {
            $this->queue->ignore('default');
        }
        $this->subscriptions[$tube] = $callback;
    }

    /**
     * Job worker
     *
     * Reserves one job, stores its decoded payload for getBody() / reply()
     * and passes this client to the callback subscribed to the job's tube.
     * The job is deleted after the callback returns and buried when the
     * callback throws or when no callback is registered for the tube.
     *
     * @param float $timeout seconds to wait for a job; truncated to whole seconds
     */
    public function wait(float $timeout = 10): void
    {
        $this->message = null;
        $start         = microtime(true);
        // $job must be initialised here: it is read after the try block
        // even when reserveWithTimeout() throws.
        $job = null;
        try {
            // reserveWithTimeout() accepts whole seconds only.
            $job = $this->queue->reserveWithTimeout((int)$timeout);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }

        if ($job === null) {
            $worktime = (microtime(true) - $start);
            if ($worktime < 0.5) {
                // Returned too quickly: the connection to the queue server is probably lost.
                $this->reconnect();
            }
            // Arrays are passed through as before; closures and other callables are accepted too.
            if (is_array($this->timeout_handler) || is_callable($this->timeout_handler)) {
                call_user_func($this->timeout_handler);
            }

            return;
        }

        $this->message = $this->decodeJobData($job->getData());

        // Processing job over callable function attached in $this->subscribe
        $stats           = $this->queue->statsJob($job);
        $requestFormTube = $stats['tube'];
        $func            = $this->subscriptions[$requestFormTube] ?? null;

        if ($func === null) {
            // Action not found
            $this->queue->bury($job);

            return;
        }

        try {
            if (is_array($func)) {
                call_user_func($func, $this);
            } elseif (is_callable($func) === true) {
                $func($this);
            }
            // Removes the job from the queue when it has been successfully completed
            $this->queue->delete($job);
        } catch (Throwable $e) {
            // Marks the job as terminally failed and no workers will restart it.
            $this->queue->bury($job);
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $e->getMessage());
        }
    }

    /**
     * Gets request body
     *
     * @return string payload of the last received message, an empty string when there is none
     */
    public function getBody(): string
    {
        if (is_array($this->message)
            && isset($this->message['inbox_tube'])
            && count($this->message) === 2) {
            // This is a request that expects a reply; the data is the first array element.
            return $this->message[0] ?? '';
        }

        return $this->message ?? '';
    }

    /**
     * Sends response to queue
     *
     * Does nothing when the last received message carries no "inbox_tube" key.
     *
     * @param $response
     */
    public function reply($response): void
    {
        if (isset($this->message['inbox_tube'])) {
            $this->queue->useTube($this->message['inbox_tube']);
            try {
                $this->queue->put($response);
            } finally {
                // Return original tube, even when put() failed
                $this->queue->useTube($this->tube);
            }
        }
    }

    /**
     * Stores the error handler.
     *
     * @param $handler
     */
    public function setErrorHandler($handler): void
    {
        $this->error_handler = $handler;
    }

    /**
     * Stores the handler called by wait() when no job was received.
     *
     * @param $handler
     */
    public function setTimeoutHandler($handler): void
    {
        $this->timeout_handler = $handler;
    }

    /**
     * @return int
     */
    public function reconnectsCount(): int
    {
        return $this->reconnectsCount;
    }

    /**
     * Gets all messages from tube and clean it
     *
     * @param string $tube tube to read, the currently used tube when empty
     *
     * @return array decoded payloads of the deleted jobs
     */
    public function getMessagesFromTube(string $tube = ''): array
    {
        // Same normalisation as in publish() / subscribe().
        $tube = str_replace("\\", '-', $tube);
        if ($tube !== '') {
            $this->queue->useTube($tube);
        }

        $arrayOfMessages = [];
        try {
            while (($job = $this->queue->peekReady()) !== null) {
                $arrayOfMessages[] = $this->decodeJobData($job->getData());
                $this->queue->delete($job);
            }
        } finally {
            // Do not leave the connection bound to a foreign tube.
            $this->queue->useTube($this->tube);
        }

        return $arrayOfMessages;
    }

    /**
     * Decodes raw job data: a string that is valid JSON is returned as is,
     * anything else is treated as PHP-serialized data.
     *
     * @param string $rawData
     *
     * @return mixed
     */
    private function decodeJobData(string $rawData)
    {
        if (json_decode($rawData, true) !== null) {
            return $rawData;
        }

        // Queue content is external input: never instantiate objects from it.
        return unserialize($rawData, ['allowed_classes' => false]);
    }
}