Passed
Push — develop ( 0ee8c3...045db2 )
by Nikolay
05:22
created

BeanstalkClient::wait()   B

Complexity

Conditions 10
Paths 48

Size

Total Lines 51
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 33
dl 0
loc 51
rs 7.6666
c 3
b 0
f 0
cc 10
nc 48
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

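Commonly applied refactorings include Extract Method: move a cohesive run of statements (often the part sitting under a comment) into its own well-named method and call it from the original.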

1
<?php
2
/*
3
 * Copyright © MIKO LLC - All Rights Reserved
4
 * Unauthorized copying of this file, via any medium is strictly prohibited
5
 * Proprietary and confidential
6
 * Written by Alexey Portnov, 9 2020
7
 */
8
9
namespace MikoPBX\Core\System;
10
11
use Phalcon\Di\Injectable;
12
use Pheanstalk\Contract\PheanstalkInterface;
13
use Pheanstalk\Job;
14
use Pheanstalk\Pheanstalk;
15
use Pheanstalk\Contract\ResponseInterface;
16
use Throwable;
17
18
class BeanstalkClient extends Injectable
19
{
20
    /** @var Pheanstalk Connection object created by reconnect(). */
    private Pheanstalk $queue;

    /** @var bool Set to true at the end of reconnect(); exposed by isConnected(). */
    private bool $connected = false;

    /** @var array Map of tube name => callback, filled by subscribe() and read by wait(). */
    private array $subscriptions = [];

    /** @var string Tube this client uses by default (backslashes replaced with '-'). */
    private string $tube;

    /** @var int Value returned by reconnectsCount(). */
    private int $reconnectsCount = 0;

    /** @var mixed Payload of the last job handled by request() or wait(). */
    private $message;

    /** @var mixed Handler stored by setTimeoutHandler(). */
    private $timeout_handler;

    /** @var mixed Handler stored by setErrorHandler(). */
    private $error_handler;

    /** @var string Optional port override passed to the constructor; used by reconnect() when numeric. */
    private string $port;
31
32
    /**
     * Creates the client and immediately opens a connection via reconnect().
     *
     * @param string $tube Tube used by default; every backslash in the name is
     *                     replaced with '-'.
     * @param string $port Port override; reconnect() falls back to the
     *                     configured port when this is empty or not numeric.
     */
    public function __construct($tube = 'default', $port = '')
    {
        $this->port = $port;
        $this->tube = str_replace("\\", '-', $tube);

        $this->reconnect();
    }
44
45
    /**
     * Recreates the connection to the beanstalkd server.
     *
     * Host and default port are read from the `beanstalk` section of the DI
     * `config` service; a non-empty numeric port given to the constructor
     * overrides the configured one. The new connection uses the client's own
     * tube and watches again every tube registered through subscribe(): a
     * fresh connection only watches `default`, so without this step a worker
     * would stop receiving jobs for its tubes after a reconnect.
     */
    public function reconnect(): void
    {
        $config = $this->di->get('config')->beanstalk;
        $port   = $config->port;
        if ( ! empty($this->port) && is_numeric($this->port)) {
            $port = $this->port;
        }

        $this->queue = Pheanstalk::create($config->host, $port);
        $this->queue->useTube($this->tube);

        // Restore the watch list that was lost with the previous connection.
        foreach (array_keys($this->subscriptions) as $subscribedTube) {
            $this->queue->watch((string)$subscribedTube);
        }
        // Mirror subscribe(): stop watching `default` unless it was subscribed explicitly.
        if ($this->subscriptions !== [] && ! isset($this->subscriptions['default'])) {
            $this->queue->ignore('default');
        }

        $this->connected = true;
    }
60
61
    /**
     * Reports the connection flag maintained by this client.
     *
     * The flag starts as false and is set to true at the end of reconnect().
     *
     * @return bool current value of the flag
     */
    public function isConnected(): bool
    {
        return $this->connected;
    }
70
71
    /**
     * Sends a request and waits for the answer from the processor.
     *
     * The payload is published together with the name of a unique temporary
     * inbox tube; the method then reserves one job, stores its raw data as the
     * answer and deletes it. The inbox tube is always removed from the watch
     * list again, including when publishing or job handling throws.
     *
     * @param mixed $job_data data passed to the processor
     * @param int   $timeout  seconds to wait for the answer; also used as the
     *                        TTR of the published request
     * @param int   $priority priority of the published request
     *
     * @return bool|mixed raw data of the reserved job, or false when nothing
     *                    was reserved or an error occurred while receiving
     */
    public function request(
        $job_data,
        int $timeout = 10,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
    ) {
        $this->message = false;
        $inbox_tube    = uniqid('INBOX_', true);
        $this->queue->watch($inbox_tube);

        try {
            // Send the data for processing.
            $requestMessage = [
                $job_data,
                'inbox_tube' => $inbox_tube,
            ];
            $this->publish($requestMessage, null, $priority, 0, $timeout);

            // Receive the answer from the server.
            // $job must exist before the try block: the catch branch reads it
            // even when reserveWithTimeout() itself throws.
            $job = null;
            try {
                $job = $this->queue->reserveWithTimeout($timeout);
                if ($job !== null) {
                    $this->message = $job->getData();
                    $this->queue->delete($job);
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
                if ($job !== null) {
                    $this->queue->bury($job);
                }
            }
        } finally {
            // Never leave the one-off inbox tube in the watch list.
            $this->queue->ignore($inbox_tube);
        }

        return $this->message;
    }
115
116
    /**
     * Puts a job in a beanstalkd server queue.
     *
     * The data is serialized with serialize() before it is sent. When a tube
     * different from the client's own tube is given, the job is put there;
     * afterwards the connection is switched back to the client's own tube,
     * also when put() throws.
     *
     * @param mixed   $job_data data to worker
     * @param ?string $tube     tube name; backslashes are replaced with '-'.
     *                          An empty value means the client's own tube
     * @param int     $priority Jobs with smaller priority values will be scheduled
     *                          before jobs with larger priorities. The most urgent priority is 0;
     *                          the least urgent priority is 4294967295.
     * @param int     $delay    delay before the job is inserted into the work queue
     * @param int     $ttr      time to execute this job
     *
     * @return \Pheanstalk\Job
     */
    public function publish(
        $job_data,
        $tube = null,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
        int $ttr = PheanstalkInterface::DEFAULT_TTR
    ): Job {
        // Passing null to str_replace() is deprecated since PHP 8.1.
        $targetTube = ($tube === null) ? '' : str_replace("\\", '-', (string)$tube);

        try {
            // Change tube
            if ( ! empty($targetTube) && $this->tube !== $targetTube) {
                $this->queue->useTube($targetTube);
            }

            // Send JOB to queue
            return $this->queue->put(serialize($job_data), $priority, $delay, $ttr);
        } finally {
            // Return to the original tube, even if put() failed
            $this->queue->useTube($this->tube);
        }
    }
150
151
    /**
     * Drops orphaned tasks.
     *
     * For every tube reported by the server this deletes all buried jobs and
     * then deletes ready jobs from the head of the tube while their age is
     * greater than twice their TTR. Errors inside one tube are logged and the
     * next tube is processed. The connection is switched back to the client's
     * own tube at the end so that later publish() calls are not sent to the
     * last cleaned tube.
     */
    public function cleanTubes()
    {
        $tubes = $this->queue->listTubes();
        foreach ($tubes as $tube) {
            try {
                $this->queue->useTube($tube);

                // Delete buried jobs
                while ($job = $this->queue->peekBuried()) {
                    $id = $job->getId();
                    $this->queue->delete($job);
                    Util::sysLogMsg(__METHOD__, "Deleted buried job with ID {$id} from {$tube}");
                }

                // Delete outdated jobs
                while ($job = $this->queue->peekReady()) {
                    $jobStats = $this->queue->statsJob($job);
                    if ( ! $jobStats instanceof ResponseInterface) {
                        // The job was not removed, so peeking again would
                        // return the same job forever.
                        break;
                    }
                    $age                   = (int)($jobStats->age);
                    $expectedTimeToExecute = ((int)($jobStats->ttr)) * 2;
                    if ($age <= $expectedTimeToExecute) {
                        // The head job is still fresh and stays in the tube;
                        // stop here instead of peeking it again endlessly.
                        break;
                    }
                    $id = $job->getId();
                    $this->queue->delete($job);
                    Util::sysLogMsg(__METHOD__, "Deleted outdated job with ID {$id} from {$tube}");
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
            }
        }

        // Return to the original tube; log instead of throwing to keep the
        // best-effort character of the cleanup.
        try {
            $this->queue->useTube($this->tube);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }
    }
187
188
    /**
     * Subscribes a callback to new messages in a tube.
     *
     * The tube is added to the watch list and the callback is remembered for
     * wait(). The `default` tube is removed from the watch list unless it is
     * the tube being subscribed or already has a subscription of its own.
     *
     * @param string           $tube     - listening tube; backslashes are replaced with '-'
     * @param array | callable $callback - worker
     */
    public function subscribe(string $tube, $callback): void
    {
        $tube = str_replace("\\", '-', $tube);
        $this->queue->watch($tube);

        // Do not drop `default` when it is itself a subscribed tube.
        if ($tube !== 'default' && ! isset($this->subscriptions['default'])) {
            $this->queue->ignore('default');
        }

        $this->subscriptions[$tube] = $callback;
    }
201
202
    /**
     * Job worker: reserves one job and hands it to the subscribed callback.
     *
     * When no job is obtained, the timeout handler is invoked (and the
     * connection is recreated first if the call returned in under half a
     * second). Otherwise the payload is stored for getBody()/reply() and the
     * callback registered for the job's tube is called with this client.
     *
     * @param float $timeout seconds to wait for a job; the fractional part is
     *                       discarded before the value is passed to the server
     */
    public function wait(float $timeout = 10): void
    {
        $this->message = null;
        $start         = microtime(true);

        // Stays null when reserveWithTimeout() times out or throws.
        $job = null;
        try {
            // The explicit cast avoids the implicit float-to-int conversion
            // that PHP 8.1+ reports as deprecated for fractional values.
            $job = $this->queue->reserveWithTimeout((int)$timeout);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage());
        }

        if ($job === null) {
            $this->handleEmptyReserve($start);

            return;
        }

        $this->message = $this->decodeJobPayload($job->getData());
        $this->dispatchJob($job);
    }

    /**
     * Handles a wait() cycle that produced no job.
     *
     * @param float $start microtime(true) taken when wait() started
     */
    private function handleEmptyReserve(float $start): void
    {
        $worktime = microtime(true) - $start;
        if ($worktime < 0.5) {
            // Something is wrong; the connection to the queue server was probably lost.
            $this->reconnect();
        }
        // Accept any callable (closure, function name, [object, method]).
        if (is_callable($this->timeout_handler)) {
            ($this->timeout_handler)();
        }
    }

    /**
     * Converts raw job data into the value stored as the current message.
     *
     * Data that parses as JSON is kept as the original string; anything else
     * is passed to unserialize() with object instantiation disabled.
     *
     * @param string $data raw job data
     *
     * @return mixed the JSON string itself, or the unserialized value
     */
    private function decodeJobPayload(string $data)
    {
        if (json_decode($data, true) !== null) {
            return $data;
        }

        // The option key is required: a bare [false] leaves classes allowed.
        return unserialize($data, ['allowed_classes' => false]);
    }

    /**
     * Runs the callback subscribed to the job's tube and settles the job.
     *
     * The job is deleted after the callback returns, and buried when no
     * callable is registered for its tube or the callback throws.
     *
     * @param Job $job reserved job
     */
    private function dispatchJob(Job $job): void
    {
        $stats           = $this->queue->statsJob($job);
        $requestFormTube = $stats['tube'];
        $func            = $this->subscriptions[$requestFormTube] ?? null;

        if ( ! is_callable($func)) {
            // Action not found
            $this->queue->bury($job);

            return;
        }

        try {
            $func($this);
            // Removes the job from the queue when it has been successfully completed
            $this->queue->delete($job);
        } catch (Throwable $e) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $e->getMessage());
            // Marks the job as terminally failed and no workers will restart it.
            $this->queue->bury($job);
        }
    }
263
264
    /**
     * Gets the request body of the current message.
     *
     * A message that is a two-element array containing an `inbox_tube` key is
     * a request() envelope; its data is the element at index 0. Any other
     * message is used as is. Because the method is declared to return a
     * string, values that are not strings are converted instead of raising a
     * TypeError: null becomes '', scalars and stringable objects are cast, and
     * everything else is JSON-encoded.
     * NOTE(review): JSON was chosen because wait() already treats JSON text as
     * a regular payload - confirm that consumers expect this form.
     *
     * @return string
     */
    public function getBody(): string
    {
        $isRequest = is_array($this->message)
            && isset($this->message['inbox_tube'])
            && count($this->message) === 2;

        // A request that expects an answer: the data was passed as the first array element.
        $message_data = $isRequest ? ($this->message[0] ?? null) : $this->message;

        if (is_string($message_data)) {
            return $message_data;
        }
        if ($message_data === null) {
            // No job has been received yet, or the envelope carried no data.
            return '';
        }
        if (is_scalar($message_data)
            || (is_object($message_data) && method_exists($message_data, '__toString'))) {
            return (string)$message_data;
        }

        $encoded = json_encode($message_data);

        return ($encoded === false) ? '' : $encoded;
    }
282
283
    /**
     * Sends a response to the inbox tube named in the current message.
     *
     * Nothing is sent when the current message has no `inbox_tube` entry.
     * The connection is switched back to the client's own tube afterwards,
     * also when put() throws.
     *
     * @param $response data put into the inbox tube as is
     */
    public function reply($response): void
    {
        if ( ! isset($this->message['inbox_tube'])) {
            return;
        }

        try {
            $this->queue->useTube($this->message['inbox_tube']);
            $this->queue->put($response);
        } finally {
            $this->queue->useTube($this->tube);
        }
    }
296
297
    /**
     * Stores the error handler on this client.
     *
     * @param $handler value kept in the error_handler property
     */
    public function setErrorHandler($handler): void
    {
        $this->error_handler = $handler;
    }
305
306
    /**
     * Stores the handler that wait() consults when it obtained no job.
     *
     * @param $handler value kept in the timeout_handler property
     */
    public function setTimeoutHandler($handler): void
    {
        $this->timeout_handler = $handler;
    }
313
314
    /**
     * Returns the reconnect counter kept by this client.
     *
     * @return int current value of the reconnectsCount property
     */
    public function reconnectsCount(): int
    {
        return $this->reconnectsCount;
    }
321
}