Passed
Push — develop ( 19a517...90693d )
by Nikolay
05:54 queued 10s
created

BeanstalkClient::reconnect()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 7
c 2
b 0
f 0
dl 0
loc 11
rs 10
cc 3
nc 2
nop 0
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Pheanstalk\Contract\ResponseInterface;
16
use Throwable;
17
18
/**
 * Thin wrapper around a Pheanstalk connection.
 *
 * Provides job publishing, a request/reply exchange over a temporary inbox
 * tube, tube subscriptions with a blocking worker step (wait()) and a few
 * maintenance helpers.
 */
class BeanstalkClient extends Injectable
{
    /** @var Pheanstalk Connection object created by reconnect(). */
    private Pheanstalk $queue;

    /** @var bool True once reconnect() has completed without throwing. */
    private bool $connected = false;

    /** @var array Map "tube name => handler" filled by subscribe(). */
    private array $subscriptions = [];

    /** @var string Tube that outgoing jobs are put into by default. */
    private string $tube;

    /**
     * @var int Value returned by reconnectsCount().
     *          NOTE(review): nothing in this class increments it - confirm intent.
     */
    private int $reconnectsCount = 0;

    /** @var mixed Payload of the last job/response received by wait() or request(). */
    private $message;

    /** @var callable|null Invoked by wait() when no job was reserved. */
    private $timeout_handler;

    /**
     * @var mixed Stored by setErrorHandler().
     *            NOTE(review): never invoked inside this class - confirm intent.
     */
    private $error_handler;

    /** @var string Optional port override; the configured port is used when empty or non-numeric. */
    private string $port;

    /**
     * BeanstalkClient constructor.
     *
     * @param string $tube default tube for outgoing jobs; backslashes are replaced with "-"
     * @param string $port optional port that overrides the configured one
     */
    public function __construct($tube = 'default', $port = '')
    {
        $this->tube = $this->normalizeTubeName($tube);
        $this->port = (string)$port;
        $this->reconnect();
    }

    /**
     * Recreates connection to the beanstalkd server.
     *
     * Host and port come from the "beanstalk" section of the "config" service;
     * a numeric port passed to the constructor takes precedence. Tubes that
     * were registered through subscribe() are watched again, because the
     * connection object is created from scratch.
     */
    public function reconnect(): void
    {
        // Do not keep reporting "connected" if anything below throws.
        $this->connected = false;

        $config = $this->di->get('config')->beanstalk;
        $port   = $config->port;
        if ( ! empty($this->port) && is_numeric($this->port)) {
            $port = $this->port;
        }

        $this->queue = Pheanstalk::create($config->host, $port);
        $this->queue->useTube($this->tube);

        // Without this a worker would stop receiving jobs after a reconnect.
        foreach (array_keys($this->subscriptions) as $subscribedTube) {
            $this->watchSubscribedTube((string)$subscribedTube);
        }

        $this->connected = true;
    }

    /**
     * Returns connection status
     *
     * @return bool
     */
    public function isConnected(): bool
    {
        return $this->connected;
    }

