Passed
Push — master ( 3e304d...441efe )
by Ramūnas
09:23
created

MultipleConsumerTest::createMessage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 7
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7
use OldSound\RabbitMqBundle\RabbitMq\MultipleConsumer;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Connection\AMQPStreamConnection;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use PHPUnit\Framework\Assert;
12
use PHPUnit\Framework\MockObject\MockObject;
13
use PHPUnit\Framework\TestCase;
14
15
class MultipleConsumerTest extends TestCase
16
{
17
    /**
18
     * Multiple consumer
19
     *
20
     * @var MultipleConsumer
21
     */
22
    private $multipleConsumer;
23
24
    /**
25
     * AMQP channel
26
     *
27
     * @var MockObject|AMQPChannel
28
     */
29
    private $amqpChannel;
30
31
    /**
32
     * AMQP connection
33
     *
34
     * @var MockObject|AMQPStreamConnection
35
     */
36
    private $amqpConnection;
37
38
    /**
39
     * Set up
40
     *
41
     * @return void
42
     */
43
    public function setUp(): void
44
    {
45
        $this->amqpConnection = $this->prepareAMQPConnection();
46
        $this->amqpChannel = $this->prepareAMQPChannel();
47
        $this->multipleConsumer = new MultipleConsumer($this->amqpConnection, $this->amqpChannel);
48
    }
49
50
    /**
51
     * Check if the message is requeued or not correctly.
52
     *
53
     * @dataProvider processMessageProvider
54
     */
55
    public function testProcessMessage($processFlag, $expectedMethod, $expectedRequeue = null)
56
    {
57
        $callback = $this->prepareCallback($processFlag);
58
59
        $this->multipleConsumer->setQueues(
60
            array(
61
                'test-1' => array('callback' => $callback),
62
                'test-2' => array('callback' => $callback)
63
            )
64
        );
65
66
        $this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
67
68
        $this->multipleConsumer->processQueueMessage('test-1', $this->createMessage());
69
        $this->multipleConsumer->processQueueMessage('test-2', $this->createMessage());
70
    }
71
72
    /**
73
     * Check queues provider works well
74
     *
75
     * @dataProvider processMessageProvider
76
     */
77
    public function testQueuesProvider($processFlag, $expectedMethod, $expectedRequeue = null)
78
    {
79
        $callback = $this->prepareCallback($processFlag);
80
81
        $queuesProvider = $this->prepareQueuesProvider();
82
        $queuesProvider->expects($this->once())
83
            ->method('getQueues')
84
            ->will($this->returnValue(
85
                array(
86
                    'test-1' => array('callback' => $callback),
87
                    'test-2' => array('callback' => $callback)
88
                )
89
            ));
90
91
        $this->multipleConsumer->setQueuesProvider($queuesProvider);
92
93
        /**
94
         * We don't test consume method, which merges queues by calling $this->setupConsumer();
95
         * So we need to invoke it manually
96
         */
97
        $reflectionClass = new \ReflectionClass(get_class($this->multipleConsumer));
98
        $reflectionMethod = $reflectionClass->getMethod('mergeQueues');
99
        $reflectionMethod->setAccessible(true);
100
        $reflectionMethod->invoke($this->multipleConsumer);
101
102
        $this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
103
104
        $this->multipleConsumer->processQueueMessage('test-1', $this->createMessage());
105
        $this->multipleConsumer->processQueueMessage('test-2', $this->createMessage());
106
    }
107
108
    /**
109
     * Check queues provider works well with static queues together
110
     *
111
     * @dataProvider processMessageProvider
112
     */
113
    public function testQueuesProviderAndStaticQueuesTogether($processFlag, $expectedMethod, $expectedRequeue = null)
114
    {
115
        $callback = $this->prepareCallback($processFlag);
116
117
        $this->multipleConsumer->setQueues(
118
            array(
119
                'test-1' => array('callback' => $callback),
120
                'test-2' => array('callback' => $callback)
121
            )
122
        );
123
124
        $queuesProvider = $this->prepareQueuesProvider();
125
        $queuesProvider->expects($this->once())
126
            ->method('getQueues')
127
            ->will($this->returnValue(
128
                array(
129
                    'test-3' => array('callback' => $callback),
130
                    'test-4' => array('callback' => $callback)
131
                )
132
            ));
133
134
        $this->multipleConsumer->setQueuesProvider($queuesProvider);
135
136
        /**
137
         * We don't test consume method, which merges queues by calling $this->setupConsumer();
138
         * So we need to invoke it manually
139
         */
140
        $reflectionClass = new \ReflectionClass(get_class($this->multipleConsumer));
141
        $reflectionMethod = $reflectionClass->getMethod('mergeQueues');
142
        $reflectionMethod->setAccessible(true);
143
        $reflectionMethod->invoke($this->multipleConsumer);
144
145
        $this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
146
147
        $this->multipleConsumer->processQueueMessage('test-1', $this->createMessage());
148
        $this->multipleConsumer->processQueueMessage('test-2', $this->createMessage());
149
        $this->multipleConsumer->processQueueMessage('test-3', $this->createMessage());
150
        $this->multipleConsumer->processQueueMessage('test-4', $this->createMessage());
151
    }
152
153
    public function processMessageProvider()
154
    {
155
        return array(
156
            array(null, 'basic_ack'), // Remove message from queue only if callback return not false
157
            array(true, 'basic_ack'), // Remove message from queue only if callback return not false
158
            array(false, 'basic_reject', true), // Reject and requeue message to RabbitMQ
159
            array(ConsumerInterface::MSG_ACK, 'basic_ack'), // Remove message from queue only if callback return not false
160
            array(ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', true), // Reject and requeue message to RabbitMQ
161
            array(ConsumerInterface::MSG_REJECT, 'basic_reject', false), // Reject and drop
162
        );
163
    }
164
165
    /**
166
     * @dataProvider queueBindingRoutingKeyProvider
167
     */
168
    public function testShouldConsiderQueueArgumentsOnQueueDeclaration($routingKeysOption, $expectedRoutingKey)
169
    {
170
        $queueName = 'test-queue-name';
171
        $exchangeName = 'test-exchange-name';
172
        $expectedArgs = ['test-argument' => ['S', 'test-value']];
173
174
        $this->amqpChannel->expects($this->any())
175
            ->method('getChannelId')->willReturn(0);
176
177
        $this->amqpChannel->expects($this->any())
178
            ->method('queue_declare')
179
            ->willReturn([$queueName, 5, 0]);
180
181
182
        $this->multipleConsumer->setExchangeOptions([
183
            'declare' => false,
184
            'name' => $exchangeName,
185
            'type' => 'topic']);
186
187
        $this->multipleConsumer->setQueues([
188
            $queueName => [
189
                'passive' => true,
190
                'durable' => true,
191
                'exclusive' => true,
192
                'auto_delete' => true,
193
                'nowait' => true,
194
                'arguments' => $expectedArgs,
195
                'ticket' => null,
196
                'routing_keys' => $routingKeysOption]
197
        ]);
198
199
        $this->multipleConsumer->setRoutingKey('test-routing-key');
200
201
        // we assert that arguments are passed to the bind method
202
        $this->amqpChannel->expects($this->once())
203
            ->method('queue_bind')
204
            ->with($queueName, $exchangeName, $expectedRoutingKey, false, $expectedArgs);
205
206
        $this->multipleConsumer->setupFabric();
207
    }
208
209
    public function queueBindingRoutingKeyProvider()
210
    {
211
        return array(
212
            array(array(), 'test-routing-key'),
213
            array(array('test-routing-key-2'), 'test-routing-key-2'),
214
        );
215
    }
216
217
    /**
218
     * Preparing AMQP Connection
219
     *
220
     * @return MockObject|AMQPStreamConnection
221
     */
222
    private function prepareAMQPConnection()
223
    {
224
        return $this->getMockBuilder('\PhpAmqpLib\Connection\AMQPStreamConnection')
225
            ->disableOriginalConstructor()
226
            ->getMock();
227
    }
228
229
    /**
230
     * Preparing AMQP Connection
231
     *
232
     * @return MockObject|AMQPChannel
233
     */
234
    private function prepareAMQPChannel()
235
    {
236
        return $this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
237
            ->disableOriginalConstructor()
238
            ->getMock();
239
    }
240
241
    /**
242
     * Preparing QueuesProviderInterface instance
243
     *
244
     * @return MockObject|QueuesProviderInterface
245
     */
246
    private function prepareQueuesProvider()
247
    {
248
        return $this->getMockBuilder('\OldSound\RabbitMqBundle\Provider\QueuesProviderInterface')
249
            ->getMock();
250
    }
251
252
    /**
253
     * Preparing AMQP Channel Expectations
254
     *
255
     * @param mixed $expectedMethod
256
     * @param string $expectedRequeue
257
     *
258
     * @return void
259
     */
260
    private function prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue)
261
    {
262
        $this->amqpChannel->expects($this->any())
263
            ->method('basic_reject')
264
            ->will($this->returnCallback(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) {
265
                Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called.
266
                Assert::assertSame($requeue, $expectedRequeue); // Check if the message should be requeued.
267
            }));
268
269
        $this->amqpChannel->expects($this->any())
270
            ->method('basic_ack')
271
            ->will($this->returnCallback(function ($delivery_tag) use ($expectedMethod) {
0 ignored issues
show
Unused Code introduced by
The parameter $delivery_tag is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

271
            ->will($this->returnCallback(function (/** @scrutinizer ignore-unused */ $delivery_tag) use ($expectedMethod) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
272
                Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
273
            }));
274
    }
275
276
    /**
277
     * Prepare callback
278
     *
279
     * @param bool $processFlag
280
     * @return callable
281
     */
282
    private function prepareCallback($processFlag)
283
    {
284
        return function ($msg) use ($processFlag) {
0 ignored issues
show
Unused Code introduced by
The parameter $msg is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

284
        return function (/** @scrutinizer ignore-unused */ $msg) use ($processFlag) {

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
285
            return $processFlag;
286
        };
287
    }
288
289
    private function createMessage()
290
    {
291
        $amqpMessage = new AMQPMessage('foo body');
292
        $amqpMessage->setChannel($this->amqpChannel);
293
        $amqpMessage->setDeliveryTag(0);
294
295
        return $amqpMessage;
296
    }
297
}
298