Passed
Push — main ( 1f8a6a...d3094e )
by William
03:09
created

Queue   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 242
Duplicated Lines 0 %

Importance

Changes 13
Bugs 0 Features 0
Metric Value
eloc 97
c 13
b 0
f 0
dl 0
loc 242
rs 10
wmc 27

14 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
A publish() 0 29 2
A shutdown() 0 3 1
A createQueue() 0 15 1
A onExecuting() 0 3 1
A executeReceiveCallback() 0 17 4
A onCheckStatus() 0 3 1
A onReceive() 0 3 1
A onError() 0 3 1
A executeStatusCallback() 0 17 4
A executeErrorCallback() 0 9 2
A executeMessage() 0 4 1
A consume() 0 56 5
A validateExecuteCallback() 0 7 2
1
<?php
2
3
declare(ticks=1);
4
5
namespace WillRy\RabbitRun\Queue;
6
7
8
use Exception;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use WillRy\RabbitRun\Base;
11
12
class Queue extends Base
13
{
14
    /** @var string queue name (set by createQueue()) */
15
    protected $queueName;
16
17
    /** @var string exchange name (set by createQueue() as "<queue name>_exchange") */
18
    protected $exchangeName;
19
20
21
    /** @var string consumer name */
22
    public $consumerName;
23
24
    /** @var \Closure callback registered through onReceive() */
    protected \Closure $onReceiveCallback;
25
26
    /** @var \Closure callback registered through onExecuting(); consume() requires it */
    protected \Closure $onExecutingCallback;
27
28
    /** @var \Closure callback registered through onError() */
    protected \Closure $onErrorCallback;
29
30
    /** @var \Closure callback registered through onCheckStatus() */
    protected \Closure $onCheckStatusCallback;
31
32
    /**
     * Creates the queue helper; all setup is delegated to the Base constructor.
     */
    public function __construct()
    {
        parent::__construct();
    }
36
37
    /**
     * Forwards a shutdown request to the Base implementation.
     *
     * @param mixed $signal value handed unchanged to Base::shutdown()
     */
    public function shutdown($signal)
    {
        parent::shutdown($signal);
    }
41
42
    /**
     * Initializes the queue and its exchange, then binds the two together.
     *
     * The exchange is named "<name>_exchange"; both names are stored on the instance.
     *
     * @param string $name queue name
     * @return $this
     */
    public function createQueue(string $name)
    {
        $this->getConnection();

        $exchange = $name . '_exchange';

        $this->queueName = $name;
        $this->exchangeName = $exchange;

        $this->exchange($exchange);
        $this->queue($name);
        $this->bind($name, $exchange);

        return $this;
    }
63
64
65
    /**
     * Publishes a job to the exchange bound to the given queue.
     *
     * The job is wrapped as ["payload" => $job, "queue" => <queue name>], JSON-encoded
     * and sent as a persistent message.
     *
     * @param string $queueName queue to initialize and publish to
     * @param array $job job data to send
     * @return array the envelope that was encoded and published
     * @throws Exception when the envelope cannot be JSON-encoded (\JsonException), or
     *                   whatever the connection/channel calls propagate
     */
    public function publish(
        string $queueName,
        array  $job
    ) {
        $this->createQueue($queueName);

        $this->getConnection();

        $payload = [
            "payload" => $job,
            'queue' => $this->queueName,
        ];

        // Fail loudly instead of publishing a bogus body: without this flag
        // json_encode() returns false on failure (e.g. invalid UTF-8 in the job).
        $json = json_encode($payload, JSON_THROW_ON_ERROR);

        $message = new AMQPMessage(
            $json,
            [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->channel->basic_publish($message, $this->exchangeName);

        return $payload;
    }
103
104
105
    /**
     * Message consumption loop.
     *
     * Requires an onExecuting callback. For every delivery the body is JSON-decoded,
     * the optional status and receive callbacks may veto the message, and then the
     * executing callback runs. An Exception thrown while executing makes the message
     * be nack'ed with requeue and the optional error callback be invoked.
     *
     * @param string $queueName queue to initialize and consume from
     * @param int $sleepSeconds seconds to sleep after each wait() call
     * @throws Exception when no onExecuting callback has been defined
     */
    public function consume(
        string $queueName,
        int    $sleepSeconds = 3
    ) {
        $this->validateExecuteCallback();

        $this->loopConnection(function () use ($sleepSeconds, $queueName) {

            $this->createQueue($queueName);

            $this->channel->basic_qos(null, 1, null);

            $this->channel->basic_consume(
                $this->queueName,
                $this->randomConsumer(),
                false,
                false,
                false,
                false,
                function (AMQPMessage $message) {
                    // Hold back SIGTERM/SIGINT while a message is being handled.
                    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGINT]);

                    try {
                        $incomeData = json_decode($message->getBody(), true);

                        // A negative status means the message must not be consumed.
                        if (!$this->executeStatusCallback($message)) {
                            return false;
                        }

                        if (!$this->executeReceiveCallback($message, $incomeData)) {
                            return false;
                        }

                        try {
                            $this->executeMessage($message, $incomeData);
                        } catch (Exception $e) {
                            $message->nack(true);
                            $this->executeErrorCallback($e, $incomeData);
                        }
                    } finally {
                        // Restore the signal mask on every exit path, including when
                        // nack(), the error callback or a non-Exception Throwable
                        // escapes; otherwise the worker could no longer be stopped.
                        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
                    }
                }
            );

            // Keep waiting for deliveries for as long as the channel is open.
            while ($this->channel->is_open()) {
                $this->channel->wait(null, false);
                sleep($sleepSeconds);

                // Dispatch pending "terminate script" signals.
                pcntl_signal_dispatch();
            }
        });
    }
170
171
    /**
     * Registers the status callback.
     *
     * executeStatusCallback() calls it without arguments; a falsy, non-null return
     * makes the message be requeued and skipped.
     *
     * @param \Closure $callback
     */
    public function onCheckStatus(\Closure $callback)
    {
        $this->onCheckStatusCallback = $callback;
    }
175
176
    /**
     * Registers the receive callback.
     *
     * executeReceiveCallback() calls it with the decoded message data; a falsy,
     * non-null return makes the message be nack'ed and skipped.
     *
     * @param \Closure $callback
     */
    public function onReceive(\Closure $callback)
    {
        $this->onReceiveCallback = $callback;
    }
180
181
    /**
     * Registers the executing callback, which consume() requires.
     *
     * executeMessage() calls it with the AMQPMessage and the decoded message data.
     *
     * @param \Closure $callback
     */
    public function onExecuting(\Closure $callback)
    {
        $this->onExecutingCallback = $callback;
    }
185
186
    /**
     * Registers the error callback.
     *
     * executeErrorCallback() calls it with the caught Exception and the decoded
     * message data.
     *
     * @param \Closure $callback
     */
    public function onError(\Closure $callback)
    {
        $this->onErrorCallback = $callback;
    }
190
191
    /**
     * Runs the optional status callback and decides whether the message may proceed.
     *
     * When the callback returns a falsy, non-null value the message is nack'ed with
     * requeue, SIGTERM/SIGINT are unblocked and false is returned.
     *
     * @param AMQPMessage $message delivery to requeue when the status is negative
     * @return bool true when no callback is set or the status allows processing
     */
    public function executeStatusCallback(AMQPMessage $message)
    {
        if (empty($this->onCheckStatusCallback)) {
            return true;
        }

        $workerStatus = ($this->onCheckStatusCallback)();

        // A null return (e.g. a callback without a return statement) counts as "go on".
        if ($workerStatus === null || $workerStatus) {
            return true;
        }

        print_r("[WORKER STOPPED]" . PHP_EOL);
        $message->nack(true);
        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);

        return false;
    }
