1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Mouf\AmqpClient\Objects; |
4
|
|
|
|
5
|
|
|
use Mouf\AmqpClient\Client; |
6
|
|
|
use Mouf\AmqpClient\Consumer; |
7
|
|
|
use Mouf\AmqpClient\ConsumerService; |
8
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
9
|
|
|
use PHPUnit\Framework\TestCase; |
10
|
|
|
use Psr\Log\NullLogger; |
11
|
|
|
|
12
|
|
|
/**
 * Integration tests for the AMQP client objects (Client, Exchange, Queue, Binding, ConsumerService).
 *
 * The connection settings are read from the global variables $rabbitmq_host, $rabbitmq_port,
 * $rabbitmq_user and $rabbitmq_password (presumably defined by the test bootstrap -- confirm).
 */
class ClientTest extends TestCase
{
    /**
     * @var Client
     */
    private $client;

    /**
     * @var Exchange
     */
    private $exchange;

    /**
     * @var Queue
     */
    private $queue;

    /**
     * @var Exchange
     */
    private $deadLetterExchange;

    /**
     * @var Queue
     */
    private $deadLetterQueue;

    /**
     * Last message handed to a consumer of a "regular" queue, or null if none was received yet.
     *
     * @var AMQPMessage|null
     */
    private $msgReceived;

    /**
     * Last message handed to the consumer of the dead letter queue, or null if none was received yet.
     *
     * @var AMQPMessage|null
     */
    private $deadLetterMsgReceived;

    /**
     * When true, the consumer of "test_queue" throws after recording the message.
     *
     * @var bool
     */
    private $triggerException = false;

    /**
     * Builds a client from the global connection settings, with a prefetch count of 1.
     *
     * @param int|null $port Port to connect to; any falsy value falls back to the global $rabbitmq_port.
     *
     * @return Client
     */
    private function makeClient($port = null)
    {
        global $rabbitmq_host;
        global $rabbitmq_port;
        global $rabbitmq_user;
        global $rabbitmq_password;

        if (!$port) {
            $port = $rabbitmq_port;
        }

        $client = new Client($rabbitmq_host, $port, $rabbitmq_user, $rabbitmq_password);
        $client->setPrefetchCount(1);

        return $client;
    }

    /**
     * Builds the "test_direct_queue" queue whose single consumer stores the received message
     * in $this->msgReceived.
     *
     * @param Client $client
     *
     * @return Queue
     */
    private function makeDirectQueue(Client $client)
    {
        return new Queue($client, 'test_direct_queue', [
            new Consumer(function (AMQPMessage $msg) {
                $this->msgReceived = $msg;
            }, new NullLogger()),
        ]);
    }

    /**
     * Creates the client and the fixtures shared by most tests:
     * a fanout exchange bound to a durable queue, plus a fanout dead letter exchange bound to
     * a durable dead letter queue. The main queue is configured to use the dead letter exchange.
     *
     * @param int|null $port Port passed to makeClient().
     */
    protected function init($port = null)
    {
        $this->client = $this->makeClient($port);
        $this->exchange = new Exchange($this->client, 'test_exchange', 'fanout');
        $this->queue = new Queue($this->client, 'test_queue', [
            new Consumer(function (AMQPMessage $msg) {
                // Record the message first so tests can inspect it even when the consumer fails.
                $this->msgReceived = $msg;
                if ($this->triggerException) {
                    throw new \Exception('boom!');
                }
            }, new NullLogger()),
        ]);
        $this->queue->setDurable(true);

        $binding = new Binding($this->exchange, $this->queue);
        $this->client->register($binding);

        $this->deadLetterExchange = new Exchange($this->client, 'test_dead_letter_exchange', 'fanout');

        $this->deadLetterQueue = new Queue($this->client, 'test_dead_letter_queue', [
            new Consumer(function (AMQPMessage $msg) {
                $this->deadLetterMsgReceived = $msg;
            }, new NullLogger()),
        ]);
        $this->deadLetterQueue->setDurable(true);

        $this->queue->setDeadLetterExchange($this->deadLetterExchange);

        $binding = new Binding($this->deadLetterExchange, $this->deadLetterQueue);
        $this->client->register($binding);
    }

