Completed
Push — master ( 59dd4d...b5d47e )
by Marwan
15s queued 11s
created

ClientEndpoint::call()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
cc 1
nc 1
nop 2
1
<?php
2
/**
3
 * @author Marwan Al-Soltany <[email protected]>
4
 * @copyright Marwan Al-Soltany 2020
5
 * For the full copyright and license information, please view
6
 * the LICENSE file that was distributed with this source code.
7
 */
8
9
namespace MAKS\AmqpAgent\RPC;
10
11
use PhpAmqpLib\Message\AMQPMessage;
12
use MAKS\AmqpAgent\Helper\ClassProxy;
13
use MAKS\AmqpAgent\Helper\IDGenerator;
14
use MAKS\AmqpAgent\RPC\AbstractEndpoint;
15
use MAKS\AmqpAgent\RPC\ClientEndpointInterface;
16
use MAKS\AmqpAgent\Exception\RPCEndpointException;
17
18
/**
19
 * A class specialized in making requests; it implements only the methods needed for a client.
20
 *
21
 * Example:
22
 * ```
23
 * $clientEndpoint = new ClientEndpoint();
24
 * $clientEndpoint->on('some.event', function () { ... });
25
 * $clientEndpoint->connect();
26
 * $clientEndpoint->request('Message Body', 'queue.name');
27
 * $clientEndpoint->disconnect();
28
 * ```
29
 *
30
 * @since 2.0.0
31
 * @api
32
 */
33
class ClientEndpoint extends AbstractEndpoint implements ClientEndpointInterface
{
    /**
     * Opens a connection with RabbitMQ server.
     *
     * When the connection is established, a response queue is declared and a consumer is registered on it
     * that forwards every delivered message to `self::onResponse()`.
     *
     * @param array|null $connectionOptions
     * @return self
     */
    public function connect(?array $connectionOptions = [])
    {
        parent::connect($connectionOptions);

        if ($this->isConnected()) {
            // Only the first element of the declare result (the queue name) is needed.
            [$this->responseQueue] = $this->channel->queue_declare(
                null,  // queue (no explicit name is given)
                false, // passive
                false, // durable
                true,  // exclusive
                false  // auto_delete
            );

            $this->channel->basic_consume(
                $this->responseQueue,
                null,  // consumer_tag
                false, // no_local
                false, // no_ack (responses are acknowledged explicitly in onResponse())
                false, // exclusive
                false, // nowait
                function ($message) {
                    ClassProxy::call($this, 'onResponse', $message);
                }
            );
        }

        return $this;
    }

    /**
     * Sends the passed request to the server using the passed queue.
     *
     * The call blocks until a response body has been set by `self::onResponse()`;
     * no timeout is passed to the channel while waiting.
     * If an `AMQPMessage` instance is passed, its `reply_to`, `correlation_id`
     * and `timestamp` properties are set on that same instance.
     *
     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.
     * @param string|null $queueName [optional] The name of queue to send through.
     * @return string The response body.
     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.
     */
    public function request($request, ?string $queueName = null): string
    {
        if (!$this->isConnected()) {
            throw new RPCEndpointException('Client is not connected yet!');
        }

        // Reset the per-request state; a null response body marks "still waiting".
        $this->queueName = $queueName ?? $this->queueName;
        $this->requestBody = $request instanceof AMQPMessage ? $request->body : (string)$request;
        $this->responseBody = null;
        $this->requestQueue = $this->queueName;
        $this->correlationId = IDGenerator::generateHash();

        $message = $request instanceof AMQPMessage ? $request : new AMQPMessage((string)$request);
        $message->set('reply_to', $this->responseQueue);
        $message->set('correlation_id', $this->correlationId);
        $message->set('timestamp', time());

        $this->channel->queue_declare(
            $this->requestQueue,
            false, // passive
            false, // durable
            false, // exclusive
            false  // auto_delete
        );

        $this->trigger('request.before.send', [$message]);

        $this->channel->basic_publish(
            $message,
            null, // exchange
            $this->requestQueue
        );

        $this->trigger('request.after.send', [$message]);

        // The consumer registered in connect() fills in the response body via onResponse().
        while ($this->responseBody === null) {
            $this->channel->wait();
        }

        return $this->responseBody;
    }

    /**
     * Sends the passed request to the server using the passed queue.
     * Alias for `self::request()`.
     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.
     * @param string|null $queueName [optional] The name of queue to send through.
     * @return string The response body.
     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.
     */
    public function call($request, ?string $queueName = null): string
    {
        return $this->request($request, $queueName);
    }

    /**
     * Validates the response.
     *
     * Triggers the `response.on.get` event, then compares the Correlation ID of the response with the one
     * of the current request. On a match the response body is stored and the message is acknowledged.
     * On a mismatch an exception is thrown and the message is not acknowledged.
     *
     * @param AMQPMessage $response
     * @return void
     * @throws RPCEndpointException If the Correlation ID of the response does not match the one of the request.
     */
    protected function onResponse(AMQPMessage $response): void
    {
        $this->trigger('response.on.get', [$response]);

        $responseCorrelationId = $response->get('correlation_id');

        if ($this->correlationId === $responseCorrelationId) {
            $this->responseBody = $this->callback($response);
            $response->ack();
            return;
        }

        // Argument order follows the message: the response ID first, then the request ID.
        throw new RPCEndpointException(
            sprintf(
                'Correlation ID of the response "%s" does not match the one of the request "%s"!',
                (string)$responseCorrelationId,
                $this->correlationId
            )
        );
    }

    /**
     * Returns the final response body.
     * @param AMQPMessage $message The response message.
     * @return string
     */
    protected function callback(AMQPMessage $message): string
    {
        return $message->body;
    }
}
164