Completed
Push — master ( be1f89...dd7c75 )
by Artem
13:35
created

ConsumerTest::testProcessMessage()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 29
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 29
rs 8.8571
cc 1
eloc 20
nc 1
nop 3
1
<?php
2
3
namespace RabbitMqModule;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AbstractConnection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use RabbitMqModule\Options\ExchangeBind;
9
use RabbitMqModule\Options\Queue as QueueOptions;
10
use RabbitMqModule\Options\Exchange as ExchangeOptions;
11
12
/**
13
 * Class ConsumerTest
14
 * @package RabbitMqModule
15
 */
16
class ConsumerTest extends \PHPUnit_Framework_TestCase
17
{
18
    /**
     * Checks the consumer's initial property values and that every value
     * passed to a setter is returned unchanged by the matching getter.
     */
    public function testProperties()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();

        $consumer = new Consumer($connection);

        // State right after construction, before any setter has been called.
        static::assertTrue($consumer->isAutoSetupFabricEnabled());
        static::assertEquals(0, $consumer->getIdleTimeout());

        $queueOptions = new QueueOptions();
        $exchangeOptions = new ExchangeOptions();

        $callback = function () {

        };

        // NOTE(review): the consumer tag is set but never read back in this
        // test; no getter for it is visible from here.
        $consumer->setConsumerTag('consumer-tag-test');
        $consumer->setCallback($callback);
        $consumer->setQueueOptions($queueOptions);
        $consumer->setExchangeOptions($exchangeOptions);
        $consumer->setAutoSetupFabricEnabled(false);
        $consumer->setIdleTimeout(5);

        // Objects must come back as the very same instances that were set.
        static::assertSame($connection, $consumer->getConnection());
        static::assertSame($callback, $consumer->getCallback());
        static::assertSame($queueOptions, $consumer->getQueueOptions());
        static::assertSame($exchangeOptions, $consumer->getExchangeOptions());
        static::assertFalse($consumer->isAutoSetupFabricEnabled());
        static::assertEquals(5, $consumer->getIdleTimeout());
    }
51
52
    /**
     * Checks that setupFabric() declares and binds on the channel when a named
     * queue and an exchange with one exchange bind are configured, and that it
     * returns the consumer itself.
     */
    public function testSetupFabric()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();

        $queueOptions = new QueueOptions();
        $queueOptions->setName('foo');
        $exchangeOptions = new ExchangeOptions();

        // Attach a single exchange bind that points at a second exchange.
        $exchangeBindOptions = new ExchangeOptions();
        $exchangeBind = new ExchangeBind();
        $exchangeBind->setExchange($exchangeBindOptions);
        $exchangeOptions->setExchangeBinds([$exchangeBind]);

        $consumer = new Consumer($connection, $channel);
        $consumer->setQueueOptions($queueOptions);
        $consumer->setExchangeOptions($exchangeOptions);

        $channel->expects(static::exactly(1))
            ->method('exchange_bind');
        // Two declarations are expected; presumably one for the consumer's own
        // exchange and one for the bound exchange (confirm against Consumer).
        $channel->expects(static::exactly(2))
            ->method('exchange_declare');
        $channel->expects(static::once())
            ->method('queue_declare');

        static::assertSame($consumer, $consumer->setupFabric());
    }
84
85
    /**
     * Checks that setupFabric() makes no declare/bind calls on the channel when
     * the queue options carry no name and the exchange is configured with
     * declare = false, and that it still returns the consumer itself.
     */
    public function testSetupFabricWithEmptyQueueName()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();

        // Queue options are supplied, but no name is ever set on them.
        $queueOptions = new QueueOptions();
        $exchangeOptions = new ExchangeOptions();
        $exchangeOptions->setDeclare(false);

        $consumer = new Consumer($connection, $channel);
        $consumer->setQueueOptions($queueOptions);
        $consumer->setExchangeOptions($exchangeOptions);

        $channel->expects(static::never())
            ->method('exchange_bind');
        $channel->expects(static::never())
            ->method('exchange_declare');
        $channel->expects(static::never())
            ->method('queue_declare');

        static::assertSame($consumer, $consumer->setupFabric());
    }
