Completed
Push — master ( 59dd4d...b5d47e )
by Marwan
15s queued 11s
created

ServerEndpoint::onRequest()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 33
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 20
c 2
b 0
f 0
dl 0
loc 33
rs 9.6
cc 2
nc 2
nop 1
1
<?php
2
/**
3
 * @author Marwan Al-Soltany <[email protected]>
4
 * @copyright Marwan Al-Soltany 2020
5
 * For the full copyright and license information, please view
6
 * the LICENSE file that was distributed with this source code.
7
 */
8
9
namespace MAKS\AmqpAgent\RPC;
10
11
use PhpAmqpLib\Message\AMQPMessage;
12
use MAKS\AmqpAgent\Helper\ClassProxy;
13
use MAKS\AmqpAgent\RPC\AbstractEndpoint;
14
use MAKS\AmqpAgent\RPC\ServerEndpointInterface;
15
use MAKS\AmqpAgent\Exception\RPCEndpointException;
16
17
/**
 * The responding (server) side of an RPC exchange. Only the methods a server needs are implemented here.
 *
 * Example:
 * ```
 * $serverEndpoint = new ServerEndpoint();
 * $serverEndpoint->on('some.event', function () { ... });
 * $serverEndpoint->connect();
 * $serverEndpoint->respond('Namespace\SomeClass::someMethod', 'queue.name');
 * $serverEndpoint->disconnect();
 * ```
 *
 * @since 2.0.0
 * @api
 */
class ServerEndpoint extends AbstractEndpoint implements ServerEndpointInterface
{
    /**
     * The handler that turns each incoming request into a response body.
     * @var callable
     */
    protected $callback;


    /**
     * Consumes requests from a queue and answers each one using the given callback.
     * Blocks for as long as the channel reports that it is consuming.
     * @param callable|null $callback [optional] The request processor. It receives an `AMQPMessage` and must return a string. Falls back to `self::callback()` when omitted.
     * @param string|null $queueName [optional] The queue to consume from. Falls back to the queue name already stored on the endpoint when omitted.
     * @return string The body of the last request that was processed.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function respond(?callable $callback = null, ?string $queueName = null): string
    {
        // Both properties are updated before the connection check, regardless of its outcome.
        $this->callback = $callback ?? [$this, 'callback'];
        $this->queueName = $queueName ?? $this->queueName;

        if (!$this->isConnected()) {
            throw new RPCEndpointException('Server is not connected yet!');
        }

        $this->requestQueue = $this->queueName;

        // Arguments: queue, passive, durable, exclusive, auto_delete.
        $this->channel->queue_declare($this->requestQueue, false, false, false, false);

        // Arguments: prefetch_size, prefetch_count, global.
        $this->channel->basic_qos(null, 1, null);

        // Every delivery is forwarded to the protected onRequest() through the class proxy.
        $dispatcher = function ($message) {
            ClassProxy::call($this, 'onRequest', $message);
        };

        // Arguments: queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback.
        $this->channel->basic_consume($this->requestQueue, null, false, false, false, false, $dispatcher);

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }

        return $this->requestBody;
    }

    /**
     * Consumes requests from a queue and answers each one using the given callback.
     * Alias for `self::respond()`.
     * @param callable|null $callback [optional] The request processor. It receives an `AMQPMessage` and must return a string.
     * @param string|null $queueName [optional] The queue to consume from.
     * @return string The body of the last request that was processed.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function serve(?callable $callback = null, ?string $queueName = null): string
    {
        return $this->respond($callback, $queueName);
    }

    /**
     * Handles a single request: runs the processing callback and publishes its result back to the client.
     * @param AMQPMessage $request The delivered request message.
     * @return void
     * @throws RPCEndpointException If the processing callback returned anything other than a string.
     */
    protected function onRequest(AMQPMessage $request): void
    {
        $this->trigger('request.on.get', [$request]);

        // The endpoint state is recorded first; the callback result is validated afterwards.
        $this->requestBody = $request->body;
        $this->responseBody = call_user_func($this->callback, $request);
        $this->responseQueue = (string)$request->get('reply_to');
        $this->correlationId = (string)$request->get('correlation_id');

        if (!is_string($this->responseBody)) {
            $error = sprintf(
                'The passed processing callback must return a string, instead it returned (data-type: %s)!',
                gettype($this->responseBody)
            );

            throw new RPCEndpointException($error);
        }

        $response = new AMQPMessage($this->responseBody);
        $response->set('correlation_id', $this->correlationId);
        $response->set('timestamp', time());

        $this->trigger('response.before.send', [$response]);

        // The reply goes out on the channel the request arrived on, routed by the request's reply_to value.
        $replyChannel = $request->getChannel();
        $replyChannel->basic_publish($response, null, $this->responseQueue);

        // The request is acknowledged only once the reply has been published.
        $request->ack();

        $this->trigger('response.after.send', [$response]);
    }

    /**
     * The default request processor: hands the request body back unchanged.
     * It is used only when no callback is passed to `self::respond()`.
     * @param AMQPMessage $message The request message.
     * @return string
     */
    protected function callback(AMQPMessage $message): string
    {
        return $message->body;
    }
}
154