1 | <?php |
||
2 | |||
3 | namespace OldSound\RabbitMqBundle\RabbitMq; |
||
4 | |||
5 | use PhpAmqpLib\Message\AMQPMessage; |
||
6 | |||
/**
 * RPC client: publishes request messages and collects the correlated replies.
 *
 * Flow implemented by this class: queue one or more requests with addRequest(),
 * then call getReplies(), which blocks until one reply per published request has
 * been stored (or the channel wait fails) and returns them keyed by correlation id.
 */
class RpcClient extends BaseAmqp
{
    /** Pseudo-queue name used for both consuming and `reply_to` in direct reply-to mode. */
    private const DIRECT_REPLY_TO_QUEUE = 'amq.rabbitmq.reply-to';

    /** @var int Number of requests published since the last completed getReplies() or reset(). */
    protected $requests = 0;

    /** @var array Replies collected so far, keyed by the `correlation_id` of each reply message. */
    protected $replies = [];

    /** @var bool|null When truthy, reply bodies are passed through the unserializer (set by initClient()). */
    protected $expectSerializedResponse;

    /** @var int|float Largest `$expiration` seen in the current batch; handed to the channel's wait() as timeout. */
    protected $timeout = 0;

    /** @var callable|null Optional callback invoked with every (possibly unserialized) reply body. */
    protected $notifyCallback;

    /** @var string|null Name of the reply queue; declared lazily by getQueueName(). */
    private $queueName;

    /**
     * @var callable Callable applied to reply bodies when a serialized response is expected.
     *
     * NOTE(review): the default is PHP's unserialize(), which is applied to bodies received
     * from the broker. If the replying side is not fully trusted, install a safer decoder
     * through setUnserializer().
     */
    private $unserializer = 'unserialize';

    /** @var mixed Truthy to use direct reply-to instead of a declared reply queue. */
    private $directReplyTo;

    /** @var mixed Consumer tag returned by basic_consume() in direct reply-to mode. */
    private $directConsumerTag;

    /**
     * Configures whether reply bodies are run through the unserializer.
     *
     * @param bool $expectSerializedResponse
     */
    public function initClient($expectSerializedResponse = true)
    {
        $this->expectSerializedResponse = $expectSerializedResponse;
    }

