Completed
Push — master ( 59dd4d...b5d47e )
by Marwan
15s queued 11s
created

AbstractEndpoint::ping()   B

Complexity

Conditions 6
Paths 55

Size

Total Lines 65
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 43
c 1
b 0
f 0
dl 0
loc 65
rs 8.6097
cc 6
nc 55
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * @author Marwan Al-Soltany <[email protected]>
4
 * @copyright Marwan Al-Soltany 2020
5
 * For the full copyright and license information, please view
6
 * the LICENSE file that was distributed with this source code.
7
 */
8
9
namespace MAKS\AmqpAgent\RPC;
10
11
use Exception;
12
use PhpAmqpLib\Connection\AMQPStreamConnection;
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Message\AMQPMessage;
15
use MAKS\AmqpAgent\RPC\AbstractEndpointInterface;
16
use MAKS\AmqpAgent\Helper\EventTrait;
17
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait;
18
use MAKS\AmqpAgent\Exception\RPCEndpointException;
19
use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters;
20
21
/**
22
 * An abstract class implementing the basic functionality of an endpoint.
23
 * @since 2.0.0
24
 * @api
25
 */
26
abstract class AbstractEndpoint implements AbstractEndpointInterface
27
{
28
    use MagicMethodsExceptionsTrait;
29
    use EventTrait;
30
31
    /**
32
     * The connection options of the RPC endpoint.
33
     * @var array
34
     */
35
    protected $connectionOptions;
36
37
    /**
38
     * The queue name of the RPC endpoint.
39
     * @var string
40
     */
41
    protected $queueName;
42
43
    /**
44
     * Whether the endpoint is connected to RabbitMQ server or not.
45
     * @var bool
46
     */
47
    protected $connected;
48
49
    /**
50
     * The endpoint connection.
51
     * @var AMQPStreamConnection
52
     */
53
    protected $connection;
54
55
    /**
56
     * The endpoint channel.
57
     * @var AMQPChannel
58
     */
59
    protected $channel;
60
61
    /**
62
     * The request body.
63
     * @var string
64
     */
65
    protected $requestBody;
66
67
    /**
68
     * Requests conveyor.
69
     * @var string
70
     */
71
    protected $requestQueue;
72
73
    /**
74
     * The response body.
75
     * @var string
76
     */
77
    protected $responseBody;
78
79
    /**
80
     * Responses conveyor.
81
     * @var string
82
     */
83
    protected $responseQueue;
84
85
    /**
86
     * Correlation ID of the last request/response.
87
     * @var string
88
     */
89
    protected $correlationId;
90
91
92
    /**
     * Class constructor.
     *
     * Resolves the effective connection options and queue name and stores them on the instance.
     *
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @param string|null $queueName [optional] The override for the default queue name of the RPC endpoint.
     */
    public function __construct(?array $connectionOptions = [], ?string $queueName = null)
    {
        // The parameter is nullable, so normalise null to "no overrides" (the same way connect() does)
        // before handing it to the patching helper.
        $this->connectionOptions = Parameters::patch($connectionOptions ?? [], 'RPC_CONNECTION_OPTIONS');

        // A missing or empty queue name falls back to the package default.
        $this->queueName = empty($queueName) ? Parameters::RPC_QUEUE_NAME : $queueName;
    }
102
103
    /**
     * Class destructor.
     *
     * Delegates to `disconnect()` so the endpoint is closed before the object is destroyed.
     */
    public function __destruct()
    {
        $this->disconnect();
    }
110
111
112
    /**
     * Opens a connection with RabbitMQ server.
     *
     * Fires `connection.after.open` once the connection object exists and
     * `channel.after.open` once a channel has been obtained from it.
     *
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @return self
     * @throws RPCEndpointException If the endpoint is already connected.
     */
    public function connect(?array $connectionOptions = [])
    {
        // Reject the call before touching any state, so a refused connect()
        // does not silently replace the options of the live connection.
        if ($this->isConnected()) {
            throw new RPCEndpointException('Endpoint is already connected!');
        }

        $this->connectionOptions = Parameters::patchWith(
            $connectionOptions ?? [],
            $this->connectionOptions
        );

        // The connection class takes positional arguments, so only the option values are passed on.
        $parameters = array_values($this->connectionOptions);

        $this->connection = new AMQPStreamConnection(...$parameters);
        $this->trigger('connection.after.open', [$this->connection]);

        $this->channel = $this->connection->channel();
        $this->trigger('channel.after.open', [$this->channel]);

        return $this;
    }
139
140
    /**
     * Closes the connection with RabbitMQ server.
     *
     * Does nothing when the endpoint is not connected. Otherwise fires `channel.before.close`
     * and closes the channel, then fires `connection.before.close` and closes the connection.
     * The connection is closed even if closing the channel fails.
     *
     * @return void
     */
    public function disconnect(): void
    {
        if (!$this->isConnected()) {
            return;
        }

        // The property is documented as bool, so reset it to false rather than null.
        $this->connected = false;

        try {
            $this->trigger('channel.before.close', [$this->channel]);
            $this->channel->close();
        } finally {
            // Runs even when the channel could not be closed cleanly,
            // so the underlying connection is never left open.
            $this->trigger('connection.before.close', [$this->connection]);
            $this->connection->close();
        }
    }
156
157
    /**
     * Returns whether the endpoint is connected or not.
     *
     * The endpoint counts as connected only when both the connection and the channel
     * exist, the connection reports being connected and the channel reports being open.
     * The result is also stored in `$this->connected`.
     *
     * @return bool
     */
    public function isConnected(): bool
    {
        $hasHandles = isset($this->connection, $this->channel);

        // Short-circuits left to right: the handles are only queried when both exist.
        $this->connected = $hasHandles
            && $this->connection->isConnected()
            && $this->channel->is_open();

        return $this->connected;
    }
172
173
    /**
     * Returns the connection used by the endpoint.
     *
     * NOTE(review): within this class the property is only assigned by `connect()`;
     * calling this method before a connection object exists would violate the declared return type.
     *
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }
181
182
    /**
     * The time needed for the round-trip to RabbitMQ server in milliseconds.
     *
     * Publishes a small message to a temporary queue on a dedicated channel, waits until the
     * same message is consumed back and measures the elapsed time in between.
     * Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.
     * The temporary channel (and the temporary connection, if one was opened) is closed afterwards,
     * on failure as well as on success.
     *
     * @return float A two decimal points rounded float.
     * @throws RPCEndpointException Presumably, via `RPCEndpointException::rethrow()`, when any step of the ping fails.
     */
    final public function ping(): float
    {
        $pingConnection = null;
        $pingChannel = null;

        try {
            // Reuse the live endpoint connection when there is one, otherwise open a throw-away connection.
            $pingConnection = $this->connection;
            if (!isset($pingConnection) || !$pingConnection->isConnected()) {
                $parameters = array_values($this->connectionOptions);
                $pingConnection = new AMQPStreamConnection(...$parameters);
            }
            $pingChannel = $pingConnection->channel();

            // Arguments: queue name (null), passive, durable, exclusive, auto-delete.
            [$pingQueue] = $pingChannel->queue_declare(
                null,
                false,
                false,
                true,
                true
            );

            // Arguments: prefetch size, prefetch count, global.
            $pingChannel->basic_qos(
                null,
                1,
                null
            );

            $pingEcho = null;

            // Arguments: queue, consumer tag, no-local, no-ack, exclusive, no-wait, callback.
            $pingChannel->basic_consume(
                $pingQueue,
                null,
                false,
                false,
                false,
                false,
                function ($message) use (&$pingEcho) {
                    $message->ack();
                    $pingEcho = $message->body;
                }
            );

            $pingStartTime = microtime(true);

            // __FUNCTION__ resolves to the name of this method and is used as the message body.
            $pingChannel->basic_publish(
                new AMQPMessage(__FUNCTION__),
                null,
                $pingQueue
            );

            // Block until the consumer callback has stored the echoed body.
            while (!$pingEcho) {
                $pingChannel->wait();
            }

            $pingEndTime = microtime(true);

            $pingChannel->queue_delete($pingQueue);

            $pingChannel->close();
            if ($pingConnection !== $this->connection) {
                // Only a connection opened by this method is closed here.
                $pingConnection->close();
            }

            return round(($pingEndTime - $pingStartTime) * 1000, 2);
        } catch (Exception $error) {
            // Do not leave the temporary channel/connection open when the ping fails half-way.
            $this->releasePingResources($pingChannel, $pingConnection);

            RPCEndpointException::rethrow($error);
        }
    }

    /**
     * Closes whatever a failed `ping()` managed to open, ignoring errors raised while doing so.
     *
     * @param AMQPChannel|null $pingChannel The temporary channel, if one was opened.
     * @param AMQPStreamConnection|null $pingConnection The connection the ping ran on, if any.
     * @return void
     */
    private function releasePingResources($pingChannel, $pingConnection): void
    {
        try {
            if ($pingChannel !== null && $pingChannel->is_open()) {
                $pingChannel->close();
            }
        } catch (Exception $cleanupError) {
            // Deliberately ignored: the failure that aborted the ping is the one worth reporting.
        }

        try {
            // The endpoint's own connection is never closed by a ping.
            $isTemporary = $pingConnection !== null && $pingConnection !== $this->connection;
            if ($isTemporary && $pingConnection->isConnected()) {
                $pingConnection->close();
            }
        } catch (Exception $cleanupError) {
            // Deliberately ignored for the same reason as above.
        }
    }
254
255
    /**
     * Registers a callback for an event fired during the endpoint/message life cycle.
     * Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.
     *
     * The callback receives, in this order:
     *      1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),
     *      2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),
     *      3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).
     * ```
     * $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {
     *      ...
     * });
     * ```
     * @param string $event The event to listen on.
     * @param callable $callback The callback to execute.
     * @return self
     */
    final public function on(string $event, callable $callback)
    {
        $listener = function (...$arguments) use ($event, $callback) {
            // Append the endpoint and the event name after the arguments supplied by the event itself.
            $payload = array_merge($arguments, [$this, $event]);

            call_user_func_array($callback, $payload);
        };

        $this->bind($event, $listener);

        return $this;
    }
286
287
    /**
     * Hook method to manipulate the message (request/response) when extending the class.
     *
     * @param AMQPMessage $message The message (request/response) to work on.
     * @return string
     */
    abstract protected function callback(AMQPMessage $message): string;
292
}
293