AbstractEndpoint::getConnection()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 1
c 1
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
/**
4
 * @author Marwan Al-Soltany <[email protected]>
5
 * @copyright Marwan Al-Soltany 2020
6
 * For the full copyright and license information, please view
7
 * the LICENSE file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace MAKS\AmqpAgent\RPC;
13
14
use Exception;
15
use PhpAmqpLib\Connection\AMQPStreamConnection;
16
use PhpAmqpLib\Channel\AMQPChannel;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use MAKS\AmqpAgent\RPC\AbstractEndpointInterface;
19
use MAKS\AmqpAgent\Helper\EventTrait;
20
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait;
21
use MAKS\AmqpAgent\Exception\RPCEndpointException;
22
use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters;
23
24
/**
25
 * An abstract class implementing the basic functionality of an endpoint.
26
 * @since 2.0.0
27
 * @api
28
 */
29
abstract class AbstractEndpoint implements AbstractEndpointInterface
{
    use MagicMethodsExceptionsTrait;
    use EventTrait;

    /**
     * The connection options of the RPC endpoint.
     * @var array
     */
    protected $connectionOptions;

    /**
     * The queue name of the RPC endpoint.
     * @var string
     */
    protected $queueName;

    /**
     * Whether the endpoint is connected to RabbitMQ server or not.
     * Refreshed on every call to `isConnected()`.
     * @var bool
     */
    protected $connected;

    /**
     * The endpoint connection (not set until `connect()` has been called).
     * @var AMQPStreamConnection|null
     */
    protected $connection;

    /**
     * The endpoint channel (not set until `connect()` has been called).
     * @var AMQPChannel|null
     */
    protected $channel;

    /**
     * The request body.
     * @var string
     */
    protected $requestBody;

    /**
     * Requests conveyor.
     * @var string
     */
    protected $requestQueue;

    /**
     * The response body.
     * @var string
     */
    protected $responseBody;

    /**
     * Responses conveyor.
     * @var string
     */
    protected $responseQueue;

    /**
     * Correlation ID of the last request/response.
     * @var string
     */
    protected $correlationId;


    /**
     * Class constructor.
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @param string|null $queueName [optional] The override for the default queue name of the RPC endpoint.
     */
    public function __construct(?array $connectionOptions = [], ?string $queueName = null)
    {
        // The parameter is nullable, so normalise null to an empty override list
        // (same treatment as in connect()) before handing it to the patcher.
        $this->connectionOptions = Parameters::patch($connectionOptions ?? [], 'RPC_CONNECTION_OPTIONS');
        $this->queueName = empty($queueName) ? Parameters::RPC_QUEUE_NAME : $queueName;
    }

    /**
     * Closes the connection with RabbitMQ server before destroying the object.
     */
    public function __destruct()
    {
        $this->disconnect();
    }


    /**
     * Opens a connection with RabbitMQ server.
     * Triggers `connection.after.open` and `channel.after.open` once the respective object is created.
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @return self
     * @throws RPCEndpointException If the endpoint is already connected.
     */
    public function connect(?array $connectionOptions = [])
    {
        // Guard first: a rejected call must not alter the options of the live connection.
        if ($this->isConnected()) {
            throw new RPCEndpointException('Endpoint is already connected!');
        }

        $this->connectionOptions = Parameters::patchWith(
            $connectionOptions ?? [],
            $this->connectionOptions
        );

        // The options are passed positionally to the connection constructor.
        $parameters = array_values($this->connectionOptions);

        $this->connection = new AMQPStreamConnection(...$parameters);
        $this->trigger('connection.after.open', [$this->connection]);

        $this->channel = $this->connection->channel();
        $this->trigger('channel.after.open', [$this->channel]);

        return $this;
    }

    /**
     * Closes the connection with RabbitMQ server.
     * Does nothing if the endpoint is not connected.
     * Triggers `channel.before.close` and `connection.before.close` right before closing the respective object.
     * @return void
     */
    public function disconnect(): void
    {
        if (!$this->isConnected()) {
            return;
        }

        $this->connected = false;

        try {
            $this->trigger('channel.before.close', [$this->channel]);
            $this->channel->close();
        } finally {
            // Always release the connection, even if closing the channel failed.
            $this->trigger('connection.before.close', [$this->connection]);
            $this->connection->close();
        }
    }

