Passed
Push — dev ( f5cf87...4396b8 )
by Marwan
14:15
created

AbstractEndpoint   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 266
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 19
eloc 90
c 1
b 0
f 0
dl 0
loc 266
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
B ping() 0 65 6
A disconnect() 0 10 2
A connect() 0 20 2
A on() 0 13 1
A getConnection() 0 3 1
A isConnected() 0 10 4
A __construct() 0 4 2
A __destruct() 0 3 1
1
<?php
2
/**
3
 * @author Marwan Al-Soltany <[email protected]>
4
 * @copyright Marwan Al-Soltany 2020
5
 * For the full copyright and license information, please view
6
 * the LICENSE file that was distributed with this source code.
7
 */
8
9
namespace MAKS\AmqpAgent\RPC;
10
11
use Exception;
12
use PhpAmqpLib\Connection\AMQPStreamConnection;
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Message\AMQPMessage;
15
use MAKS\AmqpAgent\RPC\AbstractEndpointInterface;
16
use MAKS\AmqpAgent\Helper\EventTrait;
17
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait;
18
use MAKS\AmqpAgent\Exception\RPCEndpointException;
19
use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters;
20
21
/**
22
 * An abstract class implementing the basic functionality of an endpoint.
23
 * @since 2.0.0
24
 * @api
25
 */
