|
1
|
|
|
<?php |
|
2
|
|
|
/** |
|
3
|
|
|
* @author Marwan Al-Soltany <[email protected]> |
|
4
|
|
|
* @copyright Marwan Al-Soltany 2020 |
|
5
|
|
|
* For the full copyright and license information, please view |
|
6
|
|
|
* the LICENSE file that was distributed with this source code. |
|
7
|
|
|
*/ |
|
8
|
|
|
|
|
9
|
|
|
namespace MAKS\AmqpAgent\RPC; |
|
10
|
|
|
|
|
11
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
|
12
|
|
|
use MAKS\AmqpAgent\Helper\Utility; |
|
13
|
|
|
use MAKS\AmqpAgent\Helper\ClassProxy; |
|
14
|
|
|
use MAKS\AmqpAgent\RPC\AbstractEndpoint; |
|
15
|
|
|
use MAKS\AmqpAgent\RPC\ClientEndpointInterface; |
|
16
|
|
|
use MAKS\AmqpAgent\Exception\RPCEndpointException; |
|
17
|
|
|
|
|
18
|
|
|
/** |
|
19
|
|
|
* A class specialized in requesting. Implementing only the methods needed for a client. |
|
20
|
|
|
* |
|
21
|
|
|
* Example: |
|
22
|
|
|
* ``` |
|
23
|
|
|
* $clientEndpoint = new ClientEndpoint(); |
|
24
|
|
|
* $clientEndpoint->on('some.event', function () { ... }); |
|
25
|
|
|
* $clientEndpoint->connect(); |
|
26
|
|
|
* $clientEndpoint->request('Message Body', 'queue.name'); |
|
27
|
|
|
* $clientEndpoint->disconnect(); |
|
28
|
|
|
* ``` |
|
29
|
|
|
* |
|
30
|
|
|
* @since 2.0.0 |
|
31
|
|
|
* @api |
|
32
|
|
|
*/ |
|
33
|
|
|
class ClientEndpoint extends AbstractEndpoint implements ClientEndpointInterface
{
    /**
     * Opens a connection with RabbitMQ server.
     *
     * After the parent has connected, a queue for responses is declared and a
     * consumer is registered on it; every message delivered to that consumer
     * is forwarded to `self::onResponse()`.
     *
     * @param array|null $connectionOptions Passed unchanged to the parent `connect()`.
     * @return self
     */
    public function connect(?array $connectionOptions = []): self
    {
        parent::connect($connectionOptions);

        if ($this->isConnected()) {
            // Only the first element of the declare result is kept as the response queue name.
            // NOTE(review): the queue name is passed as null, presumably so the broker
            // generates one - confirm against the php-amqplib version in use.
            list($this->responseQueue, ,) = $this->channel->queue_declare(
                null,
                false,
                false,
                true,
                false
            );

            $this->channel->basic_consume(
                $this->responseQueue,
                null,
                false,
                false,
                false,
                false,
                function ($message) {
                    // Routed through ClassProxy, presumably because onResponse() is protected.
                    ClassProxy::call($this, 'onResponse', $message);
                }
            );
        }

        return $this;
    }

    /**
     * Sends the passed request to the server using the passed queue.
     *
     * The call blocks: after publishing, it keeps waiting on the channel until
     * `self::onResponse()` has stored a response.
     *
     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.
     * @param string|null $queueName [optional] The name of queue to send through. When null, the previously set queue name is reused.
     * @return string The response body.
     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.
     */
    public function request($request, ?string $queueName = null): string
    {
        if (!$this->isConnected()) {
            throw new RPCEndpointException('Client is not connected yet!');
        }

        $this->queueName = $queueName ?? $this->queueName;

        // Reset per-request state; a null response is what keeps the wait loop below running.
        $this->response = null;
        $this->request = $request;
        $this->requestQueue = $this->queueName;
        $this->correlationId = Utility::generateHash();

        // Reuse a ready-made message as-is, otherwise wrap the body in a new one.
        $message = $request instanceof AMQPMessage ? $request : new AMQPMessage((string)$request);
        $message->set('reply_to', $this->responseQueue);
        $message->set('correlation_id', $this->correlationId);
        $message->set('timestamp', time());

        $this->channel->queue_declare(
            $this->requestQueue,
            false,
            false,
            false,
            false
        );

        $this->trigger('request.before.send', [$message]);

        $this->channel->basic_publish(
            $message,
            null,
            $this->requestQueue
        );

        $this->trigger('request.after.send', [$message]);

        // Block until onResponse() assigns a non-null response.
        while ($this->response === null) {
            $this->channel->wait();
        }

        return $this->response;
    }

    /**
     * Sends the passed request to the server using the passed queue.
     * Alias for `self::request()`.
     *
     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.
     * @param string|null $queueName [optional] The name of queue to send through.
     * @return string The response body.
     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.
     */
    public function call($request, ?string $queueName = null): string
    {
        return $this->request($request, $queueName);
    }

    /**
     * Validates the response.
     *
     * Triggers the 'response.on.get' event first. When the Correlation ID of the
     * response equals the one of the current request, the response body is
     * stored and the message is acknowledged.
     *
     * @param AMQPMessage $response
     * @return void
     * @throws RPCEndpointException If the Correlation ID of the response does not match the one of the request.
     */
    protected function onResponse(AMQPMessage $response): void
    {
        $this->trigger('response.on.get', [$response]);

        if ($this->correlationId === $response->get('correlation_id')) {
            $this->response = $this->callback($response);
            $response->ack();
            return;
        }

        // Argument order follows the placeholders: response ID first, request ID second.
        throw new RPCEndpointException(
            sprintf(
                'Correlation ID of the response "%s" does not match the one of the request "%s"!',
                $response->get('correlation_id'),
                $this->correlationId
            )
        );
    }

    /**
     * Returns the final response body.
     *
     * @param AMQPMessage $message The response message to read the body from.
     * @return string
     */
    protected function callback(AMQPMessage $message): string
    {
        return $message->body;
    }
}
|
165
|
|
|
|
Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.
For example, imagine you have a variable `$accountId` that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the `id` property of an instance of the `Account` class. This class holds a proper account, so the id value must no longer be false.
Either this assignment is in error or a type check should be added for that assignment.