Passed
Push — main ( 1631ee...add2d8 )
by William
03:03
created

PubSub::executeReceiveCallback()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 8
c 0
b 0
f 0
nc 3
nop 1
dl 0
loc 15
rs 10
1
<?php
2
3
namespace WillRy\RabbitRun\PubSub;
4
5
6
use Exception;
7
use PhpAmqpLib\Message\AMQPMessage;
8
9
class PubSub extends \WillRy\RabbitRun\Base
10
{
11
    /** @var string Queue name; assigned in the constructor and reassigned by createPubSubConsumer() */
12
    protected string $queueName;
13
14
    /** @var string Base name received by createPubSubPublisher()/createPubSubConsumer(), stored unchanged */
15
    protected string $exchangeBaseName;
16
17
    /** @var string Exchange name, built as "{$name}_exchange" by the createPubSub* methods */
    protected string $exchangeName;
18
19
    /** @var \Closure Closure registered through onReceive() */
    public \Closure $onReceiveCallback;
20
21
    /** @var \Closure Closure registered through onExecuting() */
    public \Closure $onExecutingCallback;
22
23
    /** @var \Closure Closure registered through onError() */
    public \Closure $onErrorCallback;
24
25
    /** @var \Closure Closure registered through onCheckStatus() */
    public \Closure $onCheckStatusCallback;
26
27
    /**
     * Runs the parent constructor, then gives this instance a queue name
     * obtained from randomConsumer(12).
     */
    public function __construct()
    {
        parent::__construct();

        $generatedQueueName = $this->randomConsumer(12);
        $this->queueName = $generatedQueueName;
    }
33
34
    /**
     * Prepares the publishing side of the pub/sub.
     *
     * Calls getConnection(), records the base and exchange names, and
     * declares a 'fanout' exchange named "{$name}_exchange".
     *
     * @param string $name base name; the exchange name is derived from it
     * @return $this
     */
    public function createPubSubPublisher(string $name): PubSub
    {
        $this->getConnection();

        $this->exchangeBaseName = $name;
        $this->exchangeName = $name . '_exchange';

        // NOTE(review): the three boolean flags are interpreted by the
        // inherited exchange() helper, which is not visible here — confirm their meaning.
        $this->exchange($this->exchangeName, 'fanout', false, false, false);

        return $this;
    }
52
53
    /**
     * Prepares the consuming side of the pub/sub.
     *
     * Calls getConnection(), declares the 'fanout' exchange named
     * "{$name}_exchange", declares a queue (reusing the current queue name
     * when one is set) and binds that queue to the exchange.
     *
     * @param string $name base name; the exchange name is derived from it
     * @return $this
     */
    public function createPubSubConsumer(string $name): PubSub
    {
        $this->getConnection();

        $this->exchangeBaseName = $name;
        $this->exchangeName = $name . '_exchange';

        $this->exchange($this->exchangeName, 'fanout', false, false, false);

        // Fall back to an empty name when no queue name is available.
        $requestedQueueName = '';
        if (!empty($this->queueName)) {
            $requestedQueueName = $this->queueName;
        }

        // Only the first element of queue()'s result is used: the queue name.
        // NOTE(review): the boolean flags are interpreted by the inherited
        // queue() helper, which is not visible here — confirm their meaning.
        [$declaredQueueName] = $this->queue($requestedQueueName, false, false, true, true);
        $this->queueName = $declaredQueueName;

        $this->bind($this->queueName, $this->exchangeName);

        return $this;
    }
79
80
    /**
     * Publishes a payload to the pub/sub exchange.
     *
     * Sets up the publisher through createPubSubPublisher(), JSON-encodes the
     * payload and publishes the resulting message to $this->exchangeName.
     *
     * @param string $queueName base name handed to createPubSubPublisher(),
     *                          which derives the exchange name from it
     * @param array $payload data to encode as the message body
     * @return array the payload exactly as it was received
     * @throws \JsonException when the payload cannot be encoded as JSON
     */
    public function publish(string $queueName, array $payload = [])
    {
        $this->createPubSubPublisher($queueName);

        // Without JSON_THROW_ON_ERROR, json_encode() returns false on failure
        // and that value would be published as the message body.
        $json = json_encode($payload, JSON_THROW_ON_ERROR);

        $message = new AMQPMessage($json);

        $this->channel->basic_publish(
            $message,
            $this->exchangeName
        );

        return $payload;
    }
100
101
    /**
     * Message consumption loop.
     *
     * Hands loopConnection() a closure that sets up the consumer side through
     * createPubSubConsumer(), registers a delivery callback on the channel and
     * then waits for deliveries while the channel is open, sleeping
     * $sleepSeconds after each wait() call.
     *
     * For every delivery the callback:
     *  1. runs executeStatusCallback() and stops when it returns false;
     *  2. JSON-decodes the message body into an associative array;
     *  3. runs executeReceiveCallback() and stops when it returns false;
     *  4. invokes the onExecuting closure with the message and the decoded
     *     body, routing any Exception to executeErrorCallback().
     *
     * @param string $queueName base name handed to createPubSubConsumer()
     * @param int $sleepSeconds seconds to sleep after each wait() call
     * @throws Exception
     */
    public function consume(
        string $queueName,
        int    $sleepSeconds = 3
    ) {
        $this->loopConnection(function () use ($sleepSeconds, $queueName) {
            /*
             * In pub/sub the exclusive queue is deleted when the connection
             * is lost, so the exchange, queue and binding have to be set up
             * again each time this closure runs.
             */
            $this->createPubSubConsumer($queueName);

            $this->channel->basic_qos(null, 1, null);

            // NOTE(review): assuming $this->channel is a php-amqplib channel,
            // the positional flags below are no_local, no_ack, exclusive and
            // nowait — confirm against the base class.
            $this->channel->basic_consume(
                $this->queueName,
                '',
                false,
                true,
                false,
                false,
                function (AMQPMessage $message) {
                    // executeStatusCallback() declares no parameters, so
                    // nothing is passed to it.
                    $statusBoolean = $this->executeStatusCallback();

                    if (!$statusBoolean) {
                        return false;
                    }

                    $incomeData = json_decode($message->getBody(), true);

                    // Let the onReceive hook veto this delivery.
                    $statusBoolean = $this->executeReceiveCallback($message, $incomeData);

                    if (!$statusBoolean) {
                        return false;
                    }

                    try {
                        $executingCallback = $this->onExecutingCallback;
                        $executingCallback($message, $incomeData);
                    } catch (Exception $e) {
                        print_r("[ERROR]" . PHP_EOL);
                        $this->executeErrorCallback($e, $incomeData);
                    }
                }
            );

            // Keep waiting for deliveries for as long as the channel is open.
            while ($this->channel->is_open()) {
                $this->channel->wait(null, false);
                sleep($sleepSeconds);
            }
        });
    }
161
162
    /**
     * Registers the closure that executeStatusCallback() invokes.
     *
     * @param \Closure $callback invoked without arguments; a non-null falsy
     *                           return makes executeStatusCallback() return false
     */
    public function onCheckStatus(\Closure $callback)
    {
        $this->onCheckStatusCallback = $callback;
    }
166
167
    /**
     * Registers the closure that executeReceiveCallback() invokes.
     *
     * @param \Closure $callback a non-null falsy return makes
     *                           executeReceiveCallback() return false
     */
    public function onReceive(\Closure $callback)
    {
        $this->onReceiveCallback = $callback;
    }
171
172
    /**
     * Registers the closure that consume() runs for each accepted message.
     *
     * @param \Closure $callback invoked with the AMQPMessage and the
     *                           JSON-decoded message body
     */
    public function onExecuting(\Closure $callback)
    {
        $this->onExecutingCallback = $callback;
    }
176
177
    /**
     * Registers the closure that executeErrorCallback() invokes.
     *
     * @param \Closure $callback invoked with the caught exception and the
     *                           income data passed to executeErrorCallback()
     */
    public function onError(\Closure $callback)
    {
        $this->onErrorCallback = $callback;
    }
181
182
    /**
     * Runs the closure registered via onCheckStatus(), if there is one.
     *
     * @return bool false only when the closure returns a non-null falsy value
     *              (after printing "[WORKER STOPPED]"); true otherwise,
     *              including when no closure is registered or it returns null
     */
    public function executeStatusCallback()
    {
        if (!empty($this->onCheckStatusCallback)) {
            $statusResult = ($this->onCheckStatusCallback)();

            // A null result (e.g. a closure without a return) is not treated as a stop.
            if ($statusResult !== null && !$statusResult) {
                print_r("[WORKER STOPPED]" . PHP_EOL);
                return false;
            }
        }

        return true;
    }
198
199
    /**
     * Runs the closure registered via onReceive(), if there is one.
     *
     * Every argument this method receives is forwarded to the closure in the
     * same order. consume() calls it with the AMQPMessage followed by the
     * decoded body, so the closure receives both instead of only the first.
     *
     * @param mixed $incomeData first argument forwarded to the closure
     * @return bool false only when the closure returns a non-null falsy value
     *              (after printing the "ignored" notice); true otherwise,
     *              including when no closure is registered or it returns null
     */
    public function executeReceiveCallback($incomeData)
    {
        if (empty($this->onReceiveCallback)) {
            return true;
        }

        $receiveCallback = $this->onReceiveCallback;

        // func_get_args() is used rather than a variadic parameter so the
        // method signature stays unchanged for subclasses that override it.
        $statusBoolean = $receiveCallback(...func_get_args());

        // A null result (e.g. a closure without a return) does not reject the message.
        if ($statusBoolean !== null && !$statusBoolean) {
            print_r("[TASK IGNORED BY ON RECEIVE RETURN]" . PHP_EOL);
            return false;
        }

        return true;
    }
215
216
    /**
     * Runs the closure registered via onError(), if there is one.
     *
     * @param \Exception $e exception to hand to the closure
     * @param mixed $incomeData second argument handed to the closure
     * @return bool true when a closure was invoked, false when none is registered
     */
    public function executeErrorCallback(\Exception $e, $incomeData)
    {
        if (!empty($this->onErrorCallback)) {
            ($this->onErrorCallback)($e, $incomeData);
            return true;
        }

        return false;
    }
226
}
227