|
1
|
|
|
<?php |
|
2
|
|
|
/** |
|
3
|
|
|
* @author Marwan Al-Soltany <[email protected]> |
|
4
|
|
|
* @copyright Marwan Al-Soltany 2020 |
|
5
|
|
|
* For the full copyright and license information, please view |
|
6
|
|
|
* the LICENSE file that was distributed with this source code. |
|
7
|
|
|
*/ |
|
8
|
|
|
|
|
9
|
|
|
namespace MAKS\AmqpAgent\RPC; |
|
10
|
|
|
|
|
11
|
|
|
use Exception; |
|
12
|
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection; |
|
13
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
|
14
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
|
15
|
|
|
use MAKS\AmqpAgent\RPC\AbstractEndpointInterface; |
|
16
|
|
|
use MAKS\AmqpAgent\Helper\EventTrait; |
|
17
|
|
|
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait; |
|
18
|
|
|
use MAKS\AmqpAgent\Exception\RPCEndpointException; |
|
19
|
|
|
use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters; |
|
20
|
|
|
|
|
21
|
|
|
/**
 * An abstract class implementing the basic functionality of an endpoint.
 *
 * It manages the connection/channel life cycle (`connect()`, `disconnect()`,
 * `isConnected()`), offers a round-trip probe (`ping()`) and an event hook
 * (`on()`). Concrete endpoints have to provide `callback()`.
 * @since 2.0.0
 * @api
 */
abstract class AbstractEndpoint implements AbstractEndpointInterface
{
    use MagicMethodsExceptionsTrait;
    use EventTrait;

    /**
     * The connection options of the RPC endpoint.
     * Their values are passed positionally (in array order) to the connection constructor.
     * @var array
     */
    protected $connectionOptions;

    /**
     * The queue name of the RPC endpoint.
     * @var string
     */
    protected $queueName;

    /**
     * Whether the endpoint is connected to RabbitMQ server or not.
     * Refreshed on every call to `self::isConnected()`.
     * @var bool
     */
    protected $connected;

    /**
     * The endpoint connection.
     * @var AMQPStreamConnection
     */
    protected $connection;

    /**
     * The endpoint channel.
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * The request body.
     * @var string
     */
    protected $request;

    /**
     * Requests conveyor.
     * @var string
     */
    protected $requestQueue;

    /**
     * The response body.
     * @var string
     */
    protected $response;

    /**
     * Responses conveyor.
     * @var string
     */
    protected $responseQueue;

    /**
     * Correlation ID of the last request/response.
     * @var string
     */
    protected $correlationId;


    /**
     * Class constructor.
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @param string|null $queueName [optional] The override for the default queue name of the RPC endpoint.
     *      Any value considered empty by `empty()` falls back to the default queue name.
     */
    public function __construct(?array $connectionOptions = [], ?string $queueName = null)
    {
        $this->connectionOptions = Parameters::patch($connectionOptions, 'RPC_CONNECTION_OPTIONS');
        $this->queueName = empty($queueName) ? Parameters::RPC_QUEUE_NAME : $queueName;
    }

    /**
     * Closes the connection with RabbitMQ server before destroying the object.
     */
    public function __destruct()
    {
        $this->disconnect();
    }


    /**
     * Opens a connection with RabbitMQ server.
     *
     * The passed overrides are merged into the stored connection options before
     * the "already connected" check, so they are retained even when the call throws.
     * Fires `connection.after.open` and `channel.after.open` once the respective object exists.
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @return self
     * @throws RPCEndpointException If the endpoint is already connected.
     */
    public function connect(?array $connectionOptions = []): self
    {
        $this->connectionOptions = Parameters::patchWith(
            $connectionOptions ?? [],
            $this->connectionOptions
        );

        if ($this->isConnected()) {
            throw new RPCEndpointException('Endpoint is already connected!');
        }

        // The options are passed positionally, their order has to match the constructor signature.
        $parameters = array_values($this->connectionOptions);

        $this->connection = new AMQPStreamConnection(...$parameters);
        $this->trigger('connection.after.open', [$this->connection]);

        $this->channel = $this->connection->channel();
        $this->trigger('channel.after.open', [$this->channel]);

        return $this;
    }

    /**
     * Closes the connection with RabbitMQ server.
     *
     * Does nothing when the endpoint is not connected. Otherwise fires
     * `channel.before.close` and `connection.before.close` right before closing
     * the channel and the connection respectively.
     * @return void
     */
    public function disconnect(): void
    {
        if ($this->isConnected()) {
            // The property is documented as bool, keep it one (it used to be set to null here).
            $this->connected = false;

            $this->trigger('channel.before.close', [$this->channel]);
            $this->channel->close();

            $this->trigger('connection.before.close', [$this->connection]);
            $this->connection->close();
        }
    }