    /**
     * Publishes one request and registers it as pending for the next getReplies() call.
     *
     * @param mixed      $msgBody    Body of the request message.
     * @param string     $server     Exchange the request is published to.
     * @param mixed      $requestId  Correlation id; must be non-empty in the sense of empty(),
     *                               so values such as 0 or '0' are rejected as well.
     * @param string     $routingKey Routing key used for publishing.
     * @param int|float  $expiration Expiration in seconds; sent to the broker multiplied by 1000
     *                               and also used as the wait timeout in getReplies().
     *
     * @throws \InvalidArgumentException When $requestId is empty.
     */
    public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0)
    {
        if (empty($requestId)) {
            throw new \InvalidArgumentException('You must provide a $requestId');
        }

        if (0 === $this->requests) {
            // First request of a new batch: drop replies left over from the previous one.
            $this->replies = [];

            if ($this->directReplyTo) {
                // Direct reply-to mode: the consumer is started before the first publish.
                // Positional flags: no_local = false, no_ack = true, exclusive = false, nowait = false.
                $this->directConsumerTag = $this->getChannel()->basic_consume(
                    self::DIRECT_REPLY_TO_QUEUE,
                    '',
                    false,
                    true,
                    false,
                    false,
                    [$this, 'processMessage']
                );
            }
        }

        // In direct reply-to mode the predefined pseudo-queue is used; otherwise the
        // (lazily declared) private reply queue.
        $replyTo = $this->directReplyTo
            ? self::DIRECT_REPLY_TO_QUEUE
            : $this->getQueueName();

        // NOTE(review): with the default $expiration of 0 the message is published with an
        // expiration of 0 — confirm that this is the intended broker-side behaviour.
        $msg = new AMQPMessage($msgBody, [
            'content_type' => 'text/plain',
            'reply_to' => $replyTo,
            'delivery_mode' => 1, // non durable
            'expiration' => $expiration * 1000,
            'correlation_id' => $requestId,
        ]);

        $this->getChannel()->basic_publish($msg, $server, $routingKey);

        $this->requests++;

        // The batch waits as long as its most patient request allows.
        if ($expiration > $this->timeout) {
            $this->timeout = $expiration;
        }
    }

    /**
     * Blocks until a reply has been stored for every pending request, then returns them.
     *
     * The consumer is always cancelled, even when waiting throws. The pending-request
     * counter, timeout and direct consumer tag are only cleared on success; after a
     * failed wait, reset() can be used to discard the unfinished batch.
     *
     * @return array Replies keyed by correlation id.
     */
    public function getReplies()
    {
        if ($this->directReplyTo) {
            // The consumer was already started by the first addRequest() of this batch.
            $consumerTag = $this->directConsumerTag;
        } else {
            // Positional flags: no_local = false, no_ack = true, exclusive = false, nowait = false.
            $consumerTag = $this->getChannel()->basic_consume(
                $this->getQueueName(),
                '',
                false,
                true,
                false,
                false,
                [$this, 'processMessage']
            );
        }

        try {
            while (count($this->replies) < $this->requests) {
                $this->getChannel()->wait(null, false, $this->timeout);
            }
        } finally {
            $this->getChannel()->basic_cancel($consumerTag);
        }

        $this->directConsumerTag = null;
        $this->requests = 0;
        $this->timeout = 0;

        return $this->replies;
    }

    /**
     * Consumer callback: decodes a reply, notifies the optional callback and stores the
     * result under the message's correlation id.
     *
     * @param AMQPMessage $msg Reply message delivered by the channel.
     */
    public function processMessage(AMQPMessage $msg)
    {
        // Read the body through the accessor; direct access to the `body` property is deprecated.
        $messageBody = $msg->getBody();

        if ($this->expectSerializedResponse) {
            $messageBody = ($this->unserializer)($messageBody);
        }

        if (null !== $this->notifyCallback) {
            ($this->notifyCallback)($messageBody);
        }

        $this->replies[$msg->get('correlation_id')] = $messageBody;
    }

    /**
     * Returns the reply queue name, declaring the queue on first use.
     *
     * @return string
     */
    protected function getQueueName()
    {
        if (null === $this->queueName) {
            // Empty name lets the broker pick one; flags: passive = false, durable = false,
            // exclusive = true, auto_delete = false. Only the returned queue name is kept.
            [$this->queueName, , ] = $this->getChannel()->queue_declare("", false, false, true, false);
        }

        return $this->queueName;
    }

    /**
     * Replaces the callable used to decode reply bodies when serialized responses are expected.
     *
     * @param callable $unserializer Receives the raw body and returns the decoded value.
     */
    public function setUnserializer($unserializer)
    {
        $this->unserializer = $unserializer;
    }

    /**
     * Registers a callback that is invoked with every decoded reply body.
     *
     * @param callable $callback
     *
     * @throws \InvalidArgumentException When $callback is not callable.
     */
    public function notify($callback)
    {
        if (!is_callable($callback)) {
            throw new \InvalidArgumentException('First parameter expects to be callable');
        }

        $this->notifyCallback = $callback;
    }

    /**
     * Enables or disables direct reply-to mode.
     *
     * @param mixed $directReplyTo Evaluated for truthiness by addRequest() and getReplies().
     */
    public function setDirectReplyTo($directReplyTo)
    {
        $this->directReplyTo = $directReplyTo;
    }

    /**
     * Discards the current batch: stored replies, the pending-request counter and the
     * timeout derived from the discarded requests.
     *
     * NOTE(review): a consumer started in direct reply-to mode is not cancelled here.
     */
    public function reset()
    {
        $this->replies = [];
        $this->requests = 0;
        // The timeout is the maximum expiration of the discarded requests; without clearing
        // it, the next batch would wait with a stale (possibly much larger) value.
        $this->timeout = 0;
    }
}
||
128 |
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.