    /**
     * Sends request and wait for answer from processor
     *
     * The payload is published together with the name of a unique inbox tube;
     * the processor is expected to answer through reply().
     *
     * NOTE(review): while waiting, the connection keeps watching every other
     * tube it already watched, so the reserved job is presumably not
     * guaranteed to come from the inbox tube - confirm.
     *
     * @param mixed $job_data request payload
     * @param int   $timeout  seconds to wait for the answer, also used as TTR of the request job
     * @param int   $priority priority of the request job
     *
     * @return bool|mixed raw data of the answer job, false when nothing was received
     */
    public function request(
        $job_data,
        int $timeout = 10,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
    ) {
        $this->message = false;
        $inbox_tube    = uniqid('INBOX_', true);
        $this->queue->watch($inbox_tube);

        try {
            // Send the data for processing.
            $requestMessage = [
                $job_data,
                'inbox_tube' => $inbox_tube,
            ];
            $this->publish($requestMessage, null, $priority, 0, $timeout);

            // Receive the answer. $job must be initialised here: the catch
            // block reads it when reserveWithTimeout() itself throws.
            $job = null;
            try {
                $job = $this->queue->reserveWithTimeout($timeout);
                if ($job !== null) {
                    $this->message = $job->getData();
                    $this->queue->delete($job);
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
                if ($job !== null) {
                    $this->queue->bury($job);
                }
            }
        } finally {
            // Stop watching the temporary inbox even when publishing failed.
            $this->queue->ignore($inbox_tube);
        }

        return $this->message;
    }

    /**
     * Puts a job in a beanstalkd server queue
     *
     * The payload is passed through serialize() before it is sent. After the
     * job is put (or the attempt failed) the connection is switched back to
     * the client's own tube.
     *
     * @param mixed   $job_data data to worker
     * @param ?string $tube     tube name; backslashes are replaced with "-",
     *                          an empty value means the client's own tube
     * @param int     $priority Jobs with smaller priority values will be scheduled
     *                          before jobs with larger priorities. The most urgent priority is 0;
     *                          the least urgent priority is 4294967295.
     * @param int     $delay    delay before insert job into work query
     * @param int     $ttr      time to execute this job
     *
     * @return \Pheanstalk\Job
     */
    public function publish(
        $job_data,
        $tube = null,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
        int $ttr = PheanstalkInterface::DEFAULT_TTR
    ): Job {
        // Casting avoids handing null to str_replace() (deprecated since PHP 8.1).
        $targetTube = $this->normalizeTubeName($tube);
        // Serialise before touching the connection so a failure here leaves it untouched.
        $payload = serialize($job_data);

        try {
            // Change tube
            if ( ! empty($targetTube) && $this->tube !== $targetTube) {
                $this->queue->useTube($targetTube);
            }

            // Send JOB to queue
            return $this->queue->put($payload, $priority, $delay, $ttr);
        } finally {
            // Return original tube
            $this->queue->useTube($this->tube);
        }
    }

    /**
     * Drops orphaned tasks
     *
     * For every tube reported by the server: deletes all buried jobs, then
     * deletes ready jobs from the head of the tube while their age exceeds
     * twice their TTR. Errors are logged per tube and do not stop the run.
     */
    public function cleanTubes()
    {
        $tubes = $this->queue->listTubes();
        foreach ($tubes as $tube) {
            try {
                $this->queue->useTube($tube);

                // Delete buried jobs
                while ($job = $this->queue->peekBuried()) {
                    $id = $job->getId();
                    $this->queue->delete($job);
                    Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube}");
                }

                // Delete outdated jobs
                while ($job = $this->queue->peekReady()) {
                    $jobStats = $this->queue->statsJob($job);
                    // isset() also honours magically exposed properties,
                    // which property_exists() cannot detect.
                    if ( ! isset($jobStats->age, $jobStats->ttr)) {
                        // The job stays in the tube, so peeking again would
                        // return it forever.
                        break;
                    }
                    $age                   = (int)($jobStats->age);
                    $expectedTimeToExecute = ((int)($jobStats->ttr)) * 2;
                    if ($age <= $expectedTimeToExecute) {
                        // Same reason: a job that is kept must end the loop.
                        break;
                    }
                    $id = $job->getId();
                    $this->queue->delete($job);
                    Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube}");
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
            }
        }

        // Switch back, otherwise the next publish() without an explicit tube
        // would put its job into the last tube visited above.
        try {
            $this->queue->useTube($this->tube);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }
    }

    /**
     * Subscribe on new message in tube
     *
     * @param string           $tube     - listening tube; backslashes are replaced with "-"
     * @param array | callable $callback - worker, receives this client as the only argument
     */
    public function subscribe(string $tube, $callback): void
    {
        $tube = $this->normalizeTubeName($tube);
        $this->watchSubscribedTube($tube);
        $this->subscriptions[$tube] = $callback;
    }