112
113
    /**
     * Checks that setupFabric() makes no declare/bind calls on the channel when
     * no queue options are set at all and the exchange is configured with
     * declare = false, and that it still returns the consumer itself.
     */
    public function testSetupFabricWithoutQueueOptions()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();

        $exchangeOptions = new ExchangeOptions();
        $exchangeOptions->setDeclare(false);

        // Deliberately no setQueueOptions() call on the consumer.
        $consumer = new Consumer($connection, $channel);
        $consumer->setExchangeOptions($exchangeOptions);

        $channel->expects(static::never())
            ->method('exchange_bind');
        $channel->expects(static::never())
            ->method('exchange_declare');
        $channel->expects(static::never())
            ->method('queue_declare');

        static::assertSame($consumer, $consumer->setupFabric());
    }
138
139
    /**
     * Checks that setupFabric() makes no declare/bind calls on the channel when
     * the exchange is configured with declare = false, and that it still
     * returns the consumer itself.
     *
     * NOTE(review): the setup and expectations here are identical to
     * testSetupFabricWithoutQueueOptions(); if this test was meant to cover a
     * different configuration (e.g. with queue options present), it currently
     * does not - confirm the intent.
     */
    public function testSetupFabricWithNoDeclareExchange()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();

        $exchangeOptions = new ExchangeOptions();
        $exchangeOptions->setDeclare(false);

        $consumer = new Consumer($connection, $channel);
        $consumer->setExchangeOptions($exchangeOptions);

        $channel->expects(static::never())
            ->method('exchange_bind');
        $channel->expects(static::never())
            ->method('exchange_declare');
        $channel->expects(static::never())
            ->method('queue_declare');

        static::assertSame($consumer, $consumer->setupFabric());
    }
164
165
    /**
     * Checks that processMessage() calls the expected channel method exactly
     * once, with the expected arguments, for a given callback return value.
     *
     * @dataProvider processMessageProvider
     * @param mixed  $response      Value returned by the consumer callback
     * @param string $method        Channel method expected to be called once
     * @param array  $paramsExpects Argument constraints handed to with()
     */
    public function testProcessMessage($response, $method, $paramsExpects)
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();
        /** @var AMQPMessage|\PHPUnit_Framework_MockObject_MockObject $message */
        $message = $this->getMockBuilder(AMQPMessage::class)
            ->disableOriginalConstructor()
            ->getMock();
        // The message carries the mocked channel and the delivery tag that the
        // provider's constraints match against.
        $message->delivery_info = [
            'channel' => $channel,
            'delivery_tag' => 'foo',
        ];

        $consumer = new Consumer($connection, $channel);
        $consumer->setCallback(function () use ($response) {
            return $response;
        });

        $expect = $channel->expects(static::once())
            ->method($method);
        // with() takes one constraint per argument, so the provider's list is
        // unpacked into separate arguments.
        call_user_func_array([$expect, 'with'], $paramsExpects);

        $consumer->processMessage($message);
    }
200
201
    /**
     * Checks that purgeQueue() calls queue_purge on the channel exactly once
     * with the configured queue name and `true` as the second argument.
     */
    public function testPurge()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();
        $queueOptions = new QueueOptions();
        $queueOptions->setName('foo');

        $channel->expects(static::once())
            ->method('queue_purge')
            ->with(static::equalTo('foo'), static::equalTo(true));

        $consumer = new Consumer($connection, $channel);
        $consumer->setQueueOptions($queueOptions);
        $consumer->purgeQueue();
    }
221
222
    /**
     * Checks that start() issues one basic_consume and then calls wait() on the
     * channel once per entry in the channel's callbacks list.
     */
    public function testStart()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();
        $exchangeOptions = new ExchangeOptions();
        $exchangeOptions->setName('foo');
        $queueOptions = new QueueOptions();
        $queueOptions->setName('foo');

        // Seed three placeholder callbacks; every wait() call removes one.
        // Presumably the consumer keeps waiting while the list is non-empty
        // (confirm against Consumer).
        $callbacks = range(0, 2);
        $channel->callbacks = $callbacks;
        // A single expectation both stubs wait() and verifies the call count;
        // the count does not need to be registered a second time.
        $channel->expects(static::exactly(count($callbacks)))
            ->method('wait')
            ->willReturnCallback(function () use ($channel) {
                array_shift($channel->callbacks);

                return true;
            });

        $channel->expects(static::once())
            ->method('basic_consume');

        $consumer = new Consumer($connection, $channel);
        $consumer->setExchangeOptions($exchangeOptions);
        $consumer->setQueueOptions($queueOptions);
        $consumer->start();
    }
