Passed
Push — main ( 1631ee...add2d8 )
by William
03:03
created

Queue::onError()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(ticks=1);
4
5
namespace WillRy\RabbitRun\Queue;
6
7
8
use Exception;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use WillRy\RabbitRun\Base;
11
12
class Queue extends Base
13
{
14
    /** @var string queue name; assigned by createQueue() */
15
    protected $queueName;
16
17
    /** @var string exchange name; createQueue() sets it to the queue name plus "_exchange" */
18
    protected $exchangeName;
19
20
21
    /** @var string consumer name (not assigned anywhere in this class) */
22
    public $consumerName;
23
24
    /** @var \Closure hook stored by onReceive(); called with the decoded message body before execution */
    protected \Closure $onReceiveCallback;
25
26
    /** @var \Closure hook stored by onExecuting(); called with the message and its decoded body */
    protected \Closure $onExecutingCallback;
27
28
    /** @var \Closure hook stored by onError(); called with the caught exception and the decoded body */
    protected \Closure $onErrorCallback;
29
30
    /** @var \Closure hook stored by onCheckStatus(); called without arguments before each message is handled */
    protected \Closure $onCheckStatusCallback;
31
32
    /**
     * Creates the instance by delegating entirely to the parent constructor.
     */
    public function __construct()
    {
        parent::__construct();
    }
36
37
    /**
     * Forwards a shutdown request to the parent implementation unchanged.
     *
     * @param mixed $signal value handed straight to parent::shutdown()
     */
    public function shutdown($signal)
    {
        parent::shutdown($signal);
    }
41
42
    /**
     * Sets up the queue and its companion exchange, then binds the two.
     *
     * The exchange name is the queue name with "_exchange" appended. Both
     * names are stored on the instance before the exchange(), queue() and
     * bind() helpers (defined outside this class) are called.
     *
     * @param string $name queue name
     * @return $this
     */
    public function createQueue(string $name)
    {
        $this->getConnection();

        // $name is already a string, so no interpolation is needed to copy it.
        $this->queueName = $name;
        $this->exchangeName = $name . '_exchange';

        $this->exchange($this->exchangeName);
        $this->queue($name);
        $this->bind($name, $this->exchangeName);

        return $this;
    }
63
64
65
    /**
     * Publishes a job to the exchange bound to the given queue.
     *
     * The job is wrapped in an envelope holding the job under "payload" and
     * the queue name under "queue". The envelope is JSON-encoded and sent as
     * a persistent message.
     *
     * @param string $queueName queue to publish to; set up first via createQueue()
     * @param array $job job data embedded in the envelope
     * @return array the envelope that was encoded and published
     * @throws Exception when the envelope cannot be JSON-encoded (\JsonException),
     *                   or whatever the connection/channel calls throw
     */
    public function publish(
        string $queueName,
        array  $job
    ) {
        $this->createQueue($queueName);

        // NOTE(review): createQueue() already calls getConnection(); kept in case
        // the call has side effects that are not visible from this class.
        $this->getConnection();

        $payload = [
            "payload" => $job,
            'queue' => $this->queueName,
        ];

        // Throw on unencodable data (invalid UTF-8, excessive depth, ...) rather
        // than letting json_encode() return false and publishing that as the body.
        $json = json_encode($payload, JSON_THROW_ON_ERROR);

        $message = new AMQPMessage(
            $json,
            [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName
        );

        return $payload;
    }
103
104
105
    /**
     * Message consumption loop.
     *
     * Requires an onExecuting callback to have been registered. Inside
     * loopConnection() it sets up the queue, registers a consumer callback and
     * then waits on the channel for as long as the channel is open.
     *
     * For each delivered message the consumer callback:
     *  1. blocks SIGTERM/SIGINT for the duration of the handling;
     *  2. runs the status hook and the receive hook, stopping if either vetoes;
     *  3. runs the executing callback; on an Exception the message is nacked
     *     with requeue and the error hook is invoked;
     *  4. always unblocks SIGTERM/SIGINT again.
     *
     * @param string $queueName queue to consume from
     * @param int $sleepSeconds seconds to sleep after each wait() call
     * @throws Exception when no onExecuting callback has been registered
     */
    public function consume(
        string $queueName,
        int    $sleepSeconds = 3
    ) {

        if (empty($this->onExecutingCallback)) {
            throw new \Exception("Define a onExecuting callback");
        }

        $this->loopConnection(function () use ($sleepSeconds, $queueName) {

            $this->createQueue($queueName);

            // Per php-amqplib's basic_qos(prefetch_size, prefetch_count, global):
            // at most one unacknowledged message is delivered at a time.
            $this->channel->basic_qos(null, 1, null);

            $this->channel->basic_consume(
                $this->queueName,
                $this->randomConsumer(),
                false,
                false,
                false,
                false,
                function (AMQPMessage $message) {
                    pcntl_sigprocmask(SIG_BLOCK, [SIGTERM, SIGINT]);

                    // The finally block guarantees the signals are unblocked on
                    // every exit path, including a thrown \Error or a failure in
                    // nack()/the error hook, which previously left them blocked.
                    try {
                        // A negative status means the message must not be consumed.
                        if (!$this->executeStatusCallback($message)) {
                            return false;
                        }

                        $incomeData = json_decode($message->getBody(), true);

                        if (!$this->executeReceiveCallback($message, $incomeData)) {
                            return false;
                        }

                        try {
                            $executingCallback = $this->onExecutingCallback;
                            $executingCallback($message, $incomeData);
                        } catch (Exception $e) {
                            $message->nack(true);
                            $this->executeErrorCallback($e, $incomeData);
                        }
                    } finally {
                        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);
                    }
                }
            );

            // Keep waiting for deliveries for as long as the channel is open.
            while ($this->channel->is_open()) {
                $this->channel->wait(null, false);
                sleep($sleepSeconds);

                // Dispatch pending signals (e.g. a request to stop the script).
                pcntl_signal_dispatch();
            }
        });
    }
175
176
    /**
     * Registers the status hook that executeStatusCallback() runs.
     *
     * The hook is called without arguments. A falsy, non-null return value
     * makes the worker nack (requeue) the message instead of handling it.
     *
     * @param \Closure $callback
     */
    public function onCheckStatus(\Closure $callback)
    {
        $this->onCheckStatusCallback = $callback;
    }
180
181
    /**
     * Registers the receive hook that executeReceiveCallback() runs.
     *
     * The hook receives the decoded message body. A falsy, non-null return
     * value makes the worker nack the message and skip its execution.
     *
     * @param \Closure $callback
     */
    public function onReceive(\Closure $callback)
    {
        $this->onReceiveCallback = $callback;
    }
185
186
    /**
     * Registers the callback that processes each message.
     *
     * consume() refuses to start without it and calls it with the message
     * object and the decoded message body.
     *
     * @param \Closure $callback
     */
    public function onExecuting(\Closure $callback)
    {
        $this->onExecutingCallback = $callback;
    }
190
191
    /**
     * Registers the error hook that executeErrorCallback() runs.
     *
     * The hook receives the caught exception and the decoded message body.
     *
     * @param \Closure $callback
     */
    public function onError(\Closure $callback)
    {
        $this->onErrorCallback = $callback;
    }
195
196
    /**
     * Runs the optional status hook and reports whether handling may proceed.
     *
     * With no hook registered, or when the hook returns null or a truthy
     * value, handling proceeds. On any other falsy return the message is
     * nacked with requeue, SIGTERM/SIGINT are unblocked and false is returned.
     *
     * @param AMQPMessage $message message to nack when the hook vetoes
     * @return bool true to continue, false when the message was nacked
     */
    public function executeStatusCallback(AMQPMessage $message)
    {
        // The property is a non-nullable \Closure: it is either unset or callable.
        if (!isset($this->onCheckStatusCallback)) {
            return true;
        }

        $verdict = ($this->onCheckStatusCallback)();

        // null counts as "no opinion", exactly like a truthy answer.
        if ($verdict === null || $verdict) {
            return true;
        }

        print_r("[WORKER STOPPED]" . PHP_EOL);
        $message->nack(true);
        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);

        return false;
    }