    /**
     * Returns whether the endpoint is connected or not.
     * The endpoint counts as connected only if both the connection and the channel exist and are open.
     * @return bool
     */
    public function isConnected(): bool
    {
        $this->connected = (
            isset($this->connection) &&
            isset($this->channel) &&
            $this->connection->isConnected() &&
            $this->channel->is_open()
        );

        return $this->connected;
    }

    /**
     * Returns the connection used by the endpoint.
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * The time needed for the round-trip to RabbitMQ server in milliseconds.
     * Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.
     * @return float A two decimal points rounded float.
     * @throws RPCEndpointException If anything goes wrong while pinging (the original error is passed to `RPCEndpointException::rethrow()`).
     */
    final public function ping(): float
    {
        $pingConnection = $this->connection;
        $pingChannel = null;
        // True only when this call opened its own connection, which it must then close itself.
        $ownsConnection = false;

        try {
            if (!isset($pingConnection) || !$pingConnection->isConnected()) {
                $parameters = array_values($this->connectionOptions);
                $pingConnection = new AMQPStreamConnection(...$parameters);
                $ownsConnection = true;
            }

            // A dedicated channel is used so the endpoint's own channel is left untouched.
            $pingChannel = $pingConnection->channel();

            // Throwaway queue; the first element of the reply is used as the queue name.
            // Positional flags per php-amqplib: passive, durable, exclusive, auto_delete.
            [$pingQueue] = $pingChannel->queue_declare(
                null,
                false,
                false,
                true,
                true
            );

            $pingChannel->basic_qos(
                null,
                1,
                null
            );

            $pingEcho = null;

            $pingChannel->basic_consume(
                $pingQueue,
                null,
                false,
                false,
                false,
                false,
                function ($message) use (&$pingEcho) {
                    $message->ack();
                    $pingEcho = $message->body;
                }
            );

            $pingStartTime = microtime(true);

            $pingChannel->basic_publish(
                new AMQPMessage(__FUNCTION__),
                null,
                $pingQueue
            );

            // Block until the consumer callback has received the echoed message.
            while (!$pingEcho) {
                $pingChannel->wait();
            }

            $pingEndTime = microtime(true);

            $pingChannel->queue_delete($pingQueue);

            $pingChannel->close();
            if ($ownsConnection) {
                $pingConnection->close();
            }

            return round(($pingEndTime - $pingStartTime) * 1000, 2);
        } catch (Exception $error) {
            // Best-effort release of whatever this call opened, so a failed ping does not
            // leak a channel or a temporary connection. Cleanup failures are deliberately
            // ignored in favour of reporting the original error.
            $leftovers = [$pingChannel, $ownsConnection ? $pingConnection : null];
            foreach ($leftovers as $leftover) {
                if ($leftover === null) {
                    continue;
                }
                try {
                    $leftover->close();
                } catch (Exception $cleanupError) {
                    // Intentionally ignored, see above.
                }
            }

            RPCEndpointException::rethrow($error);
        }
    }

    /**
     * Hooking method based on events to manipulate the request/response during the endpoint/message life cycle.
     * Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.
     *
     * The parameters will be passed to the callback as follows:
     *      1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),
     *      2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),
     *      3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).
     * ```
     * $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {
     *      ...
     * });
     * ```
     * @param string $event The event to listen on.
     * @param callable $callback The callback to execute.
     * @return self
     */
    final public function on(string $event, callable $callback)
    {
        $this->bind($event, function (...$arguments) use ($event, $callback) {
            // Append the endpoint and the event name after the arguments the event was triggered with.
            $payload = array_merge($arguments, [$this, $event]);

            call_user_func_array($callback, $payload);
        });

        return $this;
    }

    /**
     * Hook method to manipulate the message (request/response) when extending the class.
     * @param AMQPMessage $message
     * @return string
     */
    abstract protected function callback(AMQPMessage $message): string;
}
297