1 | <?php |
||||||
2 | |||||||
3 | namespace WillRy\RabbitRun\PubSub; |
||||||
4 | |||||||
5 | |||||||
6 | use Exception; |
||||||
7 | use PhpAmqpLib\Message\AMQPMessage; |
||||||
8 | use WillRy\RabbitRun\Base; |
||||||
9 | |||||||
10 | class PubSub extends Base |
||||||
11 | { |
||||||
12 | public \Closure $onReceiveCallback; |
||||||
13 | |||||||
14 | public \Closure $onExecutingCallback; |
||||||
15 | |||||||
16 | public \Closure $onErrorCallback; |
||||||
17 | |||||||
18 | public \Closure $onCheckStatusCallback; |
||||||
19 | |||||||
20 | /** @var string nome da fila */ |
||||||
21 | protected string $queueName; |
||||||
22 | /** @var string nome da exchange */ |
||||||
23 | protected string $exchangeBaseName; |
||||||
24 | protected string $exchangeName; |
||||||
25 | |||||||
26 | public function __construct($host, $port, $user, $pass, $vhost) |
||||||
27 | { |
||||||
28 | parent::__construct(); |
||||||
29 | |||||||
30 | $this->configRabbit($host, $port, $user, $pass, $vhost); |
||||||
31 | |||||||
32 | $this->queueName = $this->randomConsumer(12); |
||||||
33 | } |
||||||
34 | |||||||
35 | /** |
||||||
36 | * Faz publicacao no pubsub |
||||||
37 | * @param array $payload |
||||||
38 | * @return array |
||||||
39 | */ |
||||||
40 | public function publish(string $queueName, array $payload = []) |
||||||
41 | { |
||||||
42 | $this->createPubSubPublisher($queueName); |
||||||
43 | |||||||
44 | $json = json_encode($payload); |
||||||
45 | |||||||
46 | $message = new AMQPMessage($json); |
||||||
47 | |||||||
48 | $this->channel->basic_publish( |
||||||
49 | $message, |
||||||
50 | $this->exchangeName |
||||||
51 | ); |
||||||
52 | |||||||
53 | return $payload; |
||||||
54 | } |
||||||
55 | |||||||
56 | /** |
||||||
57 | * Configura o pubsub criando |
||||||
58 | * a exchenge e fila |
||||||
59 | * @param string $name |
||||||
60 | * @return $this |
||||||
61 | */ |
||||||
62 | public function createPubSubPublisher(string $name): PubSub |
||||||
63 | { |
||||||
64 | $this->exchangeName = "{$name}_exchange"; |
||||||
65 | |||||||
66 | $this->exchangeBaseName = "{$name}"; |
||||||
67 | |||||||
68 | $this->exchange($this->exchangeName, 'fanout', false, false, false); |
||||||
69 | |||||||
70 | return $this; |
||||||
71 | } |
||||||
72 | |||||||
73 | /** |
||||||
74 | * Loop de consumo de mensagem |
||||||
75 | * |
||||||
76 | * @param int $sleepSeconds |
||||||
77 | * @throws Exception |
||||||
78 | */ |
||||||
79 | public function consume( |
||||||
80 | string $queueName, |
||||||
81 | int $sleepSeconds = 3 |
||||||
82 | ) |
||||||
83 | { |
||||||
84 | |||||||
85 | $this->loopConnection(function () use ($sleepSeconds, $queueName) { |
||||||
86 | |||||||
87 | /** como no pubsub ao perder a conexão, a fila exclusiva é excluida, é necessário configurar |
||||||
88 | * fila e etc novamente |
||||||
89 | */ |
||||||
90 | $this->createPubSubConsumer($queueName); |
||||||
91 | |||||||
92 | $this->channel->basic_qos(null, 1, null); |
||||||
93 | |||||||
94 | $this->channel->basic_consume( |
||||||
95 | $this->queueName, |
||||||
96 | '', |
||||||
97 | false, |
||||||
98 | true, |
||||||
99 | false, |
||||||
100 | false, |
||||||
101 | function (AMQPMessage $message) { |
||||||
102 | $statusBoolean = $this->executeStatusCallback($message); |
||||||
0 ignored issues
–
show
|
|||||||
103 | |||||||
104 | if (!$statusBoolean) { |
||||||
105 | return false; |
||||||
106 | } |
||||||
107 | |||||||
108 | $incomeData = json_decode($message->getBody(), true); |
||||||
109 | |||||||
110 | $statusBoolean = $this->executeReceiveCallback($message, $incomeData); |
||||||
0 ignored issues
–
show
The call to
WillRy\RabbitRun\PubSub\...xecuteReceiveCallback() has too many arguments starting with $incomeData .
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue. If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above. ![]() |
|||||||
111 | |||||||
112 | if (!$statusBoolean) { |
||||||
113 | return false; |
||||||
114 | } |
||||||
115 | |||||||
116 | |||||||
117 | try { |
||||||
118 | $executingCallback = $this->onExecutingCallback; |
||||||
119 | $executingCallback($message, $incomeData); |
||||||
120 | } catch (Exception $e) { |
||||||
121 | print_r("[ERROR]" . PHP_EOL); |
||||||
122 | $this->executeErrorCallback($e, $incomeData); |
||||||
123 | } |
||||||
124 | } |
||||||
125 | ); |
||||||
126 | |||||||
127 | // Loop as long as the channel has callbacks registered |
||||||
128 | while ($this->channel->is_open()) { |
||||||
129 | $this->channel->wait(null, false); |
||||||
130 | sleep($sleepSeconds); |
||||||
131 | } |
||||||
132 | }); |
||||||
133 | } |
||||||
134 | |||||||
135 | /** |
||||||
136 | * Configura o pubsub criando |
||||||
137 | * a exchenge e fila |
||||||
138 | * @param string $name |
||||||
139 | * @return $this |
||||||
140 | */ |
||||||
141 | public function createPubSubConsumer(string $name): PubSub |
||||||
142 | { |
||||||
143 | $this->getConnection(); |
||||||
144 | |||||||
145 | $this->exchangeName = "{$name}_exchange"; |
||||||
146 | |||||||
147 | $this->exchangeBaseName = "{$name}"; |
||||||
148 | |||||||
149 | $this->exchange($this->exchangeName, 'fanout', false, false, false); |
||||||
150 | |||||||
151 | $defaultQueueName = !empty($this->queueName) ? $this->queueName : ''; |
||||||
152 | list($queueName, ,) = $this->queue($defaultQueueName, false, false, true, true); |
||||||
153 | |||||||
154 | $this->queueName = $queueName; |
||||||
155 | |||||||
156 | |||||||
157 | $this->bind($this->queueName, $this->exchangeName); |
||||||
158 | |||||||
159 | return $this; |
||||||
160 | } |
||||||
161 | |||||||
162 | public function executeStatusCallback() |
||||||
163 | { |
||||||
164 | if (empty($this->onCheckStatusCallback)) { |
||||||
165 | return true; |
||||||
166 | } |
||||||
167 | |||||||
168 | $checkStatusCallback = $this->onCheckStatusCallback; |
||||||
169 | $statusBoolean = $checkStatusCallback(); |
||||||
170 | |||||||
171 | if (!$statusBoolean && isset($statusBoolean)) { |
||||||
172 | print_r("[WORKER STOPPED]" . PHP_EOL); |
||||||
173 | return false; |
||||||
174 | } |
||||||
175 | |||||||
176 | return true; |
||||||
177 | } |
||||||
178 | |||||||
179 | public function executeReceiveCallback($incomeData) |
||||||
180 | { |
||||||
181 | if (empty($this->onReceiveCallback)) { |
||||||
182 | return true; |
||||||
183 | } |
||||||
184 | |||||||
185 | $receiveCallback = $this->onReceiveCallback; |
||||||
186 | $statusBoolean = $receiveCallback($incomeData); |
||||||
187 | if (!$statusBoolean && isset($statusBoolean)) { |
||||||
188 | print_r("[TASK IGNORED BY ON RECEIVE RETURN]" . PHP_EOL); |
||||||
189 | return false; |
||||||
190 | } |
||||||
191 | |||||||
192 | |||||||
193 | return true; |
||||||
194 | } |
||||||
195 | |||||||
196 | public function executeErrorCallback(Exception $e, $incomeData) |
||||||
197 | { |
||||||
198 | if (empty($this->onErrorCallback)) { |
||||||
199 | return false; |
||||||
200 | } |
||||||
201 | |||||||
202 | $errorCallback = $this->onErrorCallback; |
||||||
203 | $errorCallback($e, $incomeData); |
||||||
204 | return true; |
||||||
205 | } |
||||||
206 | |||||||
207 | public function onCheckStatus(\Closure $callback) |
||||||
208 | { |
||||||
209 | $this->onCheckStatusCallback = $callback; |
||||||
210 | } |
||||||
211 | |||||||
212 | public function onReceive(\Closure $callback) |
||||||
213 | { |
||||||
214 | $this->onReceiveCallback = $callback; |
||||||
215 | } |
||||||
216 | |||||||
217 | public function onExecuting(\Closure $callback) |
||||||
218 | { |
||||||
219 | $this->onExecutingCallback = $callback; |
||||||
220 | } |
||||||
221 | |||||||
222 | public function onError(\Closure $callback) |
||||||
223 | { |
||||||
224 | $this->onErrorCallback = $callback; |
||||||
225 | } |
||||||
226 | } |
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.