Passed
Push — develop ( cd6731...746ed8 )
by Nikolay
05:47 queued 12s
created

BeanstalkClient::isConnected()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Throwable;
16
17
class BeanstalkClient extends Injectable
18
{
19
    /** @var Pheanstalk */
20
    private Pheanstalk $queue;
21
    private bool $connected = false;
22
    private array $subscriptions = [];
23
    private string $tube;
24
    private int $reconnectsCount = 0;
25
    private $message;
26
    private $timeout_handler;
27
    private $error_handler;
28
29
    private string $port;
30
31
    /**
32
     * BeanstalkClient constructor.
33
     *
34
     * @param string $tube
35
     * @param string $port
36
     */
37
    public function __construct($tube = 'default', $port = '')
38
    {
39
        $this->tube        = str_replace("\\", '-', $tube);
40
        $this->port        = $port;
41
        $this->reconnect();
42
    }
43
44
    /**
45
     * Recreates connection to the beanstalkd server
46
     */
47
    public function reconnect(): void
48
    {
49
        $config = $this->di->get('config')->beanstalk;
50
        $port   = $config->port;
51
        if ( ! empty($this->port) && is_numeric($this->port)) {
52
            $port = $this->port;
53
        }
54
55
        $this->queue = Pheanstalk::create($config->host, $port);
56
        $this->queue->useTube($this->tube);
57
        $this->connected = true;
58
    }
59
60
    /**
61
     * Returns connection status
62
     *
63
     * @return bool
64
     */
65
    public function isConnected(): bool
66
    {
67
        return $this->connected;
68
    }
69
70
    /**
71
     * Sends request and wait for answer from processor
72
     *
73
     * @param      $job_data
74
     * @param int  $timeout
75
     * @param int  $priority
76
     *
77
     * @return bool|mixed
78
     *
79
     */
80
    public function request(
81
        $job_data,
82
        int $timeout = 10,
83
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
84
    ) {
85
        $this->message = false;
86
        $inbox_tube    = uniqid('INBOX_', true);
87
        $this->queue->watch($inbox_tube);
88
89
        // Отправляем данные для обработки.
90
        $requestMessage = [
91
            $job_data,
92
            'inbox_tube' => $inbox_tube,
93
        ];
94
        $this->publish($requestMessage, null, $priority, 0, $timeout);
95
96
        // Получаем ответ от сервера.
97
        $job = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $job is dead and can be removed.
Loading history...
98
        try {
99
            $job = $this->queue->reserveWithTimeout($timeout);
100
            if ($job !== null) {
101
                $this->message = $job->getData();
102
                $this->queue->delete($job);
103
                $job = null;
104
            }
105
        } catch (Throwable $exception){
106
            Util::sysLogMsg(__METHOD__, 'Exception: '.$exception->getMessage());
107
        } finally {
108
            if ($job !== null){
109
                $this->queue->bury($job);
110
            }
111
        }
112
        $this->queue->ignore($inbox_tube);
113
114
        return $this->message;
115
    }
116
117
    /**
118
     * Puts a job in a beanstalkd server queue
119
     *
120
     * @param mixed   $job_data data to worker
121
     * @param ?string $tube     tube name
122
     * @param int     $priority Jobs with smaller priority values will be scheduled
123
     *                          before jobs with larger priorities. The most urgent priority is 0;
124
     *                          the least urgent priority is 4294967295.
125
     * @param int     $delay    delay before insert job into work query
126
     * @param int     $ttr      time to execute this job
127
     *
128
     * @return \Pheanstalk\Job
129
     */
130
    public function publish(
131
        $job_data,
132
        $tube = null,
133
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
134
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
135
        int $ttr = PheanstalkInterface::DEFAULT_TTR
136
    ): Job {
137
        $tube = str_replace("\\", '-', $tube);
138
        // Change tube
139
        if ( ! empty($tube) && $this->tube !== $tube) {
140
            $this->queue->useTube($tube);
141
        }
142
        $job_data = serialize($job_data);
143
        // Send JOB to queue
144
        $result = $this->queue->put($job_data, $priority, $delay, $ttr);
145
146
        // Return original tube
147
        $this->queue->useTube($this->tube);
148
149
        return $result;
150
    }
151
152
    /**
153
     * Drops orphaned tasks
154
     */
155
    public function cleanTubes()
156
    {
157
        $tubes = $this->queue->listTubes();
158
        foreach ($tubes as $tube) {
159
            try {
160
                $this->queue->watchOnly($tube);
161
                // Delete buried jobs
162
                while ($job = $this->queue->peekBuried()) {
163
                    $id = $job->getId();
164
                    $this->queue->delete($job);
165
                    Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube}");
166
                }
167
168
                // Delete outdated jobs
169
                while ($job = $this->queue->peekReady()) {
170
                    $jobStats = $this->queue->statsJob($job);
171
                    if ($jobStats->age>$jobStats->timeout*2){
0 ignored issues
show
Bug introduced by
Accessing timeout on the interface Pheanstalk\Contract\ResponseInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
Bug introduced by
Accessing age on the interface Pheanstalk\Contract\ResponseInterface suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
172
                        $id = $job->getId();
173
                        $this->queue->delete($job);
174
                        Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube}");
175
                    }
176
                }
177
                $this->queue->watchOnly('default');
178
179
            } catch (Throwable $exception){
180
                Util::sysLogMsg(__METHOD__, 'Exception: '.$exception->getMessage());
181
            }
182
        }
