1 | <?php |
||
38 | class RpcClient extends BaseAmqp |
||
39 | { |
||
40 | /** |
||
41 | * @var int |
||
42 | */ |
||
43 | protected $requests; |
||
44 | |||
45 | /** |
||
46 | * @var string[] |
||
47 | */ |
||
48 | protected $replies; |
||
49 | |||
50 | /** |
||
51 | * @var string |
||
52 | */ |
||
53 | protected $queueName; |
||
54 | |||
55 | /** |
||
56 | * @var int|null |
||
57 | */ |
||
58 | protected $requestTimeout = null; |
||
59 | |||
/**
 * Prepare the client for a fresh batch of RPC calls.
 *
 * Declares a queue through the channel with an empty name and remembers the
 * name that comes back as the reply queue, then clears the pending-request
 * counter and the collected replies.
 *
 * NOTE(review): the positional flags passed to queue_declare() look like
 * php-amqplib's (passive, durable, exclusive, auto_delete), i.e. an
 * exclusive, auto-deleted, broker-named queue - confirm against the
 * channel implementation in use.
 */
public function initClient()
{
    // Only the first element of the declare result (the queue name) is used.
    list($declaredQueueName) = $this->channel->queue_declare('', false, false, true, true);

    $this->queueName = $declaredQueueName;
    $this->requests = 0;
    $this->replies = array();
}
|
69 | |||
/**
 * Publish one request to an RPC server and count it as pending.
 *
 * The message is published to the exchange named "<server>-exchange" with
 * this client's reply queue as `reply_to` and the request ID as
 * `correlation_id`.
 *
 * @param string $messageBody Payload of the request.
 * @param string $server      Server name; '-exchange' is appended to form the exchange name.
 * @param string $requestId   Identifier sent as the correlation ID. Must not be
 *                            "empty" in the PHP sense, so 0 and "0" are rejected too.
 * @param string $routingKey  Routing key used when publishing.
 *
 * @throws \InvalidArgumentException When $requestId is empty.
 */
public function addRequest($messageBody, $server, $requestId, $routingKey = '')
{
    if (empty($requestId)) {
        throw new \InvalidArgumentException("You must provide a request ID");
    }

    $properties = array(
        'content_type' => 'text/plain',
        'reply_to' => $this->queueName,
        'correlation_id' => $requestId
    );

    $request = new AMQPMessage($messageBody, $properties);
    $exchangeName = $server . '-exchange';

    $this->channel->basic_publish($request, $exchangeName, $routingKey);

    // One more reply that getReplies() has to wait for.
    ++$this->requests;
}
|
98 | |||
/**
 * Wait for and return the replies to the requests added so far.
 *
 * Registers processMessage() as the consumer callback on the reply queue,
 * using the queue name as the consumer tag, then waits on the channel until
 * the number of collected replies reaches the number of requests. The
 * consumer is cancelled before returning, and also when waiting fails, so
 * that a later call can register the same consumer tag again.
 *
 * NOTE(review): the positional flags passed to basic_consume() look like
 * php-amqplib's (no_local=false, no_ack=true, exclusive=false,
 * nowait=false) - confirm against the channel implementation in use.
 *
 * @return array Replies collected by processMessage().
 *
 * @throws \Exception Whatever the channel raised while waiting (for example
 *                    a timeout when a request timeout is configured); it is
 *                    rethrown unchanged after the consumer has been cancelled.
 */
public function getReplies()
{
    $channel = $this->channel;
    $consumerTag = $this->queueName;

    $channel->basic_consume(
        $this->queueName,
        $consumerTag,
        false,
        true,
        false,
        false,
        array($this, 'processMessage')
    );

    try {
        while (count($this->replies) < $this->requests) {
            $channel->wait(null, false, $this->requestTimeout);
        }
    } catch (\Exception $waitError) {
        // Do not leave the consumer registered when waiting failed; otherwise
        // the next getReplies() call would reuse an already active tag.
        try {
            $channel->basic_cancel($consumerTag);
        } catch (\Exception $cancelError) {
            // Deliberately ignored: the original failure is the one the
            // caller needs to see, not a secondary clean-up error.
        }

        throw $waitError;
    }

    $channel->basic_cancel($consumerTag);

    return $this->replies;
}
||
127 | |||
128 | /** |
||
129 | * @param AMQPMessage $message |
||
130 | */ |
||
131 | 5 | public function processMessage(AMQPMessage $message) |
|
135 | |||
136 | /** |
||
137 | * @param int $timeout |
||
138 | */ |
||
139 | 5 | public function setTimeout($timeout) |
|
143 | } |
||
144 |