214
215
    /**
     * Runs the optional receive hook and reports whether the task should run.
     *
     * With no hook registered, or when the hook returns null or a truthy
     * value, the task runs. On any other falsy return the message is nacked
     * (no requeue argument is passed), SIGTERM/SIGINT are unblocked and false
     * is returned.
     *
     * @param AMQPMessage $message message to nack when the hook vetoes
     * @param mixed $incomeData decoded message body handed to the hook
     * @return bool true to continue, false when the message was nacked
     */
    public function executeReceiveCallback(AMQPMessage $message, $incomeData)
    {
        // The property is a non-nullable \Closure: it is either unset or callable.
        if (!isset($this->onReceiveCallback)) {
            return true;
        }

        $verdict = ($this->onReceiveCallback)($incomeData);

        // null counts as "no opinion", exactly like a truthy answer.
        if ($verdict === null || $verdict) {
            return true;
        }

        print_r("[TASK IGNORED BY ON RECEIVE RETURN]" . PHP_EOL);
        $message->nack();
        pcntl_sigprocmask(SIG_UNBLOCK, [SIGTERM, SIGINT]);

        return false;
    }
233
234
    /**
     * Invokes the optional error hook with the exception and message data.
     *
     * @param \Exception $e exception caught while executing the task
     * @param mixed $incomeData decoded message body
     * @return bool true when a hook was registered and called, false otherwise
     */
    public function executeErrorCallback(\Exception $e, $incomeData)
    {
        // The property is a non-nullable \Closure: it is either unset or callable.
        if (!isset($this->onErrorCallback)) {
            return false;
        }

        ($this->onErrorCallback)($e, $incomeData);

        return true;
    }
244
}
245