Passed
Push — develop ( dc22ff...71045b )
by Nikolay
05:51 queued 10s
created

BeanstalkClient::cleanTube()   A

Complexity

Conditions 6
Paths 12

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 14
dl 0
loc 20
rs 9.2222
c 1
b 0
f 0
cc 6
nc 12
nop 0
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Throwable;
16
17
class BeanstalkClient extends Injectable
18
{
19
    /** @var Pheanstalk */
20
    private Pheanstalk $queue;
21
    private bool $connected = false;
22
    private array $subscriptions = [];
23
    private string $tube;
24
    private int $reconnectsCount = 0;
25
    private $message;
26
    private $timeout_handler;
27
    private $error_handler;
28
29
    private string $port;
30
31
    /**
32
     * BeanstalkClient constructor.
33
     *
34
     * @param string $tube
35
     * @param string $port
36
     */
37
    public function __construct($tube = 'default', $port = '')
38
    {
39
        $this->tube        = str_replace("\\", '-', $tube);
40
        $this->port        = $port;
41
        $this->reconnect();
42
    }
43
44
    /**
45
     * Recreates connection to the beanstalkd server
46
     */
47
    public function reconnect(): void
48
    {
49
        $config = $this->di->get('config')->beanstalk;
50
        $port   = $config->port;
51
        if ( ! empty($this->port) && is_numeric($this->port)) {
52
            $port = $this->port;
53
        }
54
55
        $this->queue = Pheanstalk::create($config->host, $port);
56
        $this->queue->useTube($this->tube);
57
        $this->connected = true;
58
    }
59
60
    /**
61
     * Returns connection status
62
     *
63
     * @return bool
64
     */
65
    public function isConnected(): bool
66
    {
67
        return $this->connected;
68
    }
69
70
    /**
71
     * Sends request and wait for answer from processor
72
     *
73
     * @param      $job_data
74
     * @param int  $timeout
75
     * @param int  $priority
76
     *
77
     * @return bool|mixed
78
     *
79
     */
80
    public function request(
81
        $job_data,
82
        int $timeout = 10,
83
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
84
    ) {
85
        $this->message = false;
86
        $inbox_tube    = uniqid('INBOX_', true);
87
        $this->queue->watch($inbox_tube);
88
89
        // Отправляем данные для обработки.
90
        $requestMessage = [
91
            $job_data,
92
            'inbox_tube' => $inbox_tube,
93
        ];
94
        $this->publish($requestMessage, null, $priority, 0, $timeout);
95
96
        // Получаем ответ от сервера.
97
        $job = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $job is dead and can be removed.
Loading history...
98
        try {
99
            $job = $this->queue->reserveWithTimeout($timeout);
100
            if ($job !== null) {
101
                $this->message = $job->getData();
102
                $this->queue->delete($job);
103
                $job = null;
104
            }
105
        } catch (Throwable $exception){
106
            Util::sysLogMsg(__METHOD__, 'Exception: '.$exception->getMessage());
107
        } finally {
108
            if ($job !== null){
109
                $this->queue->bury($job);
110
            }
111
        }
112
        $this->queue->ignore($inbox_tube);
113
114
        // Чистим мусор.
115
        $this->cleanTube();
116
117
        return $this->message;
118
    }
119
120
    /**
121
     * Puts a job in a beanstalkd server queue
122
     *
123
     * @param mixed   $job_data data to worker
124
     * @param ?string $tube     tube name
125
     * @param int     $priority Jobs with smaller priority values will be scheduled
126
     *                          before jobs with larger priorities. The most urgent priority is 0;
127
     *                          the least urgent priority is 4294967295.
128
     * @param int     $delay    delay before insert job into work query
129
     * @param int     $ttr      time to execute this job
130
     *
131
     * @return \Pheanstalk\Job
132
     */
133
    public function publish(
134
        $job_data,
135
        $tube = null,
136
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
137
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
138
        int $ttr = PheanstalkInterface::DEFAULT_TTR
139
    ): Job {
140
        $tube = str_replace("\\", '-', $tube);
141
        // Change tube
142
        if ( ! empty($tube) && $this->tube !== $tube) {
143
            $this->queue->useTube($tube);
144
        }
145
        $job_data = serialize($job_data);
146
        // Send JOB to queue
147
        $result = $this->queue->put($job_data, $priority, $delay, $ttr);
148
149
        // Return original tube
150
        $this->queue->useTube($this->tube);
151
152
        return $result;
153
    }
154
155
    /**
156
     * Drops orphaned tasks
157
     */
158
    public function cleanTube()
159
    {
160
        $tubes = $this->queue->listTubes();
161
        foreach ($tubes as $tube) {
162
            if (strpos($tube, "INBOX_") !== 0) {
163
                continue;
164
            }
165
            try {
166
                $statData = $this->queue->statsTube($tube)->getArrayCopy();
167
                $watching = $statData['current-watching'];
168
                if ($watching !== '0') {
169
                    continue;
170
                }
171
                // Нужно удалить все Jobs.
172
                $this->queue->watch($tube);
173
                while ($job = $this->queue->peekReady()) {
174
                    $this->queue->delete($job);
175
                }
176
            } catch (Throwable $exception){
177
                Util::sysLogMsg(__METHOD__, 'Exception: '.$exception->getMessage());
178
            }
179
        }
180
    }
181
182
    /**
183
     * Subscribe on new message in tube
184
     *
185
     * @param string           $tube     - listening tube
186
     * @param array | callable $callback - worker
187
     */
188
    public function subscribe(string $tube, $callback): void
189
    {
190
        $tube = str_replace("\\", '-', $tube);
191
        $this->queue->watch($tube);
192
        $this->queue->ignore('default');
193
        $this->subscriptions[$tube] = $callback;
194
    }
195
196
    /**
197
     * Job worker
198
     *
199
     * @param float $timeout
200
     *
201
     */
202
    public function wait(float $timeout = 10): void
203
    {
204
        $this->message = null;
205
        $start         = microtime(true);
206
        $job = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $job is dead and can be removed.
Loading history...
207
        try {
208
            $job  = $this->queue->reserveWithTimeout($timeout);
0 ignored issues
show
Bug introduced by
It seems like $timeout can also be of type double; however, parameter $timeout of Pheanstalk\Pheanstalk::reserveWithTimeout() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

208
            $job  = $this->queue->reserveWithTimeout(/** @scrutinizer ignore-type */ $timeout);
Loading history...
209
        } catch (Throwable $exception){
210
            Util::sysLogMsg(__METHOD__, 'Exception: '.$exception->getMessage());
211
        }
212
213
        if ($job === null) {
214
            $worktime = (microtime(true) - $start);
215
            if ($worktime < 0.5) {
216
                // Что то не то, вероятно потеряна связь с сервером очередей.
217
                $this->reconnect();
218
            }
219
            if (is_array($this->timeout_handler)) {
220
                call_user_func($this->timeout_handler);
221
            }
222
223
            return;
224
        }
225
226
        // Processing job over callable function attached in $this->subscribe
227
        if (json_decode($job->getData(), true) !== null) {
228
            $mData = $job->getData();
229
        } else {
230
            $mData = unserialize($job->getData(), [false]);
231
        }
232
        $this->message = $mData;
233
234
        $stats           = $this->queue->statsJob($job);
235
        $requestFormTube = $stats['tube'];
236
        $func            = $this->subscriptions[$requestFormTube] ?? null;
237
238
        if ($func===null){
239
            // Action not found
240
            $this->queue->bury($job);
241
        } else {
242
            try {
243
                if (is_array($func)) {
244
                    call_user_func($func, $this);
245
                } elseif (is_callable($func) === true) {
246
                    $func($this);
247
                }
248
                // Removes the job from the queue when it has been successfully completed
249
                $this->queue->delete($job);
250
            } catch (Throwable $e) {
251
                // Marks the job as terminally failed and no workers will restart it.
252
                $this->queue->bury($job);
253
            }
254
        }
255
256
    }
257
258
    /**
259
     * Gets request body
260
     *
261
     * @return string
262
     */
263
    public function getBody(): string
264
    {
265
        if (is_array($this->message)
266
            && isset($this->message['inbox_tube'])
267
            && count($this->message) === 2) {
268
            // Это поступил request, треует ответа. Данные были переданы первым параметром массива.
269
            $message_data = $this->message[0];
270
        } else {
271
            $message_data = $this->message;
272
        }
273
274
        return $message_data;
275
    }
276
277
    /**
278
     * Sends response to queue
279
     *
280
     * @param $response
281
     */
282
    public function reply($response): void
283
    {
284
        if (isset($this->message['inbox_tube'])) {
285
            $this->queue->useTube($this->message['inbox_tube']);
286
            $this->queue->put($response);
287
            $this->queue->useTube($this->tube);
288
        }
289
    }
290
291
    /**
292
     *
293
     * @param $handler
294
     */
295
    public function setErrorHandler($handler): void
296
    {
297
        $this->error_handler = $handler;
298
    }
299
300
    /**
301
     * @param $handler
302
     */
303
    public function setTimeoutHandler($handler): void
304
    {
305
        $this->timeout_handler = $handler;
306
    }
307
308
    /**
309
     * @return int
310
     */
311
    public function reconnectsCount(): int
312
    {
313
        return $this->reconnectsCount;
314
    }
315
}