    /**
     * Job worker
     *
     * Reserves one job and hands it to the handler subscribed to the tube the
     * job came from. The job is deleted when the handler returns, buried when
     * the handler throws or when no handler is registered for the tube.
     *
     * When nothing was reserved the timeout handler (if any) is called; if
     * the attempt took less than half a second the connection is recreated.
     *
     * @param float $timeout seconds to wait for a job; the fractional part is dropped
     */
    public function wait(float $timeout = 10): void
    {
        $this->message = null;
        $start         = microtime(true);
        // Must be initialised: stays null when reserveWithTimeout() throws.
        $job = null;
        try {
            // reserveWithTimeout() takes an integer; cast explicitly instead
            // of relying on an implicit lossy conversion.
            $job = $this->queue->reserveWithTimeout((int)$timeout);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }

        if ($job === null) {
            $worktime = (microtime(true) - $start);
            if ($worktime < 0.5) {
                // Returned too quickly - the connection to the queue server is probably lost.
                $this->reconnect();
            }
            // Accept any callable, the same way subscription handlers are accepted.
            if (is_callable($this->timeout_handler)) {
                call_user_func($this->timeout_handler);
            }

            return;
        }

        // Processing job over callable function attached in $this->subscribe
        $this->message = $this->decodeJobData($job->getData());

        $stats           = $this->queue->statsJob($job);
        $requestFormTube = $stats['tube'];
        $func            = $this->subscriptions[$requestFormTube] ?? null;

        if ($func === null) {
            // Action not found
            $this->queue->bury($job);

            return;
        }

        try {
            if (is_array($func)) {
                call_user_func($func, $this);
            } elseif (is_callable($func) === true) {
                $func($this);
            }
            // Removes the job from the queue when it has been successfully completed
            $this->queue->delete($job);
        } catch (Throwable $e) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $e->getMessage());
            // Marks the job as terminally failed and no workers will restart it.
            $this->queue->bury($job);
        }
    }

    /**
     * Gets request body
     *
     * For a message created by request() (two elements, one of them
     * "inbox_tube") the first element is returned, otherwise the message
     * itself.
     *
     * @return string empty string when no message is held
     *
     * @throws \TypeError when the payload is an array or another value that
     *                    cannot be converted to a string
     */
    public function getBody(): string
    {
        $isRequest = is_array($this->message)
            && isset($this->message['inbox_tube'])
            && count($this->message) === 2;

        // A request that needs an answer carries its data in the first array element.
        $message_data = $isRequest ? ($this->message[0] ?? null) : $this->message;

        if ($message_data === null) {
            // Nothing received yet (or the last wait() timed out).
            return '';
        }
        if (is_scalar($message_data)) {
            return (string)$message_data;
        }

        return $message_data;
    }

    /**
     * Sends response to queue
     *
     * Does nothing unless the current message carries an "inbox_tube" entry,
     * i.e. was sent through request(). The response is put as is, without
     * serialization.
     *
     * @param string $response
     */
    public function reply($response): void
    {
        if ( ! isset($this->message['inbox_tube'])) {
            return;
        }

        try {
            $this->queue->useTube($this->message['inbox_tube']);
            $this->queue->put($response);
        } finally {
            // Switch back even when the answer could not be put.
            $this->queue->useTube($this->tube);
        }
    }

    /**
     * Stores an error handler.
     *
     * @param mixed $handler
     */
    public function setErrorHandler($handler): void
    {
        $this->error_handler = $handler;
    }

    /**
     * Stores the handler that wait() calls when no job was reserved.
     *
     * @param callable $handler called without arguments
     */
    public function setTimeoutHandler($handler): void
    {
        $this->timeout_handler = $handler;
    }

    /**
     * Returns the value of the reconnects counter.
     *
     * @return int
     */
    public function reconnectsCount(): int
    {
        return $this->reconnectsCount;
    }

    /**
     * Gets all messages from tube and clean it
     *
     * Every ready job is decoded the same way wait() decodes it and is then
     * deleted. The connection is switched back to the client's own tube
     * afterwards.
     *
     * @param string $tube tube to drain; backslashes are replaced with "-",
     *                     an empty value means the tube currently in use
     *
     * @return array decoded payloads in the order they were read
     */
    public function getMessagesFromTube(string $tube = ''): array
    {
        // Same normalisation as publish()/subscribe(), so a class name maps to the same tube.
        $tube = $this->normalizeTubeName($tube);
        if ($tube !== '') {
            $this->queue->useTube($tube);
        }

        $arrayOfMessages = [];
        try {
            while ($job = $this->queue->peekReady()) {
                $arrayOfMessages[] = $this->decodeJobData($job->getData());
                $this->queue->delete($job);
            }
        } finally {
            if ($tube !== '') {
                $this->queue->useTube($this->tube);
            }
        }

        return $arrayOfMessages;
    }

    /**
     * Converts a value to a tube name: backslashes are replaced with "-".
     *
     * @param mixed $tube null is treated as an empty name
     *
     * @return string
     */
    private function normalizeTubeName($tube): string
    {
        return str_replace("\\", '-', (string)$tube);
    }

    /**
     * Starts watching a subscribed tube and stops watching "default", unless
     * "default" is the tube being subscribed or already has a subscription.
     *
     * @param string $tube normalised tube name
     */
    private function watchSubscribedTube(string $tube): void
    {
        $this->queue->watch($tube);
        if ($tube !== 'default' && ! array_key_exists('default', $this->subscriptions)) {
            $this->queue->ignore('default');
        }
    }

    /**
     * Decodes raw job data: text that json_decode() accepts is returned
     * unchanged, anything else is passed through unserialize().
     *
     * @param string $rawData
     *
     * @return mixed
     */
    private function decodeJobData(string $rawData)
    {
        if (json_decode($rawData, true) !== null) {
            return $rawData;
        }

        // Job data comes from the queue, so objects are never instantiated.
        // (The option has to be named; a bare [false] is silently ignored.)
        return unserialize($rawData, ['allowed_classes' => false]);
    }
}