Passed
Push — master ( c836a6...5a8d35 )
by Mihai
07:13
created

MultipleConsumerTest   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 309
Duplicated Lines 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
eloc 132
c 5
b 0
f 0
dl 0
loc 309
rs 10
wmc 13

13 Methods

Rating   Name   Duplication   Size   Complexity  
A testQueuesProvider() 0 34 1
A setUp() 0 5 1
A processMessageProvider() 0 9 1
A testProcessMessage() 0 20 1
A testQueuesProviderAndStaticQueuesTogether() 0 43 1
A testQueuesPrivider() 0 21 1
A prepareQueuesProvider() 0 4 1
A testShouldConsiderQueueArgumentsOnQueueDeclaration() 0 39 1
A prepareAMQPChannel() 0 5 1
A prepareAMQPConnection() 0 5 1
A prepareAMQPChannelExpectations() 0 13 1
A queueBindingRoutingKeyProvider() 0 5 1
A prepareCallback() 0 4 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7
use OldSound\RabbitMqBundle\RabbitMq\MultipleConsumer;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Connection\AMQPConnection;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use PHPUnit\Framework\Assert;
12
use PHPUnit\Framework\MockObject\MockObject;
13
use PHPUnit\Framework\TestCase;
14
15
class MultipleConsumerTest extends TestCase
16
{
17
    /**
18
     * Multiple consumer
19
     *
20
     * @var MultipleConsumer
21
     */
22
    private $multipleConsumer;
23
24
    /**
25
     * AMQP channel
26
     *
27
     * @var MockObject|AMQPChannel
28
     */
29
    private $amqpChannel;
30
31
    /**
32
     * AMQP connection
33
     *
34
     * @var MockObject|AMQPConnection
35
     */
36
    private $amqpConnection;
37
38
    /**
39
     * Set up
40
     *
41
     * @return void
42
     */
43
    public function setUp(): void
44
    {
45
        $this->amqpConnection = $this->prepareAMQPConnection();
46
        $this->amqpChannel = $this->prepareAMQPChannel();
47
        $this->multipleConsumer = new MultipleConsumer($this->amqpConnection, $this->amqpChannel);
48
    }
49
50
    /**
51
     * Check if the message is requeued or not correctly.
52
     *
53
     * @dataProvider processMessageProvider
54
     */
55
    public function testProcessMessage($processFlag, $expectedMethod, $expectedRequeue = null)
56
    {
57
        $callback = $this->prepareCallback($processFlag);
58
59
        $this->multipleConsumer->setQueues(
60
            array(
61
                'test-1' => array('callback' => $callback),
62
                'test-2' => array('callback' => $callback)
63
            )
64
        );
65
66
        $this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
67
68
        // Create a default message
69
        $amqpMessage = new AMQPMessage('foo body');
70
        $amqpMessage->delivery_info['channel'] = $this->amqpChannel;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

70
        /** @scrutinizer ignore-deprecated */ $amqpMessage->delivery_info['channel'] = $this->amqpChannel;
Loading history...
71
        $amqpMessage->delivery_info['delivery_tag'] = 0;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

71
        /** @scrutinizer ignore-deprecated */ $amqpMessage->delivery_info['delivery_tag'] = 0;
Loading history...
72
73
        $this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
74
        $this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
75
    }
76
77
    /**
78
     * Check queues provider works well
79
     *
80
     * @dataProvider processMessageProvider
81
     */
82
    public function testQueuesProvider($processFlag, $expectedMethod, $expectedRequeue = null)
83
    {
84
        $callback = $this->prepareCallback($processFlag);
85
86
        $queuesProvider = $this->prepareQueuesProvider();
87
        $queuesProvider->expects($this->once())
88
            ->method('getQueues')
89
            ->will($this->returnValue(
90
                array(
91
                    'test-1' => array('callback' => $callback),
92
                    'test-2' => array('callback' => $callback)
93
                )
94
            ));
95
96
        $this->multipleConsumer->setQueuesProvider($queuesProvider);
97
98
        /**
99
         * We don't test consume method, which merges queues by calling $this->setupConsumer();
100
         * So we need to invoke it manually
101
         */
102
        $reflectionClass = new \ReflectionClass(get_class($this->multipleConsumer));
103
        $reflectionMethod = $reflectionClass->getMethod('mergeQueues');
104
        $reflectionMethod->setAccessible(true);
105
        $reflectionMethod->invoke($this->multipleConsumer);
106
107
        $this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
108
109
        // Create a default message
110
        $amqpMessage = new AMQPMessage('foo body');
111
        $amqpMessage->delivery_info['channel'] = $this->amqpChannel;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

111
        /** @scrutinizer ignore-deprecated */ $amqpMessage->delivery_info['channel'] = $this->amqpChannel;
Loading history...
112
        $amqpMessage->delivery_info['delivery_tag'] = 0;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

112
        /** @scrutinizer ignore-deprecated */ $amqpMessage->delivery_info['delivery_tag'] = 0;
Loading history...
113
114
        $this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
115
        $this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
116
    }
117
118
    public function testQueuesPrivider()
119
    {
120
        $amqpConnection = $this->prepareAMQPConnection();
0 ignored issues
show
Unused Code introduced by
The assignment to $amqpConnection is dead and can be removed.
Loading history...
121
        $amqpChannel = $this->prepareAMQPChannel();
0 ignored issues
show
Unused Code introduced by
The assignment to $amqpChannel is dead and can be removed.
Loading history...
122
        $this->multipleConsumer->setContext('foo');
123
124
        $queuesProvider = $this->prepareQueuesProvider();
125
        $queuesProvider->expects($this->once())
126
            ->method('getQueues')
127
            ->will($this->returnValue(
128
                array(
129
                    'queue_foo' => array()
130
                )
131
            ));
132
133
        $this->multipleConsumer->setQueuesProvider($queuesProvider);
134
135
        $reflectionClass = new \ReflectionClass(get_class($this->multipleConsumer));
136
        $reflectionMethod = $reflectionClass->getMethod('mergeQueues');
137
        $reflectionMethod->setAccessible(true);
138
        $reflectionMethod->invoke($this->multipleConsumer);
139
    }
140
141
    /**
142
     * Check queues provider works well with static queues together
143
     *
144
     * @dataProvider processMessageProvider
145
     */
146
    public function testQueuesProviderAndStaticQueuesTogether($processFlag, $expectedMethod, $expectedRequeue = null)
147
    {
148
        $callback = $this->prepareCallback($processFlag);
149
150
        $this->multipleConsumer->setQueues(
151
            array(
152
                'test-1' => array('callback' => $callback),
153
                'test-2' => array('callback' => $callback)
154
            )
155
        );
156
157
        $queuesProvider = $this->prepareQueuesProvider();
158
        $queuesProvider->expects($this->once())
159
            ->method('getQueues')
160
            ->will($this->returnValue(
161
                array(
162
                    'test-3' => array('callback' => $callback),
163
                    'test-4' => array('callback' => $callback)
164
                )
165
            ));
166
167
        $this->multipleConsumer->setQueuesProvider($queuesProvider);
168
169
        /**
170
         * We don't test consume method, which merges queues by calling $this->setupConsumer();
171
         * So we need to invoke it manually
172
         */
173
        $reflectionClass = new \ReflectionClass(get_class($this->multipleConsumer));
174
        $reflectionMethod = $reflectionClass->getMethod('mergeQueues');
175
        $reflectionMethod->setAccessible(true);
176
        $reflectionMethod->invoke($this->multipleConsumer);
177
178
        $this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
179
180
        // Create a default message
181
        $amqpMessage = new AMQPMessage('foo body');
182
        $amqpMessage->delivery_info['channel'] = $this->amqpChannel;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

182
        /** @scrutinizer ignore-deprecated */ $amqpMessage->delivery_info['channel'] = $this->amqpChannel;
Loading history...
183
        $amqpMessage->delivery_info['delivery_tag'] = 0;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

183
        /** @scrutinizer ignore-deprecated */ $amqpMessage->delivery_info['delivery_tag'] = 0;
Loading history...
184
185
        $this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
186
        $this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
187
        $this->multipleConsumer->processQueueMessage('test-3', $amqpMessage);
188
        $this->multipleConsumer->processQueueMessage('test-4', $amqpMessage);
189
    }
190
191
    public function processMessageProvider()
192
    {
193
        return array(
194
            array(null, 'basic_ack'), // Remove message from queue only if callback return not false
195
            array(true, 'basic_ack'), // Remove message from queue only if callback return not false
196
            array(false, 'basic_reject', true), // Reject and requeue message to RabbitMQ
197
            array(ConsumerInterface::MSG_ACK, 'basic_ack'), // Remove message from queue only if callback return not false
198
            array(ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', true), // Reject and requeue message to RabbitMQ
199
            array(ConsumerInterface::MSG_REJECT, 'basic_reject', false), // Reject and drop
200
        );
201
    }
202
203
    /**
204
     * @dataProvider queueBindingRoutingKeyProvider
205
     */
206
    public function testShouldConsiderQueueArgumentsOnQueueDeclaration($routingKeysOption, $expectedRoutingKey)
207
    {
208
        $queueName = 'test-queue-name';
209
        $exchangeName = 'test-exchange-name';
210
        $expectedArgs = ['test-argument' => ['S', 'test-value']];
211
212
        $this->amqpChannel->expects($this->any())
213
            ->method('getChannelId')->willReturn(0);
214
215
        $this->amqpChannel->expects($this->any())
216
            ->method('queue_declare')
217
            ->willReturn([$queueName, 5, 0]);
218
219
220
        $this->multipleConsumer->setExchangeOptions([
221
            'declare' => false,
222
            'name' => $exchangeName,
223
            'type' => 'topic']);
224
225
        $this->multipleConsumer->setQueues([
226
            $queueName => [
227
                'passive' => true,
228
                'durable' => true,
229
                'exclusive' => true,
230
                'auto_delete' => true,
231
                'nowait' => true,
232
                'arguments' => $expectedArgs,
233
                'ticket' => null,
234
                'routing_keys' => $routingKeysOption]
235
        ]);
236
237
        $this->multipleConsumer->setRoutingKey('test-routing-key');
238
239
        // we assert that arguments are passed to the bind method
240
        $this->amqpChannel->expects($this->once())
241
            ->method('queue_bind')
242
            ->with($queueName, $exchangeName, $expectedRoutingKey, false, $expectedArgs);
243
244
        $this->multipleConsumer->setupFabric();
245
    }
246
247
    public function queueBindingRoutingKeyProvider()
248
    {
249
        return array(
250
            array(array(), 'test-routing-key'),
251
            array(array('test-routing-key-2'), 'test-routing-key-2'),
252
        );
253
    }
254
255
    /**
256
     * Preparing AMQP Connection
257
     *
258
     * @return MockObject|AMQPConnection
259
     */
260
    private function prepareAMQPConnection()
261
    {
262
        return $this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection')
263
            ->disableOriginalConstructor()
264
            ->getMock();
265
    }
266
267
    /**
268
     * Preparing AMQP Connection
269
     *
270
     * @return MockObject|AMQPChannel
271
     */
272
    private function prepareAMQPChannel()
273
    {
274
        return $this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
275
            ->disableOriginalConstructor()
276
            ->getMock();
277
    }
278
279
    /**
280
     * Preparing QueuesProviderInterface instance
281
     *
282
     * @return MockObject|QueuesProviderInterface
283
     */
284
    private function prepareQueuesProvider()
285
    {
286
        return $this->getMockBuilder('\OldSound\RabbitMqBundle\Provider\QueuesProviderInterface')
287
            ->getMock();
288
    }
289
290
    /**
291
     * Preparing AMQP Channel Expectations
292
     *
293
     * @param mixed $expectedMethod
294
     * @param string $expectedRequeue
295
     *
296
     * @return void
297
     */
298
    private function prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue)
299
    {
300
        $this->amqpChannel->expects($this->any())
301
            ->method('basic_reject')
302
            ->will($this->returnCallback(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) {
303
                Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called.
304
                Assert::assertSame($requeue, $expectedRequeue); // Check if the message should be requeued.
305
            }));
306
307
        $this->amqpChannel->expects($this->any())
308
            ->method('basic_ack')
309
            ->will($this->returnCallback(function ($delivery_tag) use ($expectedMethod) {
0 ignored issues
show
Unused Code introduced by
The parameter $delivery_tag is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

309
            ->will($this->returnCallback(function (/** @scrutinizer ignore-unused */ $delivery_tag) use ($expectedMethod) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
310
                Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
311
            }));
312
    }
313
314
    /**
315
     * Prepare callback
316
     *
317
     * @param bool $processFlag
318
     * @return callable
319
     */
320
    private function prepareCallback($processFlag)
321
    {
322
        return function ($msg) use ($processFlag) {
0 ignored issues
show
Unused Code introduced by
The parameter $msg is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

322
        return function (/** @scrutinizer ignore-unused */ $msg) use ($processFlag) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
323
            return $processFlag;
324
        };
325
    }
326
}
327