Issues (13)

src/PubSub/PubSub.php (2 issues)

Severity
1
<?php
2
3
namespace WillRy\RabbitRun\PubSub;
4
5
6
use Exception;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use WillRy\RabbitRun\Base;
9
10
class PubSub extends Base
11
{
12
    public \Closure $onReceiveCallback;
13
    
14
    public \Closure $onExecutingCallback;
15
16
    public \Closure $onErrorCallback;
17
18
    public \Closure $onCheckStatusCallback;
19
20
    /** @var string nome da fila */
21
    protected string $queueName;
22
    /** @var string nome da exchange */
23
    protected string $exchangeBaseName;
24
    protected string $exchangeName;
25
26
    public function __construct($host, $port, $user, $pass, $vhost)
27
    {
28
        parent::__construct();
29
30
        $this->configRabbit($host, $port, $user, $pass, $vhost);
31
32
        $this->queueName = $this->randomConsumer(12);
33
    }
34
35
    /**
36
     * Faz publicacao no pubsub
37
     * @param array $payload
38
     * @return array
39
     */
40
    public function publish(string $queueName, array $payload = [])
41
    {
42
        $this->createPubSubPublisher($queueName);
43
44
        $json = json_encode($payload);
45
46
        $message = new AMQPMessage($json);
47
48
        $this->channel->basic_publish(
49
            $message,
50
            $this->exchangeName
51
        );
52
53
        return $payload;
54
    }
55
56
    /**
57
     * Configura o pubsub criando
58
     * a exchenge e fila
59
     * @param string $name
60
     * @return $this
61
     */
62
    public function createPubSubPublisher(string $name): PubSub
63
    {
64
        $this->exchangeName = "{$name}_exchange";
65
66
        $this->exchangeBaseName = "{$name}";
67
68
        $this->exchange($this->exchangeName, 'fanout', false, false, false);
69
70
        return $this;
71
    }
72
73
    /**
74
     * Loop de consumo de mensagem
75
     *
76
     * @param int $sleepSeconds
77
     * @throws Exception
78
     */
79
    public function consume(
80
        string $queueName,
81
        int    $sleepSeconds = 3
82
    )
83
    {
84
85
        $this->loopConnection(function () use ($sleepSeconds, $queueName) {
86
87
            /** como no pubsub ao perder a conexão, a fila exclusiva é excluida, é necessário configurar
88
             * fila e etc novamente
89
             */
90
            $this->createPubSubConsumer($queueName);
91
92
            $this->channel->basic_qos(null, 1, null);
93
94
            $this->channel->basic_consume(
95
                $this->queueName,
96
                '',
97
                false,
98
                true,
99
                false,
100
                false,
101
                function (AMQPMessage $message) {
102
                    $statusBoolean = $this->executeStatusCallback($message);
0 ignored issues
show
The call to WillRy\RabbitRun\PubSub\...executeStatusCallback() has too many arguments starting with $message. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

102
                    /** @scrutinizer ignore-call */ 
103
                    $statusBoolean = $this->executeStatusCallback($message);

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
103
104
                    if (!$statusBoolean) {
105
                        return false;
106
                    }
107
108
                    $incomeData = json_decode($message->getBody(), true);
109
110
                    $statusBoolean = $this->executeReceiveCallback($message, $incomeData);
0 ignored issues
show
The call to WillRy\RabbitRun\PubSub\...xecuteReceiveCallback() has too many arguments starting with $incomeData. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

110
                    /** @scrutinizer ignore-call */ 
111
                    $statusBoolean = $this->executeReceiveCallback($message, $incomeData);

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
111
112
                    if (!$statusBoolean) {
113
                        return false;
114
                    }
115
116
117
                    try {
118
                        $executingCallback = $this->onExecutingCallback;
119
                        $executingCallback($message, $incomeData);
120
                    } catch (Exception $e) {
121
                        print_r("[ERROR]" . PHP_EOL);
122
                        $this->executeErrorCallback($e, $incomeData);
123
                    }
124
                }
125
            );
126
127
            // Loop as long as the channel has callbacks registered
128
            while ($this->channel->is_open()) {
129
                $this->channel->wait(null, false);
130
                sleep($sleepSeconds);
131
            }
132
        });
133
    }
134
135
    /**
136
     * Configura o pubsub criando
137
     * a exchenge e fila
138
     * @param string $name
139
     * @return $this
140
     */
141
    public function createPubSubConsumer(string $name): PubSub
142
    {
143
        $this->getConnection();
144
145
        $this->exchangeName = "{$name}_exchange";
146
147
        $this->exchangeBaseName = "{$name}";
148
149
        $this->exchange($this->exchangeName, 'fanout', false, false, false);
150
151
        $defaultQueueName = !empty($this->queueName) ? $this->queueName : '';
152
        list($queueName, ,) = $this->queue($defaultQueueName, false, false, true, true);
153
154
        $this->queueName = $queueName;
155
156
157
        $this->bind($this->queueName, $this->exchangeName);
158
159
        return $this;
160
    }
161
162
    public function executeStatusCallback()
163
    {
164
        if (empty($this->onCheckStatusCallback)) {
165
            return true;
166
        }
167
168
        $checkStatusCallback = $this->onCheckStatusCallback;
169
        $statusBoolean = $checkStatusCallback();
170
171
        if (!$statusBoolean && isset($statusBoolean)) {
172
            print_r("[WORKER STOPPED]" . PHP_EOL);
173
            return false;
174
        }
175
176
        return true;
177
    }
178
179
    public function executeReceiveCallback($incomeData)
180
    {
181
        if (empty($this->onReceiveCallback)) {
182
            return true;
183
        }
184
185
        $receiveCallback = $this->onReceiveCallback;
186
        $statusBoolean = $receiveCallback($incomeData);
187
        if (!$statusBoolean && isset($statusBoolean)) {
188
            print_r("[TASK IGNORED BY ON RECEIVE RETURN]" . PHP_EOL);
189
            return false;
190
        }
191
192
193
        return true;
194
    }
195
196
    public function executeErrorCallback(Exception $e, $incomeData)
197
    {
198
        if (empty($this->onErrorCallback)) {
199
            return false;
200
        }
201
202
        $errorCallback = $this->onErrorCallback;
203
        $errorCallback($e, $incomeData);
204
        return true;
205
    }
206
207
    public function onCheckStatus(\Closure $callback)
208
    {
209
        $this->onCheckStatusCallback = $callback;
210
    }
211
212
    public function onReceive(\Closure $callback)
213
    {
214
        $this->onReceiveCallback = $callback;
215
    }
216
217
    public function onExecuting(\Closure $callback)
218
    {
219
        $this->onExecutingCallback = $callback;
220
    }
221
222
    public function onError(\Closure $callback)
223
    {
224
        $this->onErrorCallback = $callback;
225
    }
226
}