Completed
Push — master ( 9d479c...06d1d5 )
by Daniel
03:00
created

RpcClient::setMessage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\Rpc;
4
5
use Cmobi\RabbitmqBundle\ConnectionManagerInterface;
6
use PhpAmqpLib\Channel\AbstractChannel;
7
use PhpAmqpLib\Connection\AMQPStreamConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
abstract class RpcClient
11
{
12
    private $queue;
13
    private $connection;
14
    private $body;
15
    private $channel;
16
    private $callbackQueue;
17
    private $response;
18
    private $correlationId;
19
20
    public function __construct($queueName, ConnectionManagerInterface $manager)
21
    {
22
        $this->body = '';
23
        $this->queue = $queueName;
24
        $this->connection = $manager->getConnection();
25
        $this->channel = $this->connection->channel();
26
    }
27
28
    /**
29
     * @param AMQPMessage $rep
30
     */
31
    public function onResponse(AMQPMessage $rep)
32
    {
33
        if($rep->get('correlation_id') == $this->correlationId) {
34
            $this->response = $rep->body;
35
        }
36
    }
37
38
    /**
39
     * @return null|string
40
     */
41
    public function call()
42
    {
43
        list($callbackQueue, ,) = $this->getChannel()->queue_declare(
44
            '', false, false, false, true
45
        );
46
        $this->callbackQueue = $callbackQueue;
47
        $this->getChannel()->basic_consume(
48
            $this->callbackQueue, '', false, false, false, false,
49
            [$this, 'onResponse']
50
        );
51
        $this->response = null;
52
        $this->correlationId = uniqid();
53
54
        $msg = new AMQPMessage(
55
            $this->getMessage(),
56
            [
57
                'correlation_id' => $this->correlationId,
58
                'reply_to' => $this->callbackQueue
59
            ]
60
        );
61
        $this->getChannel()->basic_publish($msg, '', $this->getQueueName());
62
63
        while(!$this->response) {
64
            $this->getChannel()->wait();
65
        }
66
        $this->getChannel()->close();
67
        $this->getConnection()->close();
68
        
69
        return $this->response;
70
    }
71
72
    /**
73
     * @param string $body
74
     */
75
    public function setMessage($body)
76
    {
77
        $this->body = (string)$body;
78
    }
79
80
    /**
81
     * @return string
82
     */
83
    public function getMessage()
84
    {
85
        return $this->body;
86
    }
87
88
    /**
89
     * @param AbstractChannel $channel
90
     */
91
    public function setChannel(AbstractChannel $channel)
92
    {
93
        $this->channel = $channel;
94
    }
95
96
    /**
97
     * @return \PhpAmqpLib\Channel\AMQPChannel
98
     */
99
    public function getChannel()
100
    {
101
        return $this->channel;
102
    }
103
104
    /**
105
     * @return string
106
     */
107
    public function getQueueName()
108
    {
109
        return $this->queue;
110
    }
111
112
    /**
113
     * @return AMQPStreamConnection
114
     */
115
    public function getConnection()
116
    {
117
        return $this->connection;
118
    }
119
120
    /**
121
     * @param \PhpAmqpLib\Connection\AMQPStreamConnection $connection
122
     */
123
    public function setConnection(AMQPStreamConnection $connection)
124
    {
125
        $this->connection = $connection;
126
    }
127
}