Passed
Push to develop (afdd08...d6811a)
by Portnov
04:48
created

BeanstalkClient   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 368
Duplicated Lines 0 %

Importance

Changes 7
Bugs 0 Features 0
Metric Value
wmc 48
eloc 156
c 7
b 0
f 0
dl 0
loc 368
rs 8.5599

15 Methods

Rating  Name                   Duplication  Size  Complexity
A       subscribe()            0            6     1
A       isConnected()          0            3     1
A       __construct()          0            5     1
A       reconnect()            0            13    4
A       setTimeoutHandler()    0            3     1
A       reply()                0            6     2
A       buryJob()              0            9     3
A       getMessagesFromTube()  0            17    4
A       setErrorHandler()      0            3     1
B       cleanTubes()           0            53    9
A       reconnectsCount()      0            3     1
A       request()              0            31    3
A       getBody()              0            12    4
B       wait()                 0            54    10
A       publish()              0            20    3

How to fix: Complexity

Complex Class

Complex classes like BeanstalkClient often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share a prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
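As an illustration of Extract Class, the two setters that share the "Handler" suffix (setTimeoutHandler() and setErrorHandler()) and their fields could move into a small component of their own. The sketch below is only one possible shape: the class name QueueHandlers and the methods notifyTimeout() and notifyError() are hypothetical and do not exist in the original code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical component extracted from BeanstalkClient (assumption, not
 * part of the original code). It owns the two handler fields and the
 * setters that share the "Handler" suffix.
 */
final class QueueHandlers
{
    /** @var callable|null */
    private $timeoutHandler = null;

    /** @var callable|null */
    private $errorHandler = null;

    public function setTimeoutHandler(callable $handler): void
    {
        $this->timeoutHandler = $handler;
    }

    public function setErrorHandler(callable $handler): void
    {
        $this->errorHandler = $handler;
    }

    /** Calls the timeout handler if one is registered. */
    public function notifyTimeout(): void
    {
        if ($this->timeoutHandler !== null) {
            ($this->timeoutHandler)();
        }
    }

    /** Passes the error to the error handler if one is registered. */
    public function notifyError(Throwable $error): void
    {
        if ($this->errorHandler !== null) {
            ($this->errorHandler)($error);
        }
    }
}

// Usage: count how many times the timeout handler fires.
$handlers = new QueueHandlers();
$timeouts = 0;
$handlers->setTimeoutHandler(function () use (&$timeouts): void {
    $timeouts++;
});
$handlers->notifyTimeout();
// No error handler is registered, so this call does nothing.
$handlers->notifyError(new RuntimeException('connection lost'));
```

With a component like this, BeanstalkClient would hold one QueueHandlers instance and delegate to it, which removes two fields and two methods from the class.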

While breaking up the class, it is a good idea to analyze how other classes use BeanstalkClient, and based on these observations, apply Extract Interface, too.
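Extract Interface can be sketched in the same spirit. Assuming that some callers only ever publish jobs, they could depend on a narrow interface instead of the whole client. Everything named below (QueuePublisherInterface, InMemoryPublisher, notifyWorker(), the tube name) is hypothetical, and the publish() signature is deliberately simplified compared with the one in BeanstalkClient.

```php
<?php
declare(strict_types=1);

/** Hypothetical narrow interface covering what a publishing caller needs. */
interface QueuePublisherInterface
{
    /** Puts a job on a tube; null means the publisher's default tube. */
    public function publish($jobData, ?string $tube = null): void;
}

/** In-memory stand-in, used here only for illustration and tests. */
final class InMemoryPublisher implements QueuePublisherInterface
{
    /** @var array<string, string[]> serialized payloads grouped by tube */
    public array $tubes = [];

    private string $defaultTube;

    public function __construct(string $defaultTube = 'default')
    {
        $this->defaultTube = $defaultTube;
    }

    public function publish($jobData, ?string $tube = null): void
    {
        // Same normalization as in BeanstalkClient: backslashes become dashes.
        $tube = str_replace('\\', '-', $tube ?? $this->defaultTube);
        $this->tubes[$tube][] = serialize($jobData);
    }
}

/** A caller that depends on the interface only, not on the concrete client. */
function notifyWorker(QueuePublisherInterface $queue): void
{
    $queue->publish(['action' => 'reload'], 'Workers\\Reload');
}

$publisher = new InMemoryPublisher();
notifyWorker($publisher);
```

A caller written against the interface can be tested without a running beanstalkd server, which is the main practical benefit of the extraction.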

<?php
/*
 * MikoPBX - free phone system for small business
 * Copyright (C) 2017-2020 Alexey Portnov and Nikolay Beketov
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with this program.
 * If not, see <https://www.gnu.org/licenses/>.
 */

namespace MikoPBX\Core\System;

use Phalcon\Di\Injectable;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Job;
use Pheanstalk\Pheanstalk;
use Throwable;

class BeanstalkClient extends Injectable
{
    public const INBOX_PREFIX = 'INBOX_';

    /** @var Pheanstalk */
    private Pheanstalk $queue;
    private bool $connected = false;
    private array $subscriptions = [];
    private string $tube;
    private int $reconnectsCount = 0;
    private $message;
    private $timeout_handler;
    private $error_handler;

    private string $port;

    /**
     * BeanstalkClient constructor.
     *
     * @param string $tube
     * @param string $port
     */
    public function __construct($tube = 'default', $port = '')
    {
        $this->tube = str_replace("\\", '-', $tube);
        $this->port = $port;
        $this->reconnect();
    }

    /**
     * Recreates the connection to the beanstalkd server
     */
    public function reconnect(): void
    {
        $config = $this->di->get('config')->beanstalk;
        $port   = $config->port;
        // A numeric port passed to the constructor overrides the configured one
        if ( ! empty($this->port) && is_numeric($this->port)) {
            $port = $this->port;
        }
        $this->queue = Pheanstalk::create($config->host, $port);
        $this->queue->useTube($this->tube);
        // Restore the subscriptions that existed before the reconnect
        foreach ($this->subscriptions as $tube => $callback) {
            $this->subscribe($tube, $callback);
        }
        $this->connected = true;
    }

    /**
     * Subscribes to new messages in a tube
     *
     * @param string           $tube     - tube to listen to
     * @param array | callable $callback - worker
     */
    public function subscribe(string $tube, $callback): void
    {
        $tube = str_replace("\\", '-', $tube);
        $this->queue->watch($tube);
        $this->queue->ignore('default');
        $this->subscriptions[$tube] = $callback;
    }

    /**
     * Returns the connection status
     *
     * @return bool
     */
    public function isConnected(): bool
    {
        return $this->connected;
    }

    /**
     * Sends a request and waits for the answer from the processor
     *
     * @param      $job_data
     * @param int  $timeout
     * @param int  $priority
     *
     * @return bool|string
     */
    public function request(
        $job_data,
        int $timeout = 10,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY
    ) {
        $this->message = false;
        $inbox_tube    = uniqid(self::INBOX_PREFIX, true);
        $this->queue->watch($inbox_tube);

        // Send the message to the backend worker
        $requestMessage = [
            $job_data,
            'inbox_tube' => $inbox_tube,
        ];
        $this->publish($requestMessage, null, $priority, 0, $timeout);

        // Wait until a worker has processed the request.
        // Review note (Unused Code): "The assignment to $job is dead and can be removed."
        // The catch block below still reads $job when reserveWithTimeout() throws.
        $job = null;
        try {
            $job = $this->queue->reserveWithTimeout($timeout);
            if ($job !== null) {
                $this->message = $job->getData();
                $this->queue->delete($job);
            }
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
            $this->buryJob($job);
        }
        $this->queue->ignore($inbox_tube);

        return $this->message;
    }

    /**
     * Puts a job in a beanstalkd server queue
     *
     * @param mixed   $job_data data for the worker
     * @param ?string $tube     tube name
     * @param int     $priority Jobs with smaller priority values will be scheduled
     *                          before jobs with larger priorities. The most urgent priority is 0;
     *                          the least urgent priority is 4294967295.
     * @param int     $delay    delay before the job is put into the ready queue
     * @param int     $ttr      time allowed to execute this job
     *
     * @return \Pheanstalk\Job
     */
    public function publish(
        $job_data,
        $tube = null,
        int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkInterface::DEFAULT_DELAY,
        int $ttr = PheanstalkInterface::DEFAULT_TTR
    ): Job {
        // Cast first so that a null tube is not passed to str_replace()
        $tube = str_replace("\\", '-', (string)$tube);
        // Change the tube
        if ( ! empty($tube) && $this->tube !== $tube) {
            $this->queue->useTube($tube);
        }
        $job_data = serialize($job_data);
        // Send the job to the queue
        $result = $this->queue->put($job_data, $priority, $delay, $ttr);

        // Switch back to the original tube
        $this->queue->useTube($this->tube);

        return $result;
    }

    /**
     * Drops orphaned tasks
     */
    public function cleanTubes()
    {
        $tubes          = $this->queue->listTubes();
        $deletedJobInfo = [];
        foreach ($tubes as $tube) {
            try {
                $this->queue->useTube($tube);
                $queueStats = $this->queue->stats()->getArrayCopy();

                // Delete buried jobs
                $countBuried = $queueStats['current-jobs-buried'];
                while ($job = $this->queue->peekBuried()) {
                    $countBuried--;
                    if ($countBuried < 0) {
                        break;
                    }
                    $id = $job->getId();
                    Util::sysLogMsg(
                        __METHOD__,
                        "Deleted buried job with ID {$id} from {$tube} with message {$job->getData()}",
                        LOG_DEBUG
                    );
                    $this->queue->delete($job);
                    $deletedJobInfo[] = "{$id} from {$tube}";
                }

                // Delete outdated jobs: those older than twice their TTR
                $countReady = $queueStats['current-jobs-ready'];
                while ($job = $this->queue->peekReady()) {
                    $countReady--;
                    if ($countReady < 0) {
                        break;
                    }
                    $id                    = $job->getId();
                    $jobStats              = $this->queue->statsJob($job)->getArrayCopy();
                    $age                   = (int)$jobStats['age'];
                    $expectedTimeToExecute = (int)$jobStats['ttr'] * 2;
                    if ($age > $expectedTimeToExecute) {
                        Util::sysLogMsg(
                            __METHOD__,
                            "Deleted outdated job with ID {$id} from {$tube} with message {$job->getData()}",
                            LOG_DEBUG
                        );
                        $this->queue->delete($job);
                        $deletedJobInfo[] = "{$id} from {$tube}";
                    }
                }
            } catch (Throwable $exception) {
                Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
            }
        }
        if (count($deletedJobInfo) > 0) {
            // Separator added so the first entry is not glued to the prefix
            Util::sysLogMsg(__METHOD__, "Deleted jobs: " . implode(PHP_EOL, $deletedJobInfo), LOG_WARNING);
        }
    }

    /**
     * Job worker for loop cycles
     *
     * @param float $timeout
     */
    public function wait(float $timeout = 5): void
    {
        $this->message = null;
        $start         = microtime(true);
        try {
            $job = $this->queue->reserveWithTimeout((int)$timeout);
        } catch (Throwable $exception) {
            Util::sysLogMsg(__METHOD__, 'Exception: ' . $exception->getMessage(), LOG_ERR);
        }

        if ( ! isset($job)) {
            $workTime = (microtime(true) - $start);
            if ($workTime < $timeout) {
                usleep(100000);
                // If the wait time $workTime is shorter than $timeout
                // and no job was received ($job === null),
                // something is wrong: the connection to the queue server was probably lost.
                $this->reconnect();
            }
            if (is_array($this->timeout_handler)) {
                call_user_func($this->timeout_handler);
            }

            return;
        }

        // Process the job with the callable attached in $this->subscribe
        if (json_decode($job->getData(), true) !== null) {
            $mData = $job->getData();
        } else {
            // The option must be keyed as 'allowed_classes'; a bare [false] is ignored
            $mData = unserialize($job->getData(), ['allowed_classes' => false]);
        }
        $this->message = $mData;

        $stats           = $this->queue->statsJob($job);
        $requestFromTube = $stats['tube'];
        $func            = $this->subscriptions[$requestFromTube] ?? null;

        if ($func === null) {
            // Action not found
            $this->buryJob($job);
        } else {
            try {
                if (is_array($func)) {
                    call_user_func($func, $this);
                } elseif (is_callable($func) === true) {
                    $func($this);
                }
                // Removes the job from the queue when it has been successfully completed
                $this->queue->delete($job);
            } catch (Throwable $e) {
                // Marks the job as terminally failed; no worker will restart it.
                $this->buryJob($job);
                Util::sysLogMsg(__METHOD__ . '_EXCEPTION', $e->getMessage(), LOG_ERR);
            }
        }
    }

    /**
     * Buries the job; does nothing when no job is given
     *
     * @param $job
     */
    private function buryJob($job): void
    {
        if ( ! isset($job)) {
            return;
        }
        try {
            $this->queue->bury($job);
        } catch (Throwable $e) {
            Util::sysLogMsg(__METHOD__ . '_EXCEPTION', $e->getMessage(), LOG_ERR);
        }
    }

    /**
     * Gets the request body
     *
     * @return string
     */
    public function getBody(): string
    {
        if (is_array($this->message)
            && isset($this->message['inbox_tube'])
            && count($this->message) === 2) {
            // A request arrived that requires a reply. The data was passed as the first element of the array.
            $message_data = $this->message[0];
        } else {
            $message_data = $this->message;
        }

        return $message_data;
    }

    /**
     * Sends the response to the requester's inbox tube
     *
     * @param $response
     */
    public function reply($response): void
    {
        if (isset($this->message['inbox_tube'])) {
            $this->queue->useTube($this->message['inbox_tube']);
            $this->queue->put($response);
            $this->queue->useTube($this->tube);
        }
    }

    /**
     * @param $handler
     */
    public function setErrorHandler($handler): void
    {
        $this->error_handler = $handler;
    }

    /**
     * @param $handler
     */
    public function setTimeoutHandler($handler): void
    {
        $this->timeout_handler = $handler;
    }

    /**
     * @return int
     */
    public function reconnectsCount(): int
    {
        return $this->reconnectsCount;
    }

    /**
     * Gets all messages from a tube and removes them from it
     *
     * @param string $tube
     *
     * @return array
     */
    public function getMessagesFromTube(string $tube = ''): array
    {
        if ($tube !== '') {
            $this->queue->useTube($tube);
        }
        $arrayOfMessages = [];
        while ($job = $this->queue->peekReady()) {
            if (json_decode($job->getData(), true) !== null) {
                $mData = $job->getData();
            } else {
                // The option must be keyed as 'allowed_classes'; a bare [false] is ignored
                $mData = unserialize($job->getData(), ['allowed_classes' => false]);
            }
            $arrayOfMessages[] = $mData;
            $this->queue->delete($job);
        }

        return $arrayOfMessages;
    }
}
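
Both wait() and getMessagesFromTube() decode a job payload the same way: the raw string is kept when it parses as JSON, otherwise it is unserialized. A minimal sketch of pulling that step into one helper follows. The function name decodeJobPayload() is hypothetical and not part of the original class, and the sketch assumes the intent was to disable object instantiation, so it passes the option as ['allowed_classes' => false].

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (assumption, not in the original class).
 * Returns the raw string when it is valid JSON, otherwise the
 * unserialized value with object instantiation disabled.
 *
 * @return mixed
 */
function decodeJobPayload(string $raw)
{
    if (json_decode($raw, true) !== null) {
        return $raw;
    }

    return unserialize($raw, ['allowed_classes' => false]);
}

// A JSON payload is returned unchanged; a serialized payload is decoded.
$fromJson  = decodeJobPayload('{"action":"reload"}');
$fromArray = decodeJobPayload(serialize(['action' => 'reload']));
```

Both methods could then call the helper instead of repeating the branch, which would also lower the complexity score of wait().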