    /**
     * Publishes a message on the test exchange; testQueue() depends on this test
     * and asserts on the message body.
     */
    public function testExchange()
    {
        $this->init();
        $this->exchange->publish(new Message('my message'), 'key');

        // Publishing without an exception is the success criterion here.
        $this->assertTrue(true);
    }

    /**
     * @depends testExchange
     */
    public function testQueue()
    {
        $this->init();

        $consumerService = new ConsumerService($this->client, [
            $this->queue,
        ]);

        $consumerService->run(true);

        // Fail with a readable assertion instead of a fatal error if nothing was consumed.
        $this->assertInstanceOf(AMQPMessage::class, $this->msgReceived);
        $this->assertSame('my message', $this->msgReceived->getBody());
    }

    /**
     * @depends testExchange
     */
    public function testDeadLetterQueue()
    {
        $this->init();

        $this->exchange->publish(new Message('my other message'), 'key');

        // Make the main consumer throw after it has recorded the message.
        $this->triggerException = true;

        $consumerService = new ConsumerService($this->client, [
            $this->queue,
            $this->deadLetterQueue,
        ]);

        // Two passes: presumably the first one hits the failing consumer and the second one
        // delivers the dead-lettered copy -- confirm against ConsumerService::run().
        $consumerService->run(true);
        $consumerService->run(true);

        $this->assertInstanceOf(AMQPMessage::class, $this->msgReceived);
        $this->assertSame('my other message', $this->msgReceived->getBody());
        $this->assertInstanceOf(AMQPMessage::class, $this->deadLetterMsgReceived);
        $this->assertSame('my other message', $this->deadLetterMsgReceived->getBody());
    }

    /**
     * An exchange cannot be created with an empty name.
     */
    public function testExceptionOnExchangeEmptyName()
    {
        $client = $this->makeClient();
        $this->expectException(\InvalidArgumentException::class);
        new Exchange($client, '', 'direct');
    }

    /**
     * A message published on the default exchange is delivered to the queue named by the routing key.
     */
    public function testDefaultExchange()
    {
        $client = $this->makeClient();
        // Let's send a message on RabbitMQ default exchange (named "")
        $exchange = new DefaultExchange($client);
        $queue = $this->makeDirectQueue($client);

        // The key is the name of the queue.
        $exchange->publish(new Message('hello'), 'test_direct_queue');

        $consumerService = new ConsumerService($client, [
            $queue,
        ]);

        $consumerService->run(true);

        $this->assertInstanceOf(AMQPMessage::class, $this->msgReceived);
        $this->assertSame('hello', $this->msgReceived->getBody());
    }

    /**
     * A message published directly on a queue object is delivered to that queue's consumer.
     */
    public function testPublishToQueue()
    {
        $client = $this->makeClient();
        $queue = $this->makeDirectQueue($client);

        // Publish straight to the queue: no exchange and no routing key are given here.
        $queue->publish(new Message('hello'));

        $consumerService = new ConsumerService($client, [
            $queue,
        ]);

        $consumerService->run(true);

        $this->assertInstanceOf(AMQPMessage::class, $this->msgReceived);
        $this->assertSame('hello', $this->msgReceived->getBody());
    }

    /**
     * Connecting on an invalid port must raise a ConnectionException.
     */
    public function testConnectionException()
    {
        // A bug in PHPUnit prevents us from disabling the warning-to-exception conversion when
        // processIsolation is set. Sockets emit a warning before the exception, hence the test fails.
        // Let's skip the test if sockets are enabled.
        if (function_exists('socket_create')) {
            // markTestSkipped() throws, so nothing below runs in that case.
            $this->markTestSkipped('Skipping test because of a bug in PHPUnit regarding warning handling');
        }

        // Programmatic expectation, consistent with testExceptionOnExchangeEmptyName();
        // the @expectedException annotation is no longer supported by recent PHPUnit versions.
        $this->expectException(\Mouf\AmqpClient\Exception\ConnectionException::class);

        $this->init(1242000042);
        $this->client->getChannel();
    }
}
198
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.