183
    }
184
185
    /**
186
     * Subscribe on new message in tube
187
     *
188
     * @param string           $tube     - listening tube
189
     * @param array | callable $callback - worker
190
     */
191
    public function subscribe(string $tube, $callback): void
192
    {
193
        $tube = str_replace("\\", '-', $tube);
194
        $this->queue->watch($tube);
195
        $this->queue->ignore('default');
196
        $this->subscriptions[$tube] = $callback;
197
    }
198
199
    /**
200
     * Job worker
201
     *
202
     * @param float $timeout
203
     *
204
     */
205
    public function wait(float $timeout = 10): void
206
    {
207
        $this->message = null;
208
        $start         = microtime(true);
209
        $job = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $job is dead and can be removed.
Loading history...
210
        try {
211
            $job  = $this->queue->reserveWithTimeout($timeout);
0 ignored issues
show
Bug introduced by
It seems like $timeout can also be of type double; however, parameter $timeout of Pheanstalk\Pheanstalk::reserveWithTimeout() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

211
            $job  = $this->queue->reserveWithTimeout(/** @scrutinizer ignore-type */ $timeout);
Loading history...
212
        } catch (Throwable $exception){
213
            Util::sysLogMsg(__METHOD__, 'Exception: '.$exception->getMessage());
214
        }
215
216
        if ($job === null) {
217
            $worktime = (microtime(true) - $start);
218
            if ($worktime < 0.5) {
219
                // Что то не то, вероятно потеряна связь с сервером очередей.
220
                $this->reconnect();
221
            }
222
            if (is_array($this->timeout_handler)) {
223
                call_user_func($this->timeout_handler);
224
            }
225
226
            return;
227
        }
228
229
        // Processing job over callable function attached in $this->subscribe
230
        if (json_decode($job->getData(), true) !== null) {
231
            $mData = $job->getData();
232
        } else {
233
            $mData = unserialize($job->getData(), [false]);
234
        }
235
        $this->message = $mData;
236
237
        $stats           = $this->queue->statsJob($job);
238
        $requestFormTube = $stats['tube'];
239
        $func            = $this->subscriptions[$requestFormTube] ?? null;
240
241
        if ($func===null){
242
            // Action not found
243
            $this->queue->bury($job);
244
        } else {
245
            try {
246
                if (is_array($func)) {
247
                    call_user_func($func, $this);
248
                } elseif (is_callable($func) === true) {
249
                    $func($this);
250
                }
251
                // Removes the job from the queue when it has been successfully completed
252
                $this->queue->delete($job);
253
            } catch (Throwable $e) {
254
                // Marks the job as terminally failed and no workers will restart it.
255
                $this->queue->bury($job);
256
            }
257
        }
258
259
    }
260
261
    /**
262
     * Gets request body
263
     *
264
     * @return string
265
     */
266
    public function getBody(): string
267
    {
268
        if (is_array($this->message)
269
            && isset($this->message['inbox_tube'])
270
            && count($this->message) === 2) {
271
            // Это поступил request, треует ответа. Данные были переданы первым параметром массива.
272
            $message_data = $this->message[0];
273
        } else {
274
            $message_data = $this->message;
275
        }
276
277
        return $message_data;
278
    }
279
280
    /**
281
     * Sends response to queue
282
     *
283
     * @param $response
284
     */
285
    public function reply($response): void
286
    {
287
        if (isset($this->message['inbox_tube'])) {
288
            $this->queue->useTube($this->message['inbox_tube']);
289
            $this->queue->put($response);
290
            $this->queue->useTube($this->tube);
291
        }
292
    }
293
294
    /**
295
     *
296
     * @param $handler
297
     */
298
    public function setErrorHandler($handler): void
299
    {
300
        $this->error_handler = $handler;
301
    }
302
303
    /**
304
     * @param $handler
305
     */
306
    public function setTimeoutHandler($handler): void
307
    {
308
        $this->timeout_handler = $handler;
309
    }
310
311
    /**
312
     * @return int
313
     */
314
    public function reconnectsCount(): int
315
    {
316
        return $this->reconnectsCount;
317
    }
318
}