Completed
Push — master ( 371f7d...1c3c02 )
by Daniel
03:19
created

RpcClient::getMessage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Rpc;
4
5
use Cmobi\RabbitmqBundle\ConnectionManagerInterface;
6
use Cmobi\RabbitmqBundle\Rpc\Exception\RpcInvalidRequestException;
7
use Cmobi\RabbitmqBundle\Rpc\Exception\RpcInvalidResponseException;
8
use Cmobi\RabbitmqBundle\Rpc\Request\RpcRequest;
9
use Cmobi\RabbitmqBundle\Rpc\Request\RpcRequestCollection;
10
use Cmobi\RabbitmqBundle\Rpc\Response\RpcResponse;
11
use Cmobi\RabbitmqBundle\Rpc\Response\RpcResponseCollection;
12
use PhpAmqpLib\Channel\AbstractChannel;
13
use PhpAmqpLib\Connection\AMQPStreamConnection;
14
use PhpAmqpLib\Message\AMQPMessage;
15
16
abstract class RpcClient
17
{
18
    private $queue;
19
    private $connection;
20
    private $channel;
21
    private $callbackQueue;
22
    private $response;
23
    private $requestCollection;
24
    private $correlationId;
25
26
    public function __construct($queueName, ConnectionManagerInterface $manager)
27
    {
28
        $this->queue = $queueName;
29
        $this->connection = $manager->getConnection();
30
        $this->channel = $this->connection->channel();
31
        $this->requestCollection = new RpcRequestCollection();
32
    }
33
34
    /**
35
     * @param AMQPMessage $rep
36
     */
37
    public function onResponse(AMQPMessage $rep)
38
    {
39
        if($rep->get('correlation_id') == $this->correlationId) {
40
            $this->response = $rep->body;
41
        }
42
    }
43
44
    public function refreshChannel()
45
    {
46
        $connection = $this->getConnection();
47
        $this->channel = $connection->channel();
48
    }
49
50
    /**
51
     * @return RpcResponseCollection
52
     * @throws RpcInvalidRequestException
53
     * @throws RpcInvalidResponseException
54
     */
55
    public function call()
56
    {
57
        if (
58
            !$this->requestCollection instanceof RpcRequestCollection
59
            || $this->requestCollection->count() < 1
60
        ) {
61
            throw new RpcInvalidRequestException();
62
        }
63
        $requestId = uniqid($this->queue);
64
        $this->correlationId = $requestId;
65
        $requests = [];
66
67
        /**
68
         * @var RpcRequest $request
69
         */
70
        foreach ($this->requestCollection as $request) {
71
72
            if (is_null($request->id)) {
73
                $request->id = $requestId;
74
            }
75
            $requests[] = $request->toArray();
76
        }
77
78
        try {
79
            $body = json_encode($requests);
80
        } catch (\Exception $e) {
81
            throw new RpcInvalidRequestException($e);
82
        }
83
        /* Send to Message Broker */
84
        $this->handleRequest($body);
85
       $rpcResponse = $this->buildRpcResponseCollection();
86
87
        return $rpcResponse;
88
    }
89
90
    /**
91
     * @param RpcRequest $request
92
     */
93
    public function addRequest(RpcRequest $request)
94
    {
95
        $this->requestCollection->add($request);
96
    }
97
98
    /**
99
     * @param RpcRequest $request
100
     */
101
    public function removeRequest(RpcRequest $request)
102
    {
103
        $id = $this->requestCollection->getRequestIndex($request);
104
105
        if (!$id) {
106
            $this->requestCollection->remove($id);
107
        }
108
    }
109
110
    /**
111
     * @param RpcRequestCollection $requests
112
     */
113
    public function addRequestCollection(RpcRequestCollection $requests)
114
    {
115
        $this->requestCollection = $requests;
116
    }
117
118
    /**
119
     * @return RpcRequestCollection
120
     */
121
    public function getRequestCollection()
122
    {
123
        return $this->requestCollection;
124
    }
125
126
    /**
127
     * @param AbstractChannel $channel
128
     */
129
    public function setChannel(AbstractChannel $channel)
130
    {
131
        $this->channel = $channel;
132
    }
133
134
    /**
135
     * @return \PhpAmqpLib\Channel\AMQPChannel
136
     */
137
    public function getChannel()
138
    {
139
        return $this->channel;
140
    }
141
142
    /**
143
     * @return string
144
     */
145
    public function getQueueName()
146
    {
147
        return $this->queue;
148
    }
149
150
    /**
151
     * @return AMQPStreamConnection
152
     */
153
    public function getConnection()
154
    {
155
        return $this->connection;
156
    }
157
158
    /**
159
     * @param \PhpAmqpLib\Connection\AMQPStreamConnection $connection
160
     */
161
    public function setConnection(AMQPStreamConnection $connection)
162
    {
163
        $this->connection = $connection;
164
    }
165
166
    /**
167
     * @param $body
168
     */
169
    private function handleRequest($body)
170
    {
171
        list($callbackQueue, ,) = $this->getChannel()->queue_declare(
172
            '', false, false, false, true
173
        );
174
        $this->callbackQueue = $callbackQueue;
175
        $this->getChannel()->basic_consume(
176
            $this->callbackQueue, '', false, false, false, false,
177
            [$this, 'onResponse']
178
        );
179
180
        $msg = new AMQPMessage(
181
            (string)$body,
182
            [
183
                'correlation_id' => $this->correlationId,
184
                'reply_to' => $this->callbackQueue
185
            ]
186
        );
187
        $this->getChannel()->basic_publish($msg, '', $this->getQueueName());
188
189
        while(!$this->response) {
190
            $this->getChannel()->wait();
191
        }
192
        $this->getChannel()->close();
193
        $this->getConnection()->close();
194
    }
195
196
    /**
197
     * @return RpcResponseCollection
198
     * @throws RpcInvalidResponseException
199
     */
200
    private function buildRpcResponseCollection()
201
    {
202
        $responses = [];
203
        try {
204
            $responses = json_decode($this->response, true);
205
        } catch (\Exception $e) {
206
            throw new RpcInvalidResponseException($e);
207
        }
208
        $rpcResponse = new RpcResponseCollection();
209
210
        foreach ($responses as $responseArr) {
211
            $response = new RpcResponse();
212
            $response->fromArray($responseArr);
213
            $rpcResponse->add($response);
214
        }
215
216
        return $rpcResponse;
217
    }
218
}