209
210
    /**
     * Runs the optional receive callback and decides whether the message may proceed.
     *
     * When the callback returns a falsy, non-null value the message is nack'ed
     * (nack() is called without arguments), SIGTERM/SIGINT are unblocked and false
     * is returned.
     *
     * @param AMQPMessage $message delivery to nack when the task is ignored
     * @param mixed $incomeData decoded message body handed to the callback
     * @return bool true when no callback is set or the callback accepts the task
     */
    public function executeReceiveCallback(AMQPMessage $message, $incomeData)
    {
        if (empty($this->onReceiveCallback)) {
            return true;
        }

        $accepted = ($this->onReceiveCallback)($incomeData);

        // A null return (e.g. a callback without a return statement) counts as accepted.
        if ($accepted === null || $accepted) {
            return true;
        }

        print_r("[TASK IGNORED BY ON RECEIVE RETURN]" . PHP_EOL);
        $message->nack();
        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);

        return false;
    }
228
229
    /**
     * Invokes the executing callback for a delivery.
     *
     * @param AMQPMessage $message the delivery being processed
     * @param mixed $incomeData decoded message body
     */
    public function executeMessage(AMQPMessage $message, $incomeData)
    {
        ($this->onExecutingCallback)($message, $incomeData);
    }
234
    
235
236
    /**
     * Invokes the optional error callback.
     *
     * @param \Exception $e exception handed to the callback
     * @param mixed $incomeData decoded message body
     * @return bool true when a callback was set and called, false otherwise
     */
    public function executeErrorCallback(\Exception $e, $incomeData)
    {
        if (!empty($this->onErrorCallback)) {
            ($this->onErrorCallback)($e, $incomeData);

            return true;
        }

        return false;
    }
246
247
    /**
     * Ensures an executing callback has been registered.
     *
     * @return bool always true when the callback is set
     * @throws \Exception when no onExecuting callback has been defined
     */
    public function validateExecuteCallback()
    {
        if (empty($this->onExecutingCallback)) {
            throw new \Exception("Define a onExecuting callback");
        }

        return true;
    }
255
}
256