ServerEndpoint::callback()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 1
c 1
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
/**
4
 * @author Marwan Al-Soltany <[email protected]>
5
 * @copyright Marwan Al-Soltany 2020
6
 * For the full copyright and license information, please view
7
 * the LICENSE file that was distributed with this source code.
8
 */
9
10
declare(strict_types=1);
11
12
namespace MAKS\AmqpAgent\RPC;
13
14
use PhpAmqpLib\Message\AMQPMessage;
15
use MAKS\AmqpAgent\Helper\ClassProxy;
16
use MAKS\AmqpAgent\RPC\AbstractEndpoint;
17
use MAKS\AmqpAgent\RPC\ServerEndpointInterface;
18
use MAKS\AmqpAgent\Exception\RPCEndpointException;
19
20
/**
 * An RPC endpoint specialized in responding. Implements only the methods needed for a server:
 * it consumes requests from a queue, hands each one to a processing callback and publishes
 * the callback's result to the queue named in the request's `reply_to` property.
 *
 * Example:
 * ```
 * $serverEndpoint = new ServerEndpoint();
 * $serverEndpoint->on('some.event', function () { ... });
 * $serverEndpoint->connect();
 * $serverEndpoint->respond('Namespace\SomeClass::someMethod', 'queue.name');
 * $serverEndpoint->disconnect();
 * ```
 *
 * @since 2.0.0
 * @api
 */
class ServerEndpoint extends AbstractEndpoint implements ServerEndpointInterface
{
    /**
     * The callback to use when processing the requests.
     * Set on every call to `self::respond()`; falls back to `self::callback()` when none is passed.
     * @var callable
     */
    protected $callback;


    /**
     * Listens on requests coming via the passed queue and processes them with the passed callback.
     * Blocks for as long as the channel reports that it is consuming.
     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.
     * @param string|null $queueName [optional] The name of the queue to listen on. If omitted, the previously set queue name is kept.
     * @return string The body of the last processed request.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function respond(?callable $callback = null, ?string $queueName = null): string
    {
        // Both properties are updated before the connection check, so they are set even when the call fails.
        $this->callback = $callback ?? [$this, 'callback'];
        $this->queueName = $queueName ?? $this->queueName;

        if (!$this->isConnected()) {
            throw new RPCEndpointException('Server is not connected yet!');
        }

        $this->requestQueue = $this->queueName;

        $this->channel->queue_declare(
            $this->requestQueue,
            false, // passive
            false, // durable
            false, // exclusive
            false  // auto_delete
        );

        // php-amqplib documents these parameters as int/int/bool. Explicit scalars are used
        // instead of null so nothing relies on implicit null coercion inside the library.
        $this->channel->basic_qos(
            0,     // prefetch_size: no size limit
            1,     // prefetch_count: one unacknowledged request at a time
            false  // a_global
        );

        $this->channel->basic_consume(
            $this->requestQueue,
            '',    // consumer_tag: empty string (instead of null) lets the broker generate one
            false, // no_local
            false, // no_ack: requests are acknowledged manually in self::onRequest()
            false, // exclusive
            false, // nowait
            function ($message) {
                ClassProxy::call($this, 'onRequest', $message);
            }
        );

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }

        return $this->requestBody;
    }

    /**
     * Listens on requests coming via the passed queue and processes them with the passed callback.
     * Alias for `self::respond()`.
     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.
     * @param string|null $queueName [optional] The queue to listen on.
     * @return string The body of the last processed request.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function serve(?callable $callback = null, ?string $queueName = null): string
    {
        return $this->respond($callback, $queueName);
    }

    /**
     * Replies to the client.
     *
     * Runs the processing callback on the request, publishes its result (tagged with the request's
     * `correlation_id`) to the request's `reply_to` queue and then acknowledges the request.
     * Triggers `request.on.get`, `response.before.send` and `response.after.send` along the way.
     * @param AMQPMessage $request The request received from the client.
     * @return void
     * @throws RPCEndpointException If the processing callback did not return a string.
     */
    protected function onRequest(AMQPMessage $request): void
    {
        $this->trigger('request.on.get', [$request]);

        $this->requestBody = $request->body;
        $this->responseBody = call_user_func($this->callback, $request);
        $this->responseQueue = (string)$request->get('reply_to');
        $this->correlationId = (string)$request->get('correlation_id');

        if (!is_string($this->responseBody)) {
            throw new RPCEndpointException(
                sprintf(
                    'The passed processing callback must return a string, instead it returned (data-type: %s)!',
                    gettype($this->responseBody)
                )
            );
        }

        $message = new AMQPMessage($this->responseBody);
        $message->set('correlation_id', $this->correlationId);
        $message->set('timestamp', time());

        $this->trigger('response.before.send', [$message]);

        // The exchange is the empty string (previously null), which is the documented default of
        // basic_publish(); the response queue name is passed as the routing key.
        $request->getChannel()->basic_publish(
            $message,
            '',
            $this->responseQueue
        );

        // The request is acknowledged only after the response has been published.
        $request->ack();

        $this->trigger('response.after.send', [$message]);
    }

    /**
     * Returns the final request body. This is the default processing callback;
     * it is not used if a callback is passed to `self::respond()`.
     * @param AMQPMessage $message The request message.
     * @return string The unmodified body of the passed message.
     */
    protected function callback(AMQPMessage $message): string
    {
        return $message->body;
    }
}
159