ClientEndpoint   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 11
eloc 57
c 3
b 0
f 0
dl 0
loc 132
ccs 61
cts 61
cp 1
rs 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A request() 0 40 5
A connect() 0 27 2
A call() 0 3 1
A callback() 0 3 1
A onResponse() 0 15 2
1
<?php
2
3
/**
4
 * @author Marwan Al-Soltany <[email protected]>
5
 * @copyright Marwan Al-Soltany 2020
6
 * For the full copyright and license information, please view
7
 * the LICENSE file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace MAKS\AmqpAgent\RPC;
13
14
use PhpAmqpLib\Message\AMQPMessage;
15
use MAKS\AmqpAgent\Helper\ClassProxy;
16
use MAKS\AmqpAgent\Helper\IDGenerator;
17
use MAKS\AmqpAgent\RPC\AbstractEndpoint;
18
use MAKS\AmqpAgent\RPC\ClientEndpointInterface;
19
use MAKS\AmqpAgent\Exception\RPCEndpointException;
20
21
/**
22
 * A class specialized in requesting. Implementing only the methods needed for a client.
23
 *
24
 * Example:
25
 * ```
26
 * $clientEndpoint = new ClientEndpoint();
27
 * $clientEndpoint->on('some.event', function () { ... });
28
 * $clientEndpoint->connect();
29
 * $clientEndpoint->request('Message Body', 'queue.name');
30
 * $clientEndpoint->disconnect();
31
 * ```
32
 *
33
 * @since 2.0.0
34
 * @api
35
 */
36
class ClientEndpoint extends AbstractEndpoint implements ClientEndpointInterface
37
{
38
    /**
39
     * Opens a connection with RabbitMQ server.
40
     * @param array|null $connectionOptions
41
     * @return self
42
     * @throws RPCEndpointException
43
     */
44 3
    public function connect(?array $connectionOptions = [])
45
    {
46 3
        parent::connect($connectionOptions);
47
48 3
        if ($this->isConnected()) {
49 3
            list($this->responseQueue, , ) = $this->channel->queue_declare(
50 3
                null,
51 3
                false,
52 3
                false,
53 3
                true,
54 3
                false
55
            );
56
57 3
            $this->channel->basic_consume(
58 3
                $this->responseQueue,
59 3
                null,
60 3
                false,
61 3
                false,
62 3
                false,
63 3
                false,
64
                function ($message) {
65 2
                    ClassProxy::call($this, 'onResponse', $message);
66 3
                }
67
            );
68
        }
69
70 3
        return $this;
71
    }
72
73
    /**
74
     * Sends the passed request to the server using the passed queue.
75
     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.
76
     * @param string|null $queueName [optional] The name of queue to send through.
77
     * @return string The response body.
78
     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.
79
     */
80 3
    public function request($request, ?string $queueName = null): string
81
    {
82 3
        if (!$this->isConnected()) {
83 1
            throw new RPCEndpointException('Client is not connected yet!');
84
        }
85
86 2
        $this->queueName = $queueName ?? $this->queueName;
87 2
        $this->requestBody = $request instanceof AMQPMessage ? $request->body : (string)$request;
88 2
        $this->responseBody = null;
89 2
        $this->requestQueue = $this->queueName;
90 2
        $this->correlationId = IDGenerator::generateHash();
91
92 2
        $message = $request instanceof AMQPMessage ? $request : new AMQPMessage((string)$request);
93 2
        $message->set('reply_to', $this->responseQueue);
94 2
        $message->set('correlation_id', $this->correlationId);
95 2
        $message->set('timestamp', time());
96
97 2
        $this->channel->queue_declare(
98 2
            $this->requestQueue,
99 2
            false,
100 2
            false,
101 2
            false,
102 2
            false
103
        );
104
105 2
        $this->trigger('request.before.send', [$message]);
106
107 2
        $this->channel->basic_publish(
108 2
            $message,
109 2
            null,
110 2
            $this->requestQueue
111
        );
112
113 2
        $this->trigger('request.after.send', [$message]);
114
115 2
        while ($this->responseBody === null) {
116 2
            $this->channel->wait();
117
        }
118
119 1
        return $this->responseBody;
120
    }
121
122
    /**
123
     * Sends the passed request to the server using the passed queue.
124
     * Alias for `self::request()`.
125
     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.
126
     * @param string|null $queueName [optional] The name of queue to send through.
127
     * @return string The response body.
128
     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.
129
     */
130 1
    public function call($request, ?string $queueName = null): string
131
    {
132 1
        return $this->request($request, $queueName);
133
    }
134
135
    /**
136
     * Validates the response.
137
     * @param AMQPMessage $response
138
     * @return void
139
     * @throws RPCEndpointException
140
     */
141 2
    protected function onResponse(AMQPMessage $response): void
142
    {
143 2
        $this->trigger('response.on.get', [$response]);
144
145 2
        if ($this->correlationId === $response->get('correlation_id')) {
146 1
            $this->responseBody = $this->callback($response);
147 1
            $response->ack();
148 1
            return;
149
        }
150
151 1
        throw new RPCEndpointException(
152 1
            sprintf(
153 1
                'Correlation ID of the response "%s" does not match the one of the request "%s"!',
154 1
                $this->correlationId,
155 1
                (string)$response->get('correlation_id')
156
            )
157
        );
158
    }
159
160
    /**
161
     * Returns the final response body.
162
     * @param AMQPMessage $message
163
     * @return string
164
     */
165 1
    protected function callback(AMQPMessage $message): string
166
    {
167 1
        return $message->body;
168
    }
169
}
170