Queue::onReceive()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 3
rs 10
1
<?php
2
3
declare(ticks=1);
4
5
namespace WillRy\RabbitRun\Queue;
6
7
8
use Exception;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use WillRy\RabbitRun\Base;
11
12
/**
 * RabbitMQ work queue built on php-amqplib.
 *
 * Publishes JSON-wrapped jobs to a per-queue exchange and consumes them one at
 * a time, dispatching every delivery through a chain of user-supplied hooks:
 * onCheckStatus -> onReceive -> onExecuting, with onError invoked when the
 * executing hook throws.
 *
 * Connection/channel handling (configRabbit, exchange, queue, bind,
 * loopConnection, randomConsumer, $channel, $confirmSelect) is inherited
 * from Base and is not visible here.
 */
class Queue extends Base
{
    /** @var string Consumer name. */
    public $consumerName;

    /** @var string Name of the queue selected by the most recent createQueue() call. */
    protected $queueName;

    /** @var string Name of the exchange bound to the queue ("{queue}_exchange"). */
    protected $exchangeName;

    /** @var \Closure Optional hook called with the decoded body; a falsy non-null return discards the message. */
    protected \Closure $onReceiveCallback;

    /** @var \Closure Mandatory hook that processes the message; receives the AMQPMessage and the decoded body. */
    protected \Closure $onExecutingCallback;

    /** @var \Closure Optional hook called with the exception and the decoded body when processing throws. */
    protected \Closure $onErrorCallback;

    /** @var \Closure Optional hook called without arguments; a falsy non-null return requeues the message. */
    protected \Closure $onCheckStatusCallback;

    /**
     * Stores the broker connection settings.
     *
     * All arguments are forwarded untouched to Base::configRabbit().
     *
     * @param mixed $host
     * @param mixed $port
     * @param mixed $user
     * @param mixed $pass
     * @param mixed $vhost
     */
    public function __construct($host, $port, $user, $pass, $vhost)
    {
        parent::__construct();

        $this->configRabbit($host, $port, $user, $pass, $vhost);
    }

    /**
     * Delegates to Base::shutdown().
     *
     * @param mixed $signal Presumably the signal number received - confirm against Base.
     */
    public function shutdown($signal)
    {
        parent::shutdown($signal);
    }

    /**
     * Publishes a job to the exchange bound to the given queue.
     *
     * The queue, the exchange and their binding are declared first, then the
     * job is wrapped as {"payload": $job, "queue": $queueName}, JSON-encoded
     * and sent as a persistent message.
     *
     * @param string $queueName Queue that should receive the job
     * @param array $job Job data; must be JSON-encodable
     * @return array The wrapped payload that was published
     * @throws Exception When the payload cannot be JSON-encoded (\JsonException)
     *                   or when a broker operation fails
     */
    public function publish(
        string $queueName,
        array  $job
    ) {
        $this->createQueue($queueName);

        $payload = [
            "payload" => $job,
            'queue' => $this->queueName,
        ];

        // Fail loudly instead of publishing an invalid body when encoding fails
        // (json_encode would otherwise return false).
        $json = json_encode($payload, JSON_THROW_ON_ERROR);

        $message = new AMQPMessage(
            $json,
            ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName
        );

        // With publisher confirms enabled, wait for the broker to acknowledge.
        if ($this->confirmSelect) {
            $this->channel->wait_for_pending_acks(5000);
        }

        return $payload;
    }

    /**
     * Declares the queue and its exchange and binds the two together.
     *
     * Also records both names on the instance for later publish/consume calls.
     *
     * @param string $name Queue name; the exchange is named "{$name}_exchange"
     * @return $this
     */
    public function createQueue(string $name)
    {
        $this->queueName = $name;
        $this->exchangeName = "{$name}_exchange";

        $this->exchange($this->exchangeName);
        $this->queue($name);
        $this->bind($name, $this->exchangeName);

        return $this;
    }

    /**
     * Message consumption loop.
     *
     * Requires an onExecuting callback. The whole setup runs inside
     * Base::loopConnection(); for every delivery the hooks are executed in the
     * order onCheckStatus, onReceive, onExecuting.
     *
     * @param string $queueName Queue to consume from
     * @param int $sleepSeconds Seconds to sleep after each wait() call returns
     * @throws Exception When no onExecuting callback has been registered
     */
    public function consume(
        string $queueName,
        int    $sleepSeconds = 3
    ) {
        $this->validateExecuteCallback();

        $this->loopConnection(function () use ($sleepSeconds, $queueName) {
            $this->createQueue($queueName);

            // Prefetch count of 1: at most one unacknowledged message at a time.
            $this->channel->basic_qos(null, 1, null);

            $this->channel->basic_consume(
                $this->queueName,
                $this->randomConsumer(),
                false,
                false,
                false,
                false,
                function (AMQPMessage $message) {
                    return $this->processDelivery($message);
                }
            );

            // Keep consuming for as long as the channel stays open.
            while ($this->channel->is_open()) {
                $this->channel->wait(null, false);
                sleep($sleepSeconds);

                // Dispatch pending "terminate script" signals between messages.
                pcntl_signal_dispatch();
            }
        });
    }