257
258
    /**
     * Checks that consume() issues one basic_consume and then calls wait() on
     * the channel once per entry in the channel's callbacks list.
     */
    public function testConsume()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();
        $exchangeOptions = new ExchangeOptions();
        $exchangeOptions->setName('foo');
        $queueOptions = new QueueOptions();
        $queueOptions->setName('foo');

        // Seed three placeholder callbacks; every wait() call removes one.
        // Presumably the consumer keeps waiting while the list is non-empty
        // (confirm against Consumer).
        $callbacks = range(0, 2);
        $channel->callbacks = $callbacks;
        // A single expectation both stubs wait() and verifies the call count;
        // the count does not need to be registered a second time.
        $channel->expects(static::exactly(count($callbacks)))
            ->method('wait')
            ->willReturnCallback(function () use ($channel) {
                array_shift($channel->callbacks);

                return true;
            });

        $channel->expects(static::once())
            ->method('basic_consume');

        $consumer = new Consumer($connection, $channel);
        $consumer->setExchangeOptions($exchangeOptions);
        $consumer->setQueueOptions($queueOptions);
        $consumer->consume();
    }
293
294
    /**
     * Checks that when forceStopConsumer() is called while consume() is
     * waiting, the consumer issues exactly one basic_cancel on the channel.
     */
    public function testConsumeWithStop()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();

        // Created up front because the wait() stub below needs the consumer.
        $consumer = new Consumer($connection, $channel);

        $exchangeOptions = new ExchangeOptions();
        $exchangeOptions->setName('foo');
        $queueOptions = new QueueOptions();
        $queueOptions->setName('foo');

        $callbacks = range(0, 2);
        $channel->callbacks = $callbacks;
        // A single expectation both stubs wait() and verifies it was called at
        // least once; it does not need to be registered a second time.
        $channel->expects(static::atLeast(1))
            ->method('wait')
            ->willReturnCallback(function () use ($channel, $consumer) {
                array_shift($channel->callbacks);
                // Request a stop from inside the wait call.
                $consumer->forceStopConsumer();

                return true;
            });

        $channel->expects(static::once())
            ->method('basic_consume');
        // Cancelling empties the callbacks list on the mocked channel.
        $channel->expects(static::once())
            ->method('basic_cancel')
            ->willReturnCallback(function () use ($channel) {
                $channel->callbacks = [];

                return true;
            });

        $consumer->setExchangeOptions($exchangeOptions);
        $consumer->setQueueOptions($queueOptions);
        $consumer->consume();
    }
339
340
    /**
     * Checks that setCallback() rejects the plain string 'string' by throwing
     * an \InvalidArgumentException.
     *
     * @expectedException \InvalidArgumentException
     */
    public function testSetCallbackWithInvalidValue()
    {
        // getMockBuilder() is an instance method of the test case, so it is
        // invoked on $this rather than through static call syntax.
        /** @var AbstractConnection|\PHPUnit_Framework_MockObject_MockObject $connection */
        $connection = $this->getMockBuilder(AbstractConnection::class)
            ->disableOriginalConstructor()
            ->getMockForAbstractClass();
        /** @var AMQPChannel|\PHPUnit_Framework_MockObject_MockObject $channel */
        $channel = $this->getMockBuilder(AMQPChannel::class)
            ->disableOriginalConstructor()
            ->getMock();

        $consumer = new Consumer($connection, $channel);

        $consumer->setCallback('string');
    }
357
358
    /**
     * Data sets for testProcessMessage().
     *
     * Every set has the shape
     * [callback return value, channel method expected once, argument constraints];
     * the 'foo' constraint matches the delivery tag used by that test.
     *
     * @return array
     */
    public function processMessageProvider()
    {
        $dataSets = [];

        // MSG_ACK -> basic_ack('foo')
        $dataSets[] = [
            ConsumerInterface::MSG_ACK,
            'basic_ack',
            [
                static::equalTo('foo'),
            ],
        ];

        // MSG_REJECT -> basic_reject('foo', false)
        $dataSets[] = [
            ConsumerInterface::MSG_REJECT,
            'basic_reject',
            [
                static::equalTo('foo'),
                static::equalTo(false),
            ],
        ];

        // MSG_REJECT_REQUEUE -> basic_reject('foo', true)
        $dataSets[] = [
            ConsumerInterface::MSG_REJECT_REQUEUE,
            'basic_reject',
            [
                static::equalTo('foo'),
                static::equalTo(true),
            ],
        ];

        // MSG_SINGLE_NACK_REQUEUE -> basic_nack('foo', false, true)
        $dataSets[] = [
            ConsumerInterface::MSG_SINGLE_NACK_REQUEUE,
            'basic_nack',
            [
                static::equalTo('foo'),
                static::equalTo(false),
                static::equalTo(true),
            ],
        ];

        return $dataSets;
    }
398
}
399