26
abstract class AbstractEndpoint implements AbstractEndpointInterface
27
{
28
    use MagicMethodsExceptionsTrait;
29
    use EventTrait;
30
31
    /**
32
     * The connection options of the RPC endpoint.
33
     * @var array
34
     */
35
    protected $connectionOptions;
36
37
    /**
38
     * The queue name of the RPC endpoint.
39
     * @var array
40
     */
41
    protected $queueName;
42
43
    /**
44
     * Wether the endpoint is connected to RabbitMQ server or not.
45
     * @var bool
46
     */
47
    protected $connected;
48
49
    /**
50
     * The endpoint connection.
51
     * @var AMQPStreamConnection
52
     */
53
    protected $connection;
54
55
    /**
56
     * The endpoint channel.
57
     * @var AMQPChannel
58
     */
59
    protected $channel;
60
61
    /**
62
     * The request body.
63
     * @var string
64
     */
65
    protected $request;
66
67
    /**
68
     * Requests conveyor.
69
     * @var string
70
     */
71
    protected $requestQueue;
72
73
    /**
74
     * The response body.
75
     * @var string
76
     */
77
    protected $response;
78
79
    /**
80
     * Responses conveyor.
81
     * @var string
82
     */
83
    protected $responseQueue;
84
85
    /**
86
     * Correlation ID of the last request/response.
87
     * @var string
88
     */
89
    protected $correlationId;
90
91
92
    /**
93
     * Class constructor.
94
     * @param array $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
95
     * @param string $queueName [optional] The override for the default queue name of the RPC endpoint.
96
     */
97
    public function __construct(?array $connectionOptions = [], ?string $queueName = null)
98
    {
99
        $this->connectionOptions = Parameters::patch($connectionOptions, 'RPC_CONNECTION_OPTIONS');
100
        $this->queueName = empty($queueName) ? Parameters::RPC_QUEUE_NAME : $queueName;
0 ignored issues
show
Documentation Bug introduced by
It seems like empty($queueName) ? MAKS...QUEUE_NAME : $queueName can also be of type string. However, the property $queueName is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
101
    }
102
103
    /**
104
     * Closes the connection with RabbitMQ server before destroying the object.
105
     */
106
    public function __destruct()
107
    {
108
        $this->disconnect();
109
    }
110
111
112
    /**
113
     * Opens a connection with RabbitMQ server.
114
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
115
     * @return self
116
     * @throws RPCEndpointException If the endpoint is already connected.
117
     */
118
    public function connect(?array $connectionOptions = []): self
119
    {
120
        $this->connectionOptions = Parameters::patchWith(
121
            $connectionOptions ?? [],
122
            $this->connectionOptions
123
        );
124
125
        if ($this->isConnected()) {
126
            throw new RPCEndpointException('Endpoint is already connected!');
127
        }
128
129
        $parameters = array_values($this->connectionOptions);
130
131
        $this->connection = new AMQPStreamConnection(...$parameters);
132
        $this->trigger('connection.after.open', [$this->connection]);
133
134
        $this->channel = $this->connection->channel();
135
        $this->trigger('channel.after.open', [$this->channel]);
136
137
        return $this;
138
    }
139
140
    /**
141
     * Closes the connection with RabbitMQ server.
142
     * @return void
143
     */
144
    public function disconnect(): void
145
    {
146
        if ($this->isConnected()) {
147
            $this->connected = null;
148
149
            $this->trigger('channel.before.close', [$this->channel]);
150
            $this->channel->close();
151
152
            $this->trigger('connection.before.close', [$this->connection]);
153
            $this->connection->close();
154
        }
155
    }
156
157
    /**
158
     * Returns wether the endpoint is connected or not.
159
     * @return bool
160
     */
161
    public function isConnected(): bool
162
    {
163
        $this->connected = (
164
            isset($this->connection) &&
165
            isset($this->channel) &&
166
            $this->connection->isConnected() &&
167
            $this->channel->is_open()
168
        );
169
170
        return $this->connected;
171
    }
172
173
    /**
174
     * Returns the connection used by the endpoint.
175
     * @return bool
176
     */
177
    public function getConnection(): AMQPStreamConnection
178
    {
179
        return $this->connection;
180
    }
181
182
    /**
183
     * The time needed for the round-trip to RabbitMQ server in milliseconds.
184
     * Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.
185
     * @return float A two decimal points rounded float.
186
     */
187
    final public function ping(): float
188
    {
189
        try {
190
            $pingConnection = $this->connection;
191
            if (!isset($pingConnection) || !$pingConnection->isConnected()) {
192
                $parameters = array_values($this->connectionOptions);
193
                $pingConnection = new AMQPStreamConnection(...$parameters);
194
            }
195
            $pingChannel = $pingConnection->channel();
196
197
            [$pingQueue] = $pingChannel->queue_declare(
198
                null,
199
                false,
200
                false,
201
                true,
202
                true
203
            );
204
205
            $pingChannel->basic_qos(
206
                null,
207
                1,
208
                null
209
            );
210
211
            $pingEcho = null;
212
213
            $pingChannel->basic_consume(
214
                $pingQueue,
215
                null,
216
                false,
217
                false,
218
                false,
219
                false,
220
                function ($message) use (&$pingEcho) {
221
                    $message->ack();
222
                    $pingEcho = $message->body;
223
                }
224
            );
225
226
            $pingStartTime = microtime(true);
227
228
            $pingChannel->basic_publish(
229
                new AMQPMessage(__FUNCTION__),
230
                null,
231
                $pingQueue
232
            );
233
234
            while (!$pingEcho) {
235
                $pingChannel->wait();
236
            }
237
238
            $pingEndTime = microtime(true);
239
240
            $pingChannel->queue_delete($pingQueue);
241
242
            if ($pingConnection === $this->connection) {
243
                $pingChannel->close();
244
            } else {
245
                $pingChannel->close();
246
                $pingConnection->close();
247
            }
248
249
            return round(($pingEndTime - $pingStartTime) * 1000, 2);
250
        } catch (Exception $error) {
251
            RPCEndpointException::rethrow($error);
252
        }
253
    }
254
255
    /**
256
     * Hooking method based on events to manipulate the request/response during the endpoint/message life cycle.
257
     * Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.
258
     *
259
     * The parameters will be passed to the callback as follows:
260
     *      1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),
261
     *      2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),
262
     *      3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).
263
     * ```
264
     * $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {
265
     *      ...
266
     * });
267
     * ```
268
     * @param string $event The event to listen on.
269
     * @param callable $callback The callback to execute.
270
     * @return self
271
     */
272
    final public function on(string $event, callable $callback): self
273
    {
274
        $this->bind($event, function (...$arguments) use ($event, $callback) {
275
            call_user_func_array(
276
                $callback,
277
                array_merge(
278
                    $arguments,
279
                    [$this, $event]
280
                )
281
            );
282
        });
283
284
        return $this;
285
    }
286
287
    /**
288
     * Hook method to manipulate the message (request/response) when extending the class.
289
     * @return string
290
     */
291
    abstract protected function callback(AMQPMessage $message): string;
292
}
293