    /**
     * Runs the hook chain for a single delivery.
     *
     * SIGTERM/SIGINT are blocked while the message is being handled and are
     * always unblocked again in the finally clause, including when a hook or
     * nack() throws, so the worker can still be stopped afterwards.
     *
     * @param AMQPMessage $message Delivered message
     * @return false|null false when a status/receive hook rejected the message, null otherwise
     */
    private function processDelivery(AMQPMessage $message)
    {
        pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGINT]);

        try {
            // Decodes to null when the body is not valid JSON; hooks receive it as-is.
            $incomeData = json_decode($message->getBody(), true);

            // A negative status means the message must not be consumed.
            if (!$this->executeStatusCallback($message)) {
                return false;
            }

            if (!$this->executeReceiveCallback($message, $incomeData)) {
                return false;
            }

            try {
                $this->executeMessage($message, $incomeData);
            } catch (Exception $e) {
                // Requeue the failed message, then notify the error hook.
                $message->nack(true);
                $this->executeErrorCallback($e, $incomeData);
            }

            return null;
        } finally {
            pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
        }
    }

    /**
     * Ensures an onExecuting callback has been registered.
     *
     * @return true
     * @throws Exception When no onExecuting callback is set
     */
    public function validateExecuteCallback()
    {
        if (empty($this->onExecutingCallback)) {
            throw new Exception("Define a onExecuting callback");
        }

        return true;
    }

    /**
     * Runs the optional onCheckStatus hook.
     *
     * When the hook returns a falsy, non-null value the message is nacked with
     * requeue, SIGTERM/SIGINT are unblocked and false is returned. A null
     * return (hook without a return statement) counts as "continue".
     *
     * @param AMQPMessage $message Message to requeue when the status is negative
     * @return bool true to continue processing, false when the message was requeued
     */
    public function executeStatusCallback(AMQPMessage $message)
    {
        if (empty($this->onCheckStatusCallback)) {
            return true;
        }

        $checkStatusCallback = $this->onCheckStatusCallback;
        $statusBoolean = $checkStatusCallback();

        if ($statusBoolean !== null && !$statusBoolean) {
            print_r("[WORKER STOPPED]" . PHP_EOL);
            $message->nack(true);
            pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
            return false;
        }

        return true;
    }

    /**
     * Runs the optional onReceive hook.
     *
     * When the hook returns a falsy, non-null value the message is nacked
     * using the library's default requeue flag (no requeue in php-amqplib -
     * confirm against the installed version), SIGTERM/SIGINT are unblocked and
     * false is returned. A null return counts as "continue".
     *
     * @param AMQPMessage $message Message to nack when the hook rejects it
     * @param mixed $incomeData Decoded message body passed to the hook
     * @return bool true to continue processing, false when the message was rejected
     */
    public function executeReceiveCallback(AMQPMessage $message, $incomeData)
    {
        if (empty($this->onReceiveCallback)) {
            return true;
        }

        $receiveCallback = $this->onReceiveCallback;
        $statusBoolean = $receiveCallback($incomeData);

        if ($statusBoolean !== null && !$statusBoolean) {
            print_r("[TASK IGNORED BY ON RECEIVE RETURN]" . PHP_EOL);
            $message->nack();
            pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
            return false;
        }

        return true;
    }

    /**
     * Invokes the onExecuting hook with the message and its decoded body.
     *
     * The message is neither acked nor nacked here; presumably the hook is
     * expected to do that itself - confirm against callers.
     *
     * @param AMQPMessage $message Delivered message
     * @param mixed $incomeData Decoded message body
     */
    public function executeMessage(AMQPMessage $message, $incomeData)
    {
        $onExecutingCallback = $this->onExecutingCallback;
        $onExecutingCallback($message, $incomeData);
    }

    /**
     * Invokes the optional onError hook.
     *
     * @param Exception $e Exception thrown while executing the message
     * @param mixed $incomeData Decoded message body
     * @return bool true when a hook was registered and called, false otherwise
     */
    public function executeErrorCallback(Exception $e, $incomeData)
    {
        if (empty($this->onErrorCallback)) {
            return false;
        }

        $errorCallback = $this->onErrorCallback;
        $errorCallback($e, $incomeData);

        return true;
    }

    /**
     * Registers the hook that decides whether the worker may consume.
     *
     * @param \Closure $callback Called without arguments
     */
    public function onCheckStatus(\Closure $callback)
    {
        $this->onCheckStatusCallback = $callback;
    }

    /**
     * Registers the hook run when a message is received, before execution.
     *
     * @param \Closure $callback Called with the decoded message body
     */
    public function onReceive(\Closure $callback)
    {
        $this->onReceiveCallback = $callback;
    }

    /**
     * Registers the hook that processes each message.
     *
     * @param \Closure $callback Called with the AMQPMessage and the decoded body
     */
    public function onExecuting(\Closure $callback)
    {
        $this->onExecutingCallback = $callback;
    }

    /**
     * Registers the hook run when the executing hook throws.
     *
     * @param \Closure $callback Called with the Exception and the decoded body
     */
    public function onError(\Closure $callback)
    {
        $this->onErrorCallback = $callback;
    }
}
256