Passed
Push — main ( 8bb8ec...4bcff6 )
by William
02:18
created

Base   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 70
c 3
b 0
f 0
dl 0
loc 223
rs 10
wmc 21

12 Methods

Rating   Name   Duplication   Size   Complexity  
A getChannel() 0 9 2
A randomConsumer() 0 4 1
A queue() 0 15 1
A getConnection() 0 9 2
A bind() 0 7 1
A loopConnection() 0 11 3
A shutdown() 0 13 4
A cleanConnection() 0 7 2
A exchange() 0 16 1
A configRabbit() 0 7 1
A configConfirmSelect() 0 4 1
A __construct() 0 17 2
1
<?php
2
3
namespace WillRy\RabbitRun;
4
5
use PhpAmqpLib\Connection\AMQPStreamConnection;
6
use PhpAmqpLib\Exception\AMQPIOException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use WillRy\RabbitRun\Connections\Connect;
10
11
/**
 * Shared base for RabbitMQ publishers/workers.
 *
 * Holds the current connection and channel (both obtained through the static
 * Connect helper), exposes small fluent wrappers for declaring exchanges and
 * queues, and registers SIGTERM/SIGINT handlers when running on the CLI.
 */
class Base
{
    /** @var AMQPStreamConnection Connection last obtained from Connect::getInstance() */
    protected $instance;

    /** @var \PhpAmqpLib\Channel\AMQPChannel Channel last obtained from Connect::getChannel() */
    protected $channel;

    /** @var \PDO PDO connection (never assigned or read inside this class) */
    protected $db;

    /** @var bool Execution flag (never read or written inside this class; presumably for subclasses) */
    protected $executing = false;

    /** @var bool When true, getChannel() calls confirm_select() on the channel it obtains */
    protected $confirmSelect = false;

    /**
     * Registers the shutdown signal handlers when running from the command line.
     *
     * The pcntl extension is optional, so the handlers are only installed when
     * pcntl_signal() actually exists; otherwise constructing the object on a
     * CLI build without pcntl would be a fatal "undefined function" error.
     *
     * Note: the handlers only fire once signals are dispatched
     * (pcntl_signal_dispatch(), pcntl_async_signals(true) or declare(ticks)).
     * This class does not enable dispatching itself.
     */
    public function __construct()
    {
        if (php_sapi_name() !== 'cli' || !function_exists('pcntl_signal')) {
            return;
        }

        // One handler shared by both signals; it simply delegates to shutdown().
        $handler = function ($signal) {
            $this->shutdown($signal);
        };

        // Third argument (restart_syscalls) is false, as in the original registration.
        \pcntl_signal(SIGTERM, $handler, false);
        \pcntl_signal(SIGINT, $handler, false);
    }

    /**
     * Handles an operating-system signal by logging it and terminating the script.
     *
     * Only SIGTERM, SIGKILL and SIGINT are recognised; any other value is
     * ignored and the method returns normally. SIGKILL can never be delivered
     * to a handler by the OS, but the case is kept because this method is
     * public and may be called directly.
     *
     * The comparison is intentionally the loose `switch` comparison so that
     * callers passing the signal number in another scalar form keep working.
     *
     * @param mixed $signal Signal number (one of the SIG* constants)
     * @return void Does not return when the signal is recognised (the script exits)
     */
    public function shutdown($signal)
    {
        $data = date('Y-m-d H:i:s');

        switch ($signal) {
            case SIGTERM:
                $name = 'SIGTERM';
                break;
            case SIGKILL:
                $name = 'SIGKILL';
                break;
            case SIGINT:
                $name = 'SIGINT';
                break;
            default:
                // Unknown signal: nothing to do.
                return;
        }

        print "Caught {$name} {$data}" . PHP_EOL;
        exit;
    }

    /**
     * Configures the RabbitMQ connection settings and opens the connection.
     *
     * Forwards the settings to Connect::config() and then calls
     * getConnection(), which also obtains a channel.
     *
     * @param mixed $host  Forwarded as-is to Connect::config()
     * @param mixed $port  Forwarded as-is to Connect::config()
     * @param mixed $user  Forwarded as-is to Connect::config()
     * @param mixed $pass  Forwarded as-is to Connect::config()
     * @param mixed $vhost Forwarded as-is to Connect::config()
     * @return $this
     */
    public function configRabbit($host, $port, $user, $pass, $vhost): Base
    {
        Connect::config($host, $port, $user, $pass, $vhost);

        $this->getConnection();

        return $this;
    }

    /**
     * Declares an exchange on the current channel.
     *
     * Requires a channel to have been obtained first (see getConnection() /
     * getChannel()); $this->channel is used without a null check.
     *
     * @param string $exchange    Exchange name
     * @param string $type        Exchange type (defaults to "direct")
     * @param bool   $passive     Forwarded to exchange_declare()
     * @param bool   $durable     Forwarded to exchange_declare()
     * @param bool   $auto_delete Forwarded to exchange_declare()
     * @return $this
     */
    public function exchange(
        string $exchange,
        string $type = "direct",
        bool   $passive = false,
        bool   $durable = true,
        bool   $auto_delete = false
    )
    {
        $this->channel->exchange_declare(
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete
        );

        return $this;
    }

