Completed
Push — master ( 06d1d5...fff99a )
by Daniel
02:51
created

RpcClient::refreshChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 5
rs 9.4285
nc 1
cc 1
eloc 3
nop 0
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Rpc;
4
5
use Cmobi\RabbitmqBundle\ConnectionManagerInterface;
6
use PhpAmqpLib\Channel\AbstractChannel;
7
use PhpAmqpLib\Connection\AMQPStreamConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
abstract class RpcClient
11
{
12
    private $queue;
13
    private $connection;
14
    private $body;
15
    private $channel;
16
    private $callbackQueue;
17
    private $response;
18
    private $correlationId;
19
20
    public function __construct($queueName, ConnectionManagerInterface $manager)
21
    {
22
        $this->body = '';
23
        $this->queue = $queueName;
24
        $this->connection = $manager->getConnection();
25
        $this->channel = $this->connection->channel();
26
    }
27
28
    /**
29
     * @param AMQPMessage $rep
30
     */
31
    public function onResponse(AMQPMessage $rep)
32
    {
33
        if($rep->get('correlation_id') == $this->correlationId) {
34
            $this->response = $rep->body;
35
        }
36
    }
37
38
    public function refreshChannel()
39
    {
40
        $connection = $this->getConnection();
41
        $this->channel = $connection->channel();
42
    }
43
44
    /**
45
     * @return null|string
46
     */
47
    public function call()
48
    {
49
        list($callbackQueue, ,) = $this->getChannel()->queue_declare(
50
            '', false, false, false, true
51
        );
52
        $this->callbackQueue = $callbackQueue;
53
        $this->getChannel()->basic_consume(
54
            $this->callbackQueue, '', false, false, false, false,
55
            [$this, 'onResponse']
56
        );
57
        $this->response = null;
58
        $this->correlationId = uniqid();
59
60
        $msg = new AMQPMessage(
61
            $this->getMessage(),
62
            [
63
                'correlation_id' => $this->correlationId,
64
                'reply_to' => $this->callbackQueue
65
            ]
66
        );
67
        $this->getChannel()->basic_publish($msg, '', $this->getQueueName());
68
69
        while(!$this->response) {
70
            $this->getChannel()->wait();
71
        }
72
        $this->getChannel()->close();
73
        $this->getConnection()->close();
74
75
        return $this->response;
76
    }
77
78
    /**
79
     * @param string $body
80
     */
81
    public function setMessage($body)
82
    {
83
        $this->body = (string)$body;
84
    }
85
86
    /**
87
     * @return string
88
     */
89
    public function getMessage()
90
    {
91
        return $this->body;
92
    }
93
94
    /**
95
     * @param AbstractChannel $channel
96
     */
97
    public function setChannel(AbstractChannel $channel)
98
    {
99
        $this->channel = $channel;
100
    }
101
102
    /**
103
     * @return \PhpAmqpLib\Channel\AMQPChannel
104
     */
105
    public function getChannel()
106
    {
107
        return $this->channel;
108
    }
109
110
    /**
111
     * @return string
112
     */
113
    public function getQueueName()
114
    {
115
        return $this->queue;
116
    }
117
118
    /**
119
     * @return AMQPStreamConnection
120
     */
121
    public function getConnection()
122
    {
123
        return $this->connection;
124
    }
125
126
    /**
127
     * @param \PhpAmqpLib\Connection\AMQPStreamConnection $connection
128
     */
129
    public function setConnection(AMQPStreamConnection $connection)
130
    {
131
        $this->connection = $connection;
132
    }
133
}