    /**
     * Returns whether the endpoint is connected or not.
     *
     * The endpoint counts as connected only if both the connection and the
     * channel exist and both report being open. The result is also cached in `self::$connected`.
     * @return bool
     */
    public function isConnected(): bool
    {
        $this->connected = (
            isset($this->connection) &&
            isset($this->channel) &&
            $this->connection->isConnected() &&
            $this->channel->is_open()
        );

        return $this->connected;
    }

    /**
     * Returns the connection used by the endpoint.
     *
     * Note that the connection property is only populated by `self::connect()`;
     * calling this method beforehand violates the declared return type.
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * The time needed for the round-trip to RabbitMQ server in milliseconds.
     * Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.
     *
     * A message is published to a temporary queue and consumed again on a dedicated channel;
     * the time between publishing and receiving it is measured. The dedicated channel
     * (and the connection, if it was created only for the check) is closed afterwards,
     * on failure this is done on a best-effort basis.
     * @return float A two decimal points rounded float.
     * @throws RPCEndpointException Presumably for any failure during the check (whatever `RPCEndpointException::rethrow()` raises).
     */
    final public function ping(): float
    {
        $pingConnection = $this->connection;
        $pingChannel = null;

        try {
            if (!isset($pingConnection) || !$pingConnection->isConnected()) {
                $parameters = array_values($this->connectionOptions);
                $pingConnection = new AMQPStreamConnection(...$parameters);
            }
            $pingChannel = $pingConnection->channel();

            // Server-named, exclusive, auto-deleted queue used only for this check.
            [$pingQueue] = $pingChannel->queue_declare(
                null,
                false,
                false,
                true,
                true
            );

            $pingChannel->basic_qos(
                null,
                1,
                null
            );

            $pingEcho = null;

            $pingChannel->basic_consume(
                $pingQueue,
                null,
                false,
                false,
                false,
                false,
                function ($message) use (&$pingEcho) {
                    $message->ack();
                    $pingEcho = $message->body;
                }
            );

            $pingStartTime = microtime(true);

            // Published via the default exchange using the queue name as the routing key.
            $pingChannel->basic_publish(
                new AMQPMessage(__FUNCTION__),
                null,
                $pingQueue
            );

            // The published body is never empty, so a null echo means "not received yet".
            while ($pingEcho === null) {
                $pingChannel->wait();
            }

            $pingEndTime = microtime(true);

            $pingChannel->queue_delete($pingQueue);

            $pingChannel->close();
            if ($pingConnection !== $this->connection) {
                // The connection was opened only for this check.
                $pingConnection->close();
            }

            return round(($pingEndTime - $pingStartTime) * 1000, 2);
        } catch (Exception $error) {
            // Best-effort cleanup so a failed check does not leak the probe channel
            // or the throw-away connection; cleanup failures must not mask $error.
            try {
                if ($pingChannel !== null && $pingChannel->is_open()) {
                    $pingChannel->close();
                }
            } catch (Exception $cleanupError) {
                // Intentionally ignored, the original error is the one reported.
            }

            try {
                if (
                    $pingConnection !== null &&
                    $pingConnection !== $this->connection &&
                    $pingConnection->isConnected()
                ) {
                    $pingConnection->close();
                }
            } catch (Exception $cleanupError) {
                // Intentionally ignored, the original error is the one reported.
            }

            RPCEndpointException::rethrow($error);
        }
    }

    /**
     * Hooking method based on events to manipulate the request/response during the endpoint/message life cycle.
     * Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.
     *
     * The parameters will be passed to the callback as follows:
     *      1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),
     *      2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),
     *      3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).
     * ```
     * $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {
     *     ...
     * });
     * ```
     * @param string $event The event to listen on.
     * @param callable $callback The callback to execute.
     * @return self
     */
    final public function on(string $event, callable $callback): self
    {
        $this->bind($event, function (...$arguments) use ($event, $callback) {
            // Append the endpoint and the event name after the arguments the event was triggered with.
            $callbackArguments = array_merge($arguments, [$this, $event]);

            call_user_func_array($callback, $callbackArguments);
        });

        return $this;
    }

    /**
     * Hook method to manipulate the message (request/response) when extending the class.
     * @param AMQPMessage $message The message to work on.
     * @return string
     */
    abstract protected function callback(AMQPMessage $message): string;
}
|
293
|
|
|
|
Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type-hinted more strictly.
For example, imagine you have a variable `$accountId` that can either hold an `Id` object or `false` (if there is no account id yet). Your code now assigns that value to the `id` property of an instance of the `Account` class. This class holds a proper account, so the id value must no longer be `false`. Either this assignment is in error or a type check should be added for that assignment.