    /**
     * Declares a queue on the current channel.
     *
     * Requires a channel to have been obtained first; $this->channel is used
     * without a null check.
     *
     * @param string $queue       Queue name (empty string by default)
     * @param bool   $passive     Forwarded to queue_declare()
     * @param bool   $durable     Forwarded to queue_declare()
     * @param bool   $exclusive   Forwarded to queue_declare()
     * @param bool   $auto_delete Forwarded to queue_declare()
     * @return array|null Whatever queue_declare() returns
     */
    public function queue(
        string $queue = '',
        bool   $passive = false,
        bool   $durable = true,
        bool   $exclusive = false,
        bool   $auto_delete = false
    )
    {
        return $this->channel->queue_declare(
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            false // sixth positional argument; "nowait" in php-amqplib's signature
        );
    }

    /**
     * Binds a queue to an exchange on the current channel.
     *
     * @param mixed $queue    Queue name, forwarded to queue_bind()
     * @param mixed $exchange Exchange name, forwarded to queue_bind()
     * @return $this
     */
    public function bind(
        $queue,
        $exchange
    )
    {
        $this->channel->queue_bind($queue, $exchange);

        return $this;
    }

    /**
     * Builds a random string of ASCII letters.
     *
     * The 52-letter pool is repeated just enough times to cover $len, shuffled
     * with str_shuffle() and truncated, so for $len <= 52 no letter repeats.
     * str_shuffle() is not cryptographically secure; do not use the result as
     * a secret.
     *
     * @param int $len Desired length (default 30)
     * @return string Random letters, $len characters long
     */
    public function randomConsumer($len = 30): string
    {
        $pool = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $repetitions = (int)ceil($len / strlen($pool));

        return substr(str_shuffle(str_repeat($pool, $repetitions)), 0, $len);
    }

    /**
     * Runs $callback forever, reconnecting whenever it fails.
     *
     * Each iteration (re)obtains the connection and channel, prints a
     * "connected" marker and invokes the callback. Any \Exception is printed,
     * the connection is cleaned up and the loop retries after a 2 second
     * pause. \Error throwables are not caught and propagate to the caller.
     *
     * This method never returns normally. If the callback itself returns
     * without throwing, the loop reconnects and calls it again immediately.
     *
     * @param callable $callback Invoked with no arguments
     * @return void
     */
    public function loopConnection($callback)
    {
        while (true) {
            try {
                $this->getConnection(true);
                echo ' [x] [  connected  ] ', "\n";
                $callback();
            } catch (\Exception $e) {
                echo get_class($e) . ':' . $e->getMessage() . " | file:" . $e->getFile() . " | line:" . $e->getLine() . PHP_EOL;
                $this->cleanConnection();
                sleep(2);
            }
        }
    }

    /**
     * Obtains the connection from Connect and, optionally, a channel as well.
     *
     * @param bool $createChannel When true (default), also calls getChannel()
     * @return AMQPStreamConnection The value returned by Connect::getInstance()
     */
    public function getConnection(bool $createChannel = true)
    {
        $this->instance = Connect::getInstance();

        if ($createChannel) {
            $this->getChannel();
        }

        return $this->instance;
    }

    /**
     * Obtains the channel from Connect and stores it on this object.
     *
     * When confirm-select mode was requested through configConfirmSelect(),
     * confirm_select() is called on the channel every time this method runs.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel The value returned by Connect::getChannel()
     */
    public function getChannel()
    {
        $this->channel = Connect::getChannel();

        if ($this->confirmSelect) {
            $this->channel->confirm_select();
        }

        return $this->channel;
    }

    /**
     * Closes the channel and the connection through Connect.
     *
     * Best effort: any \Exception raised while closing is printed and
     * swallowed, so this method does not throw \Exception. If closing the
     * channel fails, closing the connection is not attempted.
     *
     * @return void
     */
    public function cleanConnection()
    {
        try {
            Connect::closeChannel();
            Connect::closeInstance();
        } catch (\Exception $e) {
            echo '[ERROR CLOSE CHANNEL|INSTANCE]' . $e->getMessage() . "|file:" . $e->getFile() . "|line:" . $e->getLine() . PHP_EOL;
        }
    }

    /**
     * Enables or disables confirm-select mode for channels obtained later.
     *
     * Only stores the flag; it takes effect on the next getChannel() call.
     *
     * @param bool $confirmSelect True to call confirm_select() on new channels
     * @return $this
     */
    public function configConfirmSelect(bool $confirmSelect = true)
    {
        $this->confirmSelect = $confirmSelect;

